mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-06 05:22:16 +08:00
199
docs/dev_guide/登录系统对接.md
Normal file
199
docs/dev_guide/登录系统对接.md
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
## 登录系统对接
|
||||||
|
|
||||||
|
[KnowStreaming](https://github.com/didi/KnowStreaming)(以下简称KS) 除了实现基于本地MySQL的用户登录认证方式外,还已经实现了基于Ldap的登录认证。
|
||||||
|
|
||||||
|
但是,登录认证系统并非仅此两种。因此,为了具有更好的拓展性,KS具有自定义登陆认证逻辑,快速对接已有系统的特性。
|
||||||
|
|
||||||
|
在KS中,我们将登陆认证相关的一些文件放在[km-extends](https://github.com/didi/KnowStreaming/tree/master/km-extends)模块下的[km-account](https://github.com/didi/KnowStreaming/tree/master/km-extends/km-account)模块里。
|
||||||
|
|
||||||
|
本文将介绍KS如何快速对接自有的用户登录认证系统。
|
||||||
|
|
||||||
|
### 对接步骤
|
||||||
|
|
||||||
|
- 创建一个登陆认证类,实现[LogiCommon](https://github.com/didi/LogiCommon)的LoginExtend接口;
|
||||||
|
- 将[application.yml](https://github.com/didi/KnowStreaming/blob/master/km-rest/src/main/resources/application.yml)中的spring.logi-security.login-extend-bean-name字段改为登陆认证类的bean名称;
|
||||||
|
|
||||||
|
```Java
|
||||||
|
//LoginExtend 接口
|
||||||
|
public interface LoginExtend {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 验证登录信息,同时记住登录状态
|
||||||
|
*/
|
||||||
|
UserBriefVO verifyLogin(AccountLoginDTO var1, HttpServletRequest var2, HttpServletResponse var3) throws LogiSecurityException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 登出接口,清楚登录状态
|
||||||
|
*/
|
||||||
|
Result<Boolean> logout(HttpServletRequest var1, HttpServletResponse var2);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检查是否已经登录
|
||||||
|
*/
|
||||||
|
boolean interceptorCheck(HttpServletRequest var1, HttpServletResponse var2, String var3, List<String> var4) throws IOException;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### 对接例子
|
||||||
|
|
||||||
|
我们以Ldap对接为例,说明KS如何对接登录认证系统。
|
||||||
|
|
||||||
|
+ 编写[LdapLoginServiceImpl](https://github.com/didi/KnowStreaming/blob/master/km-extends/km-account/src/main/java/com/xiaojukeji/know/streaming/km/account/login/ldap/LdapLoginServiceImpl.java)类,实现LoginExtend接口。
|
||||||
|
+ 设置[application.yml](https://github.com/didi/KnowStreaming/blob/master/km-rest/src/main/resources/application.yml)中的spring.logi-security.login-extend-bean-name=ksLdapLoginService。
|
||||||
|
|
||||||
|
完成上述两步即可实现KS对接Ldap认证登陆。
|
||||||
|
|
||||||
|
```Java
|
||||||
|
@Service("ksLdapLoginService")
|
||||||
|
public class LdapLoginServiceImpl implements LoginExtend {
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UserBriefVO verifyLogin(AccountLoginDTO loginDTO,
|
||||||
|
HttpServletRequest request,
|
||||||
|
HttpServletResponse response) throws LogiSecurityException {
|
||||||
|
String decodePasswd = AESUtils.decrypt(loginDTO.getPw());
|
||||||
|
|
||||||
|
// 去LDAP验证账密
|
||||||
|
LdapPrincipal ldapAttrsInfo = ldapAuthentication.authenticate(loginDTO.getUserName(), decodePasswd);
|
||||||
|
if (ldapAttrsInfo == null) {
|
||||||
|
// 用户不存在,正常来说上如果有问题,上一步会直接抛出异常
|
||||||
|
throw new LogiSecurityException(ResultCode.USER_NOT_EXISTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 进行业务相关操作
|
||||||
|
|
||||||
|
// 记录登录状态,Ldap因为无法记录登录状态,因此有KnowStreaming进行记录
|
||||||
|
initLoginContext(request, response, loginDTO.getUserName(), user.getId());
|
||||||
|
return CopyBeanUtil.copy(user, UserBriefVO.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result<Boolean> logout(HttpServletRequest request, HttpServletResponse response) {
|
||||||
|
|
||||||
|
//清理cookie和session
|
||||||
|
|
||||||
|
return Result.buildSucc(Boolean.TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean interceptorCheck(HttpServletRequest request, HttpServletResponse response, String requestMappingValue, List<String> whiteMappingValues) throws IOException {
|
||||||
|
|
||||||
|
// 检查是否已经登录
|
||||||
|
String userName = HttpRequestUtil.getOperator(request);
|
||||||
|
if (StringUtils.isEmpty(userName)) {
|
||||||
|
// 未登录,则进行登出
|
||||||
|
logout(request, response);
|
||||||
|
return Boolean.FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Boolean.TRUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### 实现原理
|
||||||
|
|
||||||
|
因为登陆和登出整体实现逻辑是一致的,所以我们以登陆逻辑为例进行介绍。
|
||||||
|
|
||||||
|
+ 登陆原理
|
||||||
|
|
||||||
|
登陆走的是[LogiCommon](https://github.com/didi/LogiCommon)自带的LoginController。
|
||||||
|
|
||||||
|
```java
|
||||||
|
@RestController
|
||||||
|
public class LoginController {
|
||||||
|
|
||||||
|
|
||||||
|
//登陆接口
|
||||||
|
@PostMapping({"/login"})
|
||||||
|
public Result<UserBriefVO> login(HttpServletRequest request, HttpServletResponse response, @RequestBody AccountLoginDTO loginDTO) {
|
||||||
|
try {
|
||||||
|
//登陆认证
|
||||||
|
UserBriefVO userBriefVO = this.loginService.verifyLogin(loginDTO, request, response);
|
||||||
|
return Result.success(userBriefVO);
|
||||||
|
|
||||||
|
} catch (LogiSecurityException var5) {
|
||||||
|
return Result.fail(var5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
而登陆操作是调用LoginServiceImpl类来实现,但是具体由哪个登陆认证类来执行登陆操作却由loginExtendBeanTool来指定。
|
||||||
|
|
||||||
|
```java
|
||||||
|
//LoginServiceImpl类
|
||||||
|
@Service
|
||||||
|
public class LoginServiceImpl implements LoginService {
|
||||||
|
|
||||||
|
//实现登陆操作,但是具体哪个登陆类由loginExtendBeanTool来管理
|
||||||
|
public UserBriefVO verifyLogin(AccountLoginDTO loginDTO, HttpServletRequest request, HttpServletResponse response) throws LogiSecurityException {
|
||||||
|
|
||||||
|
return this.loginExtendBeanTool.getLoginExtendImpl().verifyLogin(loginDTO, request, response);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
而loginExtendBeanTool类会优先去查找用户指定的登陆认证类,如果失败则调用默认的登陆认证函数。
|
||||||
|
|
||||||
|
```java
|
||||||
|
//LoginExtendBeanTool类
|
||||||
|
@Component("logiSecurityLoginExtendBeanTool")
|
||||||
|
public class LoginExtendBeanTool {
|
||||||
|
|
||||||
|
public LoginExtend getLoginExtendImpl() {
|
||||||
|
LoginExtend loginExtend;
|
||||||
|
//先调用用户指定登陆类,如果失败则调用系统默认登陆认证
|
||||||
|
try {
|
||||||
|
//调用的类由spring.logi-security.login-extend-bean-name指定
|
||||||
|
loginExtend = this.getCustomLoginExtendImplBean();
|
||||||
|
} catch (UnsupportedOperationException var3) {
|
||||||
|
loginExtend = this.getDefaultLoginExtendImplBean();
|
||||||
|
}
|
||||||
|
|
||||||
|
return loginExtend;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
+ 认证原理
|
||||||
|
|
||||||
|
认证的实现则比较简单,向Spring中注册我们的拦截器PermissionInterceptor。
|
||||||
|
|
||||||
|
拦截器会调用LoginServiceImpl类的拦截方法,LoginServiceImpl后续处理逻辑就和前面登陆是一致的。
|
||||||
|
|
||||||
|
```java
|
||||||
|
public class PermissionInterceptor implements HandlerInterceptor {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 拦截预处理
|
||||||
|
* @return boolean false:拦截, 不向下执行, true:放行
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
|
||||||
|
|
||||||
|
//免登录相关校验,如果验证通过,提前返回
|
||||||
|
|
||||||
|
//走拦截函数,进行普通用户验证
|
||||||
|
return loginService.interceptorCheck(request, response, classRequestMappingValue, whiteMappingValues);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
@@ -20,6 +20,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewV
|
|||||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
|
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
|
||||||
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
@@ -171,7 +172,7 @@ public class GroupManagerImpl implements GroupManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) {
|
if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) {
|
||||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", description.state().name()));
|
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", GroupStateEnum.getByRawState(description.state()).getState()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取offset
|
// 获取offset
|
||||||
|
|||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.event.cluster;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群新增事件
|
||||||
|
* @author zengqiao
|
||||||
|
* @date 22/02/25
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
public class ClusterPhyAddedEvent extends ClusterPhyBaseEvent {
|
||||||
|
public ClusterPhyAddedEvent(Object source, Long clusterPhyId) {
|
||||||
|
super(source, clusterPhyId);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,26 +0,0 @@
|
|||||||
package com.xiaojukeji.know.streaming.km.common.bean.event.kafka.zk;
|
|
||||||
|
|
||||||
import lombok.Getter;
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
public abstract class BaseKafkaZKEvent {
|
|
||||||
/**
|
|
||||||
* 触发时间
|
|
||||||
*/
|
|
||||||
protected Long eventTime;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 初始化数据的事件
|
|
||||||
*/
|
|
||||||
protected Boolean initEvent;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 集群ID
|
|
||||||
*/
|
|
||||||
protected Long clusterPhyId;
|
|
||||||
|
|
||||||
protected BaseKafkaZKEvent(Long eventTime, Long clusterPhyId) {
|
|
||||||
this.eventTime = eventTime;
|
|
||||||
this.clusterPhyId = clusterPhyId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
package com.xiaojukeji.know.streaming.km.common.bean.event.kafka.zk;
|
|
||||||
|
|
||||||
import lombok.Getter;
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
public class ControllerChangeEvent extends BaseKafkaZKEvent {
|
|
||||||
public ControllerChangeEvent(Long eventTime, Long clusterPhyId) {
|
|
||||||
super(eventTime, clusterPhyId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -170,6 +170,7 @@ public class ReassignConverter {
|
|||||||
detail.setOriginalBrokerIdList(CommonUtils.string2IntList(subJobPO.getOriginalBrokerIds()));
|
detail.setOriginalBrokerIdList(CommonUtils.string2IntList(subJobPO.getOriginalBrokerIds()));
|
||||||
detail.setReassignBrokerIdList(CommonUtils.string2IntList(subJobPO.getReassignBrokerIds()));
|
detail.setReassignBrokerIdList(CommonUtils.string2IntList(subJobPO.getReassignBrokerIds()));
|
||||||
detail.setStatus(subJobPO.getStatus());
|
detail.setStatus(subJobPO.getStatus());
|
||||||
|
detail.setOldReplicaNum(detail.getOriginalBrokerIdList().size());
|
||||||
|
|
||||||
ReassignSubJobExtendData extendData = ConvertUtil.str2ObjByJson(subJobPO.getExtendData(), ReassignSubJobExtendData.class);
|
ReassignSubJobExtendData extendData = ConvertUtil.str2ObjByJson(subJobPO.getExtendData(), ReassignSubJobExtendData.class);
|
||||||
if (extendData != null) {
|
if (extendData != null) {
|
||||||
@@ -217,6 +218,7 @@ public class ReassignConverter {
|
|||||||
|
|
||||||
topicDetail.setPresentReplicaNum(partitionDetailList.get(0).getPresentReplicaNum());
|
topicDetail.setPresentReplicaNum(partitionDetailList.get(0).getPresentReplicaNum());
|
||||||
topicDetail.setNewReplicaNum(partitionDetailList.get(0).getNewReplicaNum());
|
topicDetail.setNewReplicaNum(partitionDetailList.get(0).getNewReplicaNum());
|
||||||
|
topicDetail.setOldReplicaNum(partitionDetailList.get(0).getOldReplicaNum());
|
||||||
topicDetail.setOriginalRetentionTimeUnitMs(partitionDetailList.get(0).getOriginalRetentionTimeUnitMs());
|
topicDetail.setOriginalRetentionTimeUnitMs(partitionDetailList.get(0).getOriginalRetentionTimeUnitMs());
|
||||||
topicDetail.setReassignRetentionTimeUnitMs(partitionDetailList.get(0).getReassignRetentionTimeUnitMs());
|
topicDetail.setReassignRetentionTimeUnitMs(partitionDetailList.get(0).getReassignRetentionTimeUnitMs());
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
|
||||||
@@ -106,6 +108,8 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
|
|||||||
|
|
||||||
log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator);
|
log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator);
|
||||||
|
|
||||||
|
// 发布添加集群事件
|
||||||
|
SpringTool.publish(new ClusterPhyAddedEvent(this, clusterPhyPO.getId()));
|
||||||
return clusterPhyPO.getId();
|
return clusterPhyPO.getId();
|
||||||
} catch (DuplicateKeyException dke) {
|
} catch (DuplicateKeyException dke) {
|
||||||
log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator);
|
log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator);
|
||||||
|
|||||||
@@ -60,4 +60,7 @@ public interface ReassignJobService {
|
|||||||
* 依据任务状态或者其中一个任务ID
|
* 依据任务状态或者其中一个任务ID
|
||||||
*/
|
*/
|
||||||
Long getOneRunningJobId(Long clusterPhyId);
|
Long getOneRunningJobId(Long clusterPhyId);
|
||||||
|
|
||||||
|
|
||||||
|
Result<Void> preferredReplicaElection(Long jobId);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
|||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
|
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
|
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService;
|
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService;
|
||||||
@@ -85,6 +86,9 @@ public class ReassignJobServiceImpl implements ReassignJobService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TopicConfigService topicConfigService;
|
private TopicConfigService topicConfigService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private OpPartitionService opPartitionService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public Result<Void> create(Long jobId, ReplaceReassignJob replaceReassignJob, String creator) {
|
public Result<Void> create(Long jobId, ReplaceReassignJob replaceReassignJob, String creator) {
|
||||||
@@ -343,6 +347,7 @@ public class ReassignJobServiceImpl implements ReassignJobService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Transactional
|
||||||
public Result<Void> verifyAndUpdateStatue(Long jobId) {
|
public Result<Void> verifyAndUpdateStatue(Long jobId) {
|
||||||
if (jobId == null) {
|
if (jobId == null) {
|
||||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
|
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
|
||||||
@@ -379,7 +384,18 @@ public class ReassignJobServiceImpl implements ReassignJobService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 更新任务状态
|
// 更新任务状态
|
||||||
return this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
|
Result<Void> result = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
|
||||||
|
if (!result.hasData()){
|
||||||
|
return Result.buildFromIgnoreData(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
//已完成
|
||||||
|
rv = this.preferredReplicaElection(jobId);
|
||||||
|
if (rv.failed()){
|
||||||
|
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Result.buildSuc();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -466,6 +482,37 @@ public class ReassignJobServiceImpl implements ReassignJobService {
|
|||||||
return subPOList.get(0).getJobId();
|
return subPOList.get(0).getJobId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result<Void> preferredReplicaElection(Long jobId) {
|
||||||
|
// 获取任务
|
||||||
|
ReassignJobPO jobPO = reassignJobDAO.selectById(jobId);
|
||||||
|
if (jobPO == null) {
|
||||||
|
// 任务不存在
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
|
||||||
|
}
|
||||||
|
if (!JobStatusEnum.isFinished(jobPO.getStatus())){
|
||||||
|
return Result.buildSuc();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取子任务
|
||||||
|
List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobId);
|
||||||
|
List<TopicPartition> topicPartitions = new ArrayList<>();
|
||||||
|
subJobPOList.stream().forEach(reassignPO -> {
|
||||||
|
Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0);
|
||||||
|
Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0);
|
||||||
|
//替换过leader的添加到优先副本选举任务列表
|
||||||
|
if (!originalLeader.equals(targetLeader)){
|
||||||
|
topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!topicPartitions.isEmpty()){
|
||||||
|
return opPartitionService.preferredReplicaElection(jobPO.getClusterPhyId(), topicPartitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Result.buildSuc();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**************************************************** private method ****************************************************/
|
/**************************************************** private method ****************************************************/
|
||||||
|
|
||||||
@@ -510,7 +557,8 @@ public class ReassignJobServiceImpl implements ReassignJobService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Result<Void> checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) {
|
@Transactional
|
||||||
|
public Result<Void> checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
boolean existNotFinished = false;
|
boolean existNotFinished = false;
|
||||||
|
|||||||
@@ -1,16 +1,16 @@
|
|||||||
apiVersion: v2
|
apiVersion: v2
|
||||||
name: knowstreaming-manager
|
name: knowstreaming-manager
|
||||||
description: A Helm chart for Kubernetes
|
description: knowstreaming-manager Helm chart
|
||||||
|
|
||||||
type: application
|
type: application
|
||||||
|
|
||||||
version: 0.1.0
|
version: 0.1.3
|
||||||
|
|
||||||
maintainers:
|
maintainers:
|
||||||
- email: didicloud@didiglobal.com
|
- email: didicloud@didiglobal.com
|
||||||
name: didicloud
|
name: didicloud
|
||||||
|
|
||||||
appVersion: "1.0.0"
|
appVersion: "3.0.0-beta.1"
|
||||||
|
|
||||||
dependencies:
|
dependencies:
|
||||||
- name: knowstreaming-web
|
- name: knowstreaming-web
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ spec:
|
|||||||
{{- include "ksmysql.selectorLabels" . | nindent 8 }}
|
{{- include "ksmysql.selectorLabels" . | nindent 8 }}
|
||||||
spec:
|
spec:
|
||||||
containers:
|
containers:
|
||||||
- image: knowstreaming/knowstreaming-mysql:latest
|
- image: knowstreaming/knowstreaming-mysql:0.1.0
|
||||||
name: {{ .Chart.Name }}
|
name: {{ .Chart.Name }}
|
||||||
env:
|
env:
|
||||||
- name: MYSQL_DATABASE
|
- name: MYSQL_DATABASE
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ replicaCount: 2
|
|||||||
image:
|
image:
|
||||||
repository: knowstreaming/knowstreaming-manager
|
repository: knowstreaming/knowstreaming-manager
|
||||||
pullPolicy: IfNotPresent
|
pullPolicy: IfNotPresent
|
||||||
tag: "latest"
|
tag: "0.1.0"
|
||||||
|
|
||||||
imagePullSecrets: []
|
imagePullSecrets: []
|
||||||
nameOverride: ""
|
nameOverride: ""
|
||||||
@@ -73,7 +73,7 @@ knowstreaming-web:
|
|||||||
image:
|
image:
|
||||||
repository: knowstreaming/knowstreaming-ui
|
repository: knowstreaming/knowstreaming-ui
|
||||||
pullPolicy: IfNotPresent
|
pullPolicy: IfNotPresent
|
||||||
tag: "latest"
|
tag: "0.1.0"
|
||||||
|
|
||||||
service:
|
service:
|
||||||
type: NodePort
|
type: NodePort
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ public class KmAccountConfig {
|
|||||||
|
|
||||||
/**************************************************** Ldap 用户注册到KM相关 ****************************************************/
|
/**************************************************** Ldap 用户注册到KM相关 ****************************************************/
|
||||||
|
|
||||||
@Value(value = "${account.ldap.auth-user-registration-role:0,1,2}")
|
@Value(value = "${account.ldap.auth-user-registration-role:1677}")
|
||||||
private String authUserRegistrationRole; // ldap自动注册的默认角色。请注意:它通常来说都是低权限角色
|
private String authUserRegistrationRole; // ldap自动注册的默认角色。请注意:它通常来说都是低权限角色
|
||||||
|
|
||||||
@Value(value = "${account.ldap.auth-user-registration:false}")
|
@Value(value = "${account.ldap.auth-user-registration:false}")
|
||||||
|
|||||||
@@ -60,9 +60,15 @@ thread-pool:
|
|||||||
suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改
|
suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改
|
||||||
|
|
||||||
task: # 任务模块的配置
|
task: # 任务模块的配置
|
||||||
heaven: # 采集任务配置
|
metrics: # metrics采集任务配置
|
||||||
thread-num: 20 # 采集任务线程池核心线程数
|
thread-num: 18 # metrics采集任务线程池核心线程数
|
||||||
queue-size: 1000 # 采集任务线程池队列大小
|
queue-size: 180 # metrics采集任务线程池队列大小
|
||||||
|
metadata: # metadata同步任务配置
|
||||||
|
thread-num: 27 # metadata同步任务线程池核心线程数
|
||||||
|
queue-size: 270 # metadata同步任务线程池队列大小
|
||||||
|
common: # 剩余其他任务配置
|
||||||
|
thread-num: 15 # 剩余其他任务线程池核心线程数
|
||||||
|
queue-size: 150 # 剩余其他任务线程池队列大小
|
||||||
|
|
||||||
|
|
||||||
# 客户端池大小相关配置
|
# 客户端池大小相关配置
|
||||||
|
|||||||
@@ -0,0 +1,46 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.task;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.job.common.TaskResult;
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
|
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* other相关任务
|
||||||
|
*/
|
||||||
|
public abstract class AbstractAsyncCommonDispatchTask extends AbstractClusterPhyDispatchTask {
|
||||||
|
private static final ILog log = LogFactory.getLog(AbstractAsyncCommonDispatchTask.class);
|
||||||
|
|
||||||
|
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TaskThreadPoolService taskThreadPoolService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
|
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
|
taskThreadPoolService.submitCommonTask(
|
||||||
|
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||||
|
this.timeoutUnitSec.intValue() * 1000,
|
||||||
|
() -> {
|
||||||
|
try {
|
||||||
|
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
|
||||||
|
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
|
||||||
|
log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
|
||||||
|
} else {
|
||||||
|
log.debug("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return TaskResult.SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -130,4 +130,12 @@ public abstract class AbstractDispatchTask<E extends Comparable & EntifyIdInterf
|
|||||||
}
|
}
|
||||||
return allTaskList.subList(idx * count, Math.min(idx * count + count, allTaskList.size()));
|
return allTaskList.subList(idx * count, Math.min(idx * count + count, allTaskList.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getTaskName() {
|
||||||
|
return taskName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getTimeoutUnitSec() {
|
||||||
|
return timeoutUnitSec;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
|||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
@Task(name = "CheckJmxClientTask",
|
@Task(name = "CheckJmxClientTask",
|
||||||
description = "检查Jmx客户端,",
|
description = "检查Jmx客户端",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
timeout = 2 * 60,
|
timeout = 2 * 60,
|
||||||
|
|||||||
@@ -15,8 +15,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimension
|
|||||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
|
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
|
||||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -31,12 +30,9 @@ import java.util.*;
|
|||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
|
public class HealthCheckTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
|
private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TaskThreadPoolService taskThreadPoolService;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private HealthCheckResultService healthCheckResultService;
|
private HealthCheckResultService healthCheckResultService;
|
||||||
|
|
||||||
@@ -45,17 +41,11 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
|
|||||||
);
|
);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
taskThreadPoolService.submitHeavenTask(
|
return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
|
||||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
|
||||||
100000,
|
|
||||||
() -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs)
|
|
||||||
);
|
|
||||||
|
|
||||||
return TaskResult.SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
// 获取配置,<配置名,配置信息>
|
// 获取配置,<配置名,配置信息>
|
||||||
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
|
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
|
||||||
|
|
||||||
@@ -91,6 +81,8 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
|
log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
|
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import com.didiglobal.logi.log.LogFactory;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
|
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
@Task(name = "CommunityReassignJobTask",
|
@Task(name = "CommunityReassignJobTask",
|
||||||
@@ -17,14 +17,14 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 6 * 60)
|
timeout = 6 * 60)
|
||||||
public class CommunityReassignJobTask extends AbstractClusterPhyDispatchTask {
|
public class CommunityReassignJobTask extends AbstractAsyncCommonDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(CommunityReassignJobTask.class);
|
private static final ILog log = LogFactory.getLog(CommunityReassignJobTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ReassignJobService reassignJobService;
|
private ReassignJobService reassignJobService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
// 获取迁移中的任务
|
// 获取迁移中的任务
|
||||||
Long jobId = reassignJobService.getOneRunningJobId(clusterPhy.getId());
|
Long jobId = reassignJobService.getOneRunningJobId(clusterPhy.getId());
|
||||||
if (jobId == null) {
|
if (jobId == null) {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import com.didiglobal.logi.job.common.TaskResult;
|
|||||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
|
import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
@Task(name = "kmJobTask",
|
@Task(name = "kmJobTask",
|
||||||
@@ -14,13 +14,13 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 6 * 60)
|
timeout = 6 * 60)
|
||||||
public class KMJobTask extends AbstractClusterPhyDispatchTask {
|
public class KMJobTask extends AbstractAsyncCommonDispatchTask {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private JobService jobService;
|
private JobService jobService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
jobService.scheduleJobByClusterId(clusterPhy.getId());
|
jobService.scheduleJobByClusterId(clusterPhy.getId());
|
||||||
return TaskResult.SUCCESS;
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,47 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.task.metadata;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.job.common.TaskResult;
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
|
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||||
|
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 元数据同步相关任务
|
||||||
|
*/
|
||||||
|
public abstract class AbstractAsyncMetadataDispatchTask extends AbstractClusterPhyDispatchTask {
|
||||||
|
private static final ILog log = LogFactory.getLog(AbstractAsyncMetadataDispatchTask.class);
|
||||||
|
|
||||||
|
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TaskThreadPoolService taskThreadPoolService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
|
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
|
taskThreadPoolService.submitMetadataTask(
|
||||||
|
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||||
|
this.timeoutUnitSec.intValue() * 1000,
|
||||||
|
() -> {
|
||||||
|
try {
|
||||||
|
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
|
||||||
|
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
|
||||||
|
log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
|
||||||
|
} else {
|
||||||
|
log.debug("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return TaskResult.SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,7 +13,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerConfigPO;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigDiffTypeEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigDiffTypeEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
@@ -25,12 +24,12 @@ import java.util.stream.Collectors;
|
|||||||
* @date 22/02/25
|
* @date 22/02/25
|
||||||
*/
|
*/
|
||||||
@Task(name = "SyncBrokerConfigDiffTask",
|
@Task(name = "SyncBrokerConfigDiffTask",
|
||||||
description = "Broker配置的Diff信息同步到DB,",
|
description = "Broker配置的Diff信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 6 * 60)
|
timeout = 6 * 60)
|
||||||
public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask {
|
public class SyncBrokerConfigDiffTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
protected static final ILog log = LogFactory.getLog(SyncBrokerConfigDiffTask.class);
|
protected static final ILog log = LogFactory.getLog(SyncBrokerConfigDiffTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@@ -40,7 +39,7 @@ public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask {
|
|||||||
private BrokerConfigService brokerConfigService;
|
private BrokerConfigService brokerConfigService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
// <configName, <BrokerId, ConfigValue>>
|
// <configName, <BrokerId, ConfigValue>>
|
||||||
Map<String, Map<Integer, String>> allConfigMap = new HashMap<>();
|
Map<String, Map<Integer, String>> allConfigMap = new HashMap<>();
|
||||||
|
|
||||||
|
|||||||
@@ -9,25 +9,24 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Task(name = "SyncBrokerTask",
|
@Task(name = "SyncBrokerTask",
|
||||||
description = "Broker信息同步到DB,",
|
description = "Broker信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncBrokerTask extends AbstractClusterPhyDispatchTask {
|
public class SyncBrokerTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(SyncBrokerTask.class);
|
private static final ILog log = LogFactory.getLog(SyncBrokerTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private BrokerService brokerService;
|
private BrokerService brokerService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
Result<List<Broker>> brokersResult = brokerService.listBrokersFromKafka(clusterPhy);
|
Result<List<Broker>> brokersResult = brokerService.listBrokersFromKafka(clusterPhy);
|
||||||
if (brokersResult.failed()) {
|
if (brokersResult.failed()) {
|
||||||
return new TaskResult(TaskResult.FAIL_CODE, brokersResult.getMessage());
|
return new TaskResult(TaskResult.FAIL_CODE, brokersResult.getMessage());
|
||||||
|
|||||||
@@ -9,24 +9,23 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
|
||||||
@Task(name = "SyncControllerTask",
|
@Task(name = "SyncControllerTask",
|
||||||
description = "Controller信息同步到DB,",
|
description = "Controller信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncControllerTask extends AbstractClusterPhyDispatchTask {
|
public class SyncControllerTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(SyncControllerTask.class);
|
private static final ILog log = LogFactory.getLog(SyncControllerTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private KafkaControllerService kafkaControllerService;
|
private KafkaControllerService kafkaControllerService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
Result<KafkaController> controllerResult = kafkaControllerService.getControllerFromKafka(clusterPhy);
|
Result<KafkaController> controllerResult = kafkaControllerService.getControllerFromKafka(clusterPhy);
|
||||||
if (controllerResult.failed()) {
|
if (controllerResult.failed()) {
|
||||||
return new TaskResult(TaskResult.FAIL_CODE, controllerResult.getMessage());
|
return new TaskResult(TaskResult.FAIL_CODE, controllerResult.getMessage());
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter;
|
import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
|
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService;
|
import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.apache.kafka.common.acl.AclBinding;
|
import org.apache.kafka.common.acl.AclBinding;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
@@ -19,12 +18,12 @@ import java.util.List;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Task(name = "SyncKafkaAclTask",
|
@Task(name = "SyncKafkaAclTask",
|
||||||
description = "KafkaAcl信息同步到DB,",
|
description = "KafkaAcl信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncKafkaAclTask extends AbstractClusterPhyDispatchTask {
|
public class SyncKafkaAclTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(SyncKafkaAclTask.class);
|
private static final ILog log = LogFactory.getLog(SyncKafkaAclTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@@ -34,7 +33,7 @@ public class SyncKafkaAclTask extends AbstractClusterPhyDispatchTask {
|
|||||||
private OpKafkaAclService opKafkaAclService;
|
private OpKafkaAclService opKafkaAclService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
Result<List<AclBinding>> aclBindingListResult = kafkaAclService.getAclFromKafka(clusterPhy.getId());
|
Result<List<AclBinding>> aclBindingListResult = kafkaAclService.getAclFromKafka(clusterPhy.getId());
|
||||||
if (aclBindingListResult.failed()) {
|
if (aclBindingListResult.failed()) {
|
||||||
return TaskResult.FAIL;
|
return TaskResult.FAIL;
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.apache.kafka.clients.admin.*;
|
import org.apache.kafka.clients.admin.*;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -22,19 +21,19 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
|
|
||||||
@Task(name = "SyncKafkaGroupTask",
|
@Task(name = "SyncKafkaGroupTask",
|
||||||
description = "KafkaGroup信息同步到DB,",
|
description = "KafkaGroup信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncKafkaGroupTask extends AbstractClusterPhyDispatchTask {
|
public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(SyncKafkaGroupTask.class);
|
private static final ILog log = LogFactory.getLog(SyncKafkaGroupTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private GroupService groupService;
|
private GroupService groupService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
TaskResult tr = TaskResult.SUCCESS;
|
TaskResult tr = TaskResult.SUCCESS;
|
||||||
|
|
||||||
List<String> groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId());
|
List<String> groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId());
|
||||||
|
|||||||
@@ -9,26 +9,25 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService;
|
import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Task(name = "SyncKafkaUserTask",
|
@Task(name = "SyncKafkaUserTask",
|
||||||
description = "KafkaUser信息同步到DB,",
|
description = "KafkaUser信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncKafkaUserTask extends AbstractClusterPhyDispatchTask {
|
public class SyncKafkaUserTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(SyncKafkaUserTask.class);
|
private static final ILog log = LogFactory.getLog(SyncKafkaUserTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private KafkaUserService kafkaUserService;
|
private KafkaUserService kafkaUserService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
// 查询KafkaUser数据
|
// 查询KafkaUser数据
|
||||||
Result<List<KafkaUser>> kafkaUserResult = kafkaUserService.getKafkaUserFromKafka(clusterPhy.getId());
|
Result<List<KafkaUser>> kafkaUserResult = kafkaUserService.getKafkaUserFromKafka(clusterPhy.getId());
|
||||||
if (kafkaUserResult.failed()) {
|
if (kafkaUserResult.failed()) {
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.partition.PartitionPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.partition.PartitionPO;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -19,19 +18,19 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@Task(name = "SyncPartitionTask",
|
@Task(name = "SyncPartitionTask",
|
||||||
description = "Partition信息同步到DB,",
|
description = "Partition信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncPartitionTask extends AbstractClusterPhyDispatchTask {
|
public class SyncPartitionTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(SyncPartitionTask.class);
|
private static final ILog log = LogFactory.getLog(SyncPartitionTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private PartitionService partitionService;
|
private PartitionService partitionService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
Result<Map<String, List<Partition>>> partitionsResult = partitionService.listPartitionsFromKafka(clusterPhy);
|
Result<Map<String, List<Partition>>> partitionsResult = partitionService.listPartitionsFromKafka(clusterPhy);
|
||||||
if (partitionsResult.failed()) {
|
if (partitionsResult.failed()) {
|
||||||
return new TaskResult(TaskResult.FAIL_CODE, partitionsResult.getMessage());
|
return new TaskResult(TaskResult.FAIL_CODE, partitionsResult.getMessage());
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicConfig;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -27,12 +26,12 @@ import java.util.Map;
|
|||||||
* @date 22/02/25
|
* @date 22/02/25
|
||||||
*/
|
*/
|
||||||
@Task(name = "SyncTopicConfigTask",
|
@Task(name = "SyncTopicConfigTask",
|
||||||
description = "Topic保存时间配置同步DB,",
|
description = "Topic保存时间配置同步DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncTopicConfigTask extends AbstractClusterPhyDispatchTask {
|
public class SyncTopicConfigTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
protected static final ILog log = LogFactory.getLog(SyncTopicConfigTask.class);
|
protected static final ILog log = LogFactory.getLog(SyncTopicConfigTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@@ -42,7 +41,7 @@ public class SyncTopicConfigTask extends AbstractClusterPhyDispatchTask {
|
|||||||
private TopicConfigService kafkaConfigService;
|
private TopicConfigService kafkaConfigService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
boolean success = true;
|
boolean success = true;
|
||||||
|
|
||||||
List<TopicConfig> topicConfigList = new ArrayList<>();
|
List<TopicConfig> topicConfigList = new ArrayList<>();
|
||||||
|
|||||||
@@ -9,25 +9,24 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Task(name = "SyncTopicTask",
|
@Task(name = "SyncTopicTask",
|
||||||
description = "Topic信息同步到DB,",
|
description = "Topic信息同步到DB",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class SyncTopicTask extends AbstractClusterPhyDispatchTask {
|
public class SyncTopicTask extends AbstractAsyncMetadataDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(SyncTopicTask.class);
|
private static final ILog log = LogFactory.getLog(SyncTopicTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TopicService topicService;
|
private TopicService topicService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
Result<List<Topic>> topicsResult = topicService.listTopicsFromKafka(clusterPhy);
|
Result<List<Topic>> topicsResult = topicService.listTopicsFromKafka(clusterPhy);
|
||||||
if (topicsResult.failed()) {
|
if (topicsResult.failed()) {
|
||||||
return new TaskResult(TaskResult.FAIL_CODE, topicsResult.getMessage());
|
return new TaskResult(TaskResult.FAIL_CODE, topicsResult.getMessage());
|
||||||
|
|||||||
@@ -0,0 +1,47 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.task.metrics;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.job.common.TaskResult;
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
|
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||||
|
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指标采集相关任务
|
||||||
|
*/
|
||||||
|
public abstract class AbstractAsyncMetricsDispatchTask extends AbstractClusterPhyDispatchTask {
|
||||||
|
private static final ILog log = LogFactory.getLog(AbstractAsyncMetricsDispatchTask.class);
|
||||||
|
|
||||||
|
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TaskThreadPoolService taskThreadPoolService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
|
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||||
|
taskThreadPoolService.submitMetricsTask(
|
||||||
|
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||||
|
this.timeoutUnitSec.intValue() * 1000,
|
||||||
|
() -> {
|
||||||
|
try {
|
||||||
|
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
|
||||||
|
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
|
||||||
|
log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
|
||||||
|
} else {
|
||||||
|
log.debug("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return TaskResult.SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog;
|
|||||||
import com.didiglobal.logi.log.LogFactory;
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.BrokerMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.BrokerMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author didi
|
* @author didi
|
||||||
*/
|
*/
|
||||||
@Task(name = "BrokerMetricCollectorTask",
|
@Task(name = "BrokerMetricCollectorTask",
|
||||||
description = "Broker指标采集任务,",
|
description = "Broker指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class BrokerMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
public class BrokerMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(BrokerMetricCollectorTask.class);
|
private static final ILog log = LogFactory.getLog(BrokerMetricCollectorTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private BrokerMetricCollector brokerMetricCollector;
|
private BrokerMetricCollector brokerMetricCollector;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TaskThreadPoolService taskThreadPoolService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
taskThreadPoolService.submitHeavenTask(
|
brokerMetricCollector.collectMetrics(clusterPhy);
|
||||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
|
||||||
100000,
|
|
||||||
() -> brokerMetricCollector.collectMetrics(clusterPhy)
|
|
||||||
);
|
|
||||||
|
|
||||||
return TaskResult.SUCCESS;
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog;
|
|||||||
import com.didiglobal.logi.log.LogFactory;
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.ClusterMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.ClusterMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author didi
|
* @author didi
|
||||||
*/
|
*/
|
||||||
@Task(name = "ClusterMetricCollectorTask",
|
@Task(name = "ClusterMetricCollectorTask",
|
||||||
description = "Cluster指标采集任务,",
|
description = "Cluster指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class ClusterMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
public class ClusterMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(ClusterMetricCollectorTask.class);
|
private static final ILog log = LogFactory.getLog(ClusterMetricCollectorTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ClusterMetricCollector clusterMetricCollector;
|
private ClusterMetricCollector clusterMetricCollector;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TaskThreadPoolService taskThreadPoolService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
taskThreadPoolService.submitHeavenTask(
|
clusterMetricCollector.collectMetrics(clusterPhy);
|
||||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
|
||||||
100000,
|
|
||||||
() -> clusterMetricCollector.collectMetrics(clusterPhy)
|
|
||||||
);
|
|
||||||
|
|
||||||
return TaskResult.SUCCESS;
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog;
|
|||||||
import com.didiglobal.logi.log.LogFactory;
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.GroupMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.GroupMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author didi
|
* @author didi
|
||||||
*/
|
*/
|
||||||
@Task(name = "GroupMetricCollectorTask",
|
@Task(name = "GroupMetricCollectorTask",
|
||||||
description = "Group指标采集任务,",
|
description = "Group指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class GroupMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
public class GroupMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(GroupMetricCollectorTask.class);
|
private static final ILog log = LogFactory.getLog(GroupMetricCollectorTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private GroupMetricCollector groupMetricCollector;
|
private GroupMetricCollector groupMetricCollector;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TaskThreadPoolService taskThreadPoolService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
taskThreadPoolService.submitHeavenTask(
|
groupMetricCollector.collectMetrics(clusterPhy);
|
||||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
|
||||||
100000,
|
|
||||||
() -> groupMetricCollector.collectMetrics(clusterPhy)
|
|
||||||
);
|
|
||||||
|
|
||||||
return TaskResult.SUCCESS;
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,34 +5,25 @@ import com.didiglobal.logi.job.common.TaskResult;
|
|||||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.PartitionMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.PartitionMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author didi
|
* @author didi
|
||||||
*/
|
*/
|
||||||
@Task(name = "PartitionMetricCollectorTask",
|
@Task(name = "PartitionMetricCollectorTask",
|
||||||
description = "Partition指标采集任务,",
|
description = "Partition指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class PartitionMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
public class PartitionMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private PartitionMetricCollector partitionMetricCollector;
|
private PartitionMetricCollector partitionMetricCollector;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TaskThreadPoolService taskThreadPoolService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
taskThreadPoolService.submitHeavenTask(
|
partitionMetricCollector.collectMetrics(clusterPhy);
|
||||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
|
||||||
100000,
|
|
||||||
() -> partitionMetricCollector.collectMetrics(clusterPhy)
|
|
||||||
);
|
|
||||||
|
|
||||||
return TaskResult.SUCCESS;
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,8 +5,6 @@ import com.didiglobal.logi.job.common.TaskResult;
|
|||||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
@@ -15,26 +13,19 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Task(name = "ReplicaMetricCollectorTask",
|
@Task(name = "ReplicaMetricCollectorTask",
|
||||||
description = "Replica指标采集任务,",
|
description = "Replica指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "0 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class ReplicaMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ReplicaMetricCollector replicaMetricCollector;
|
private ReplicaMetricCollector replicaMetricCollector;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TaskThreadPoolService taskThreadPoolService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
taskThreadPoolService.submitHeavenTask(
|
replicaMetricCollector.collectMetrics(clusterPhy);
|
||||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
|
||||||
100000,
|
|
||||||
() -> replicaMetricCollector.collectMetrics(clusterPhy)
|
|
||||||
);
|
|
||||||
|
|
||||||
return TaskResult.SUCCESS;
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,8 +7,6 @@ import com.didiglobal.logi.log.ILog;
|
|||||||
import com.didiglobal.logi.log.LogFactory;
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.TopicMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.TopicMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
|
||||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -20,22 +18,15 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class TopicMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
public class TopicMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(TopicMetricCollectorTask.class);
|
private static final ILog log = LogFactory.getLog(TopicMetricCollectorTask.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TopicMetricCollector topicMetricCollector;
|
private TopicMetricCollector topicMetricCollector;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TaskThreadPoolService taskThreadPoolService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
taskThreadPoolService.submitHeavenTask(
|
topicMetricCollector.collectMetrics(clusterPhy);
|
||||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
|
||||||
100000,
|
|
||||||
() -> topicMetricCollector.collectMetrics(clusterPhy)
|
|
||||||
);
|
|
||||||
|
|
||||||
return TaskResult.SUCCESS;
|
return TaskResult.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,23 +16,72 @@ import javax.annotation.PostConstruct;
|
|||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
public class TaskThreadPoolService {
|
public class TaskThreadPoolService {
|
||||||
/**
|
/**
|
||||||
* 较重任务,比如指标采集
|
* metrics任务,比如指标采集
|
||||||
*/
|
*/
|
||||||
private FutureNoWaitUtil<Object> heavenTaskThreadPool;
|
private FutureNoWaitUtil<Object> metricsTaskThreadPool;
|
||||||
|
|
||||||
|
@Value(value = "${thread-pool.task.metrics.thread-num:18}")
|
||||||
|
private Integer metricsTaskThreadNum;
|
||||||
|
|
||||||
|
@Value(value = "${thread-pool.task.metrics.queue-size:180}")
|
||||||
|
private Integer metricsTaskQueueSize;
|
||||||
|
|
||||||
|
|
||||||
@Value(value = "${thread-pool.task.heaven.thread-num:12}")
|
/**
|
||||||
private Integer heavenTaskThreadNum;
|
* metadata任务
|
||||||
|
*/
|
||||||
|
private FutureNoWaitUtil<Object> metadataTaskThreadPool;
|
||||||
|
|
||||||
@Value(value = "${thread-pool.task.heaven.queue-size:1000}")
|
@Value(value = "${thread-pool.task.metadata.thread-num:27}")
|
||||||
private Integer heavenTaskQueueSize;
|
private Integer metadataTaskThreadNum;
|
||||||
|
|
||||||
|
@Value(value = "${thread-pool.task.metadata.queue-size:270}")
|
||||||
|
private Integer metadataTaskQueueSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* common任务
|
||||||
|
*/
|
||||||
|
private FutureNoWaitUtil<Object> commonTaskThreadPool;
|
||||||
|
|
||||||
|
@Value(value = "${thread-pool.task.common.thread-num:15}")
|
||||||
|
private Integer commonTaskThreadNum;
|
||||||
|
|
||||||
|
@Value(value = "${thread-pool.task.common.queue-size:150}")
|
||||||
|
private Integer commonTaskQueueSize;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
private void init() {
|
private void init() {
|
||||||
heavenTaskThreadPool = FutureNoWaitUtil.init("heavenTaskThreadPool", heavenTaskThreadNum, heavenTaskThreadNum, heavenTaskQueueSize);
|
metricsTaskThreadPool = FutureNoWaitUtil.init(
|
||||||
|
"metricsTaskThreadPool",
|
||||||
|
metricsTaskThreadNum,
|
||||||
|
metricsTaskThreadNum,
|
||||||
|
metricsTaskQueueSize
|
||||||
|
);
|
||||||
|
|
||||||
|
metadataTaskThreadPool = FutureNoWaitUtil.init(
|
||||||
|
"metadataTaskThreadPool",
|
||||||
|
metadataTaskThreadNum,
|
||||||
|
metadataTaskThreadNum,
|
||||||
|
metadataTaskQueueSize
|
||||||
|
);
|
||||||
|
|
||||||
|
commonTaskThreadPool = FutureNoWaitUtil.init(
|
||||||
|
"commonTaskThreadPool",
|
||||||
|
commonTaskThreadNum,
|
||||||
|
commonTaskThreadNum,
|
||||||
|
commonTaskQueueSize
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void submitHeavenTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
public void submitMetricsTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
||||||
heavenTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
metricsTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void submitMetadataTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
||||||
|
metadataTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void submitCommonTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
||||||
|
commonTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,76 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.task.service.listener;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
||||||
|
import com.xiaojukeji.know.streaming.km.task.metadata.AbstractAsyncMetadataDispatchTask;
|
||||||
|
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(ClusterPhyAddedEvent event) {
|
||||||
|
LOGGER.info("class=TaskClusterAddedListener||method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
|
||||||
|
Long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// 交由KS自定义的线程池,异步执行任务
|
||||||
|
FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
|
||||||
|
ClusterPhy tempClusterPhy = null;
|
||||||
|
|
||||||
|
// 120秒内无加载进来,则直接返回退出
|
||||||
|
while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
|
||||||
|
tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
|
||||||
|
if (tempClusterPhy != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
BackoffUtils.backoff(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tempClusterPhy == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定
|
||||||
|
BackoffUtils.backoff(5000);
|
||||||
|
final ClusterPhy clusterPhy = tempClusterPhy;
|
||||||
|
|
||||||
|
// 集群执行集群元信息同步
|
||||||
|
List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
|
||||||
|
for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
|
||||||
|
try {
|
||||||
|
dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定
|
||||||
|
BackoffUtils.backoff(5000);
|
||||||
|
|
||||||
|
// 集群集群指标采集
|
||||||
|
List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
|
||||||
|
for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
|
||||||
|
try {
|
||||||
|
dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user