diff --git a/docs/dev_guide/登录系统对接.md b/docs/dev_guide/登录系统对接.md new file mode 100644 index 00000000..e1038c63 --- /dev/null +++ b/docs/dev_guide/登录系统对接.md @@ -0,0 +1,199 @@ + + +![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png) + +## 登录系统对接 + +[KnowStreaming](https://github.com/didi/KnowStreaming)(以下简称KS) 除了实现基于本地MySQL的用户登录认证方式外,还已经实现了基于Ldap的登录认证。 + +但是,登录认证系统并非仅此两种。因此,为了具有更好的拓展性,KS具有自定义登陆认证逻辑,快速对接已有系统的特性。 + +在KS中,我们将登陆认证相关的一些文件放在[km-extends](https://github.com/didi/KnowStreaming/tree/master/km-extends)模块下的[km-account](https://github.com/didi/KnowStreaming/tree/master/km-extends/km-account)模块里。 + +本文将介绍KS如何快速对接自有的用户登录认证系统。 + +### 对接步骤 + +- 创建一个登陆认证类,实现[LogiCommon](https://github.com/didi/LogiCommon)的LoginExtend接口; +- 将[application.yml](https://github.com/didi/KnowStreaming/blob/master/km-rest/src/main/resources/application.yml)中的spring.logi-security.login-extend-bean-name字段改为登陆认证类的bean名称; + +```Java +//LoginExtend 接口 +public interface LoginExtend { + + /** + * 验证登录信息,同时记住登录状态 + */ + UserBriefVO verifyLogin(AccountLoginDTO var1, HttpServletRequest var2, HttpServletResponse var3) throws LogiSecurityException; + + /** + * 登出接口,清楚登录状态 + */ + Result logout(HttpServletRequest var1, HttpServletResponse var2); + + /** + * 检查是否已经登录 + */ + boolean interceptorCheck(HttpServletRequest var1, HttpServletResponse var2, String var3, List var4) throws IOException; + +} + +``` + + + +### 对接例子 + +我们以Ldap对接为例,说明KS如何对接登录认证系统。 + ++ 编写[LdapLoginServiceImpl](https://github.com/didi/KnowStreaming/blob/master/km-extends/km-account/src/main/java/com/xiaojukeji/know/streaming/km/account/login/ldap/LdapLoginServiceImpl.java)类,实现LoginExtend接口。 ++ 设置[application.yml](https://github.com/didi/KnowStreaming/blob/master/km-rest/src/main/resources/application.yml)中的spring.logi-security.login-extend-bean-name=ksLdapLoginService。 + +完成上述两步即可实现KS对接Ldap认证登陆。 + +```Java +@Service("ksLdapLoginService") +public class LdapLoginServiceImpl implements LoginExtend { + + + @Override + public UserBriefVO verifyLogin(AccountLoginDTO loginDTO, + HttpServletRequest request, + HttpServletResponse response) throws LogiSecurityException { + String decodePasswd = AESUtils.decrypt(loginDTO.getPw()); + + // 去LDAP验证账密 + LdapPrincipal ldapAttrsInfo = ldapAuthentication.authenticate(loginDTO.getUserName(), decodePasswd); + if (ldapAttrsInfo == null) { + // 用户不存在,正常来说上如果有问题,上一步会直接抛出异常 + throw new LogiSecurityException(ResultCode.USER_NOT_EXISTS); + } + + // 进行业务相关操作 + + // 记录登录状态,Ldap因为无法记录登录状态,因此有KnowStreaming进行记录 + initLoginContext(request, response, loginDTO.getUserName(), user.getId()); + return CopyBeanUtil.copy(user, UserBriefVO.class); + } + + @Override + public Result logout(HttpServletRequest request, HttpServletResponse response) { + + //清理cookie和session + + return Result.buildSucc(Boolean.TRUE); + } + + @Override + public boolean interceptorCheck(HttpServletRequest request, HttpServletResponse response, String requestMappingValue, List whiteMappingValues) throws IOException { + + // 检查是否已经登录 + String userName = HttpRequestUtil.getOperator(request); + if (StringUtils.isEmpty(userName)) { + // 未登录,则进行登出 + logout(request, response); + return Boolean.FALSE; + } + + return Boolean.TRUE; + } +} + +``` + + + +### 实现原理 + +因为登陆和登出整体实现逻辑是一致的,所以我们以登陆逻辑为例进行介绍。 + ++ 登陆原理 + +登陆走的是[LogiCommon](https://github.com/didi/LogiCommon)自带的LoginController。 + +```java +@RestController +public class LoginController { + + + //登陆接口 + @PostMapping({"/login"}) + public Result login(HttpServletRequest request, HttpServletResponse response, @RequestBody AccountLoginDTO loginDTO) { + try { + //登陆认证 + UserBriefVO userBriefVO = this.loginService.verifyLogin(loginDTO, request, response); + return Result.success(userBriefVO); + + } catch (LogiSecurityException var5) { + return Result.fail(var5); + } + } + +} +``` + +而登陆操作是调用LoginServiceImpl类来实现,但是具体由哪个登陆认证类来执行登陆操作却由loginExtendBeanTool来指定。 + +```java +//LoginServiceImpl类 +@Service +public class LoginServiceImpl implements LoginService { + + //实现登陆操作,但是具体哪个登陆类由loginExtendBeanTool来管理 + public UserBriefVO verifyLogin(AccountLoginDTO loginDTO, HttpServletRequest request, HttpServletResponse response) throws LogiSecurityException { + + return this.loginExtendBeanTool.getLoginExtendImpl().verifyLogin(loginDTO, request, response); + } + + +} +``` + +而loginExtendBeanTool类会优先去查找用户指定的登陆认证类,如果失败则调用默认的登陆认证函数。 + +```java +//LoginExtendBeanTool类 +@Component("logiSecurityLoginExtendBeanTool") +public class LoginExtendBeanTool { + + public LoginExtend getLoginExtendImpl() { + LoginExtend loginExtend; + //先调用用户指定登陆类,如果失败则调用系统默认登陆认证 + try { + //调用的类由spring.logi-security.login-extend-bean-name指定 + loginExtend = this.getCustomLoginExtendImplBean(); + } catch (UnsupportedOperationException var3) { + loginExtend = this.getDefaultLoginExtendImplBean(); + } + + return loginExtend; + } +} +``` + ++ 认证原理 + +认证的实现则比较简单,向Spring中注册我们的拦截器PermissionInterceptor。 + +拦截器会调用LoginServiceImpl类的拦截方法,LoginServiceImpl后续处理逻辑就和前面登陆是一致的。 + +```java +public class PermissionInterceptor implements HandlerInterceptor { + + + /** + * 拦截预处理 + * @return boolean false:拦截, 不向下执行, true:放行 + */ + @Override + public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { + + //免登录相关校验,如果验证通过,提前返回 + + //走拦截函数,进行普通用户验证 + return loginService.interceptorCheck(request, response, classRequestMappingValue, whiteMappingValues); + } + +} +``` + diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index 84a06c05..15aafdb5 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -20,6 +20,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewV import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; +import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; @@ -171,7 +172,7 @@ public class GroupManagerImpl implements GroupManager { } if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) { - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", description.state().name())); + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", GroupStateEnum.getByRawState(description.state()).getState())); } // 获取offset diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java new file mode 100644 index 00000000..67aa707e --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.cluster; + +import lombok.Getter; + +/** + * 集群新增事件 + * @author zengqiao + * @date 22/02/25 + */ +@Getter +public class ClusterPhyAddedEvent extends ClusterPhyBaseEvent { + public ClusterPhyAddedEvent(Object source, Long clusterPhyId) { + super(source, clusterPhyId); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/BaseKafkaZKEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/BaseKafkaZKEvent.java deleted file mode 100644 index 6852cf9c..00000000 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/BaseKafkaZKEvent.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.xiaojukeji.know.streaming.km.common.bean.event.kafka.zk; - -import lombok.Getter; - -@Getter -public abstract class BaseKafkaZKEvent { - /** - * 触发时间 - */ - protected Long eventTime; - - /** - * 初始化数据的事件 - */ - protected Boolean initEvent; - - /** - * 集群ID - */ - protected Long clusterPhyId; - - protected BaseKafkaZKEvent(Long eventTime, Long clusterPhyId) { - this.eventTime = eventTime; - this.clusterPhyId = clusterPhyId; - } -} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/ControllerChangeEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/ControllerChangeEvent.java deleted file mode 100644 index 58238a4b..00000000 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/ControllerChangeEvent.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.xiaojukeji.know.streaming.km.common.bean.event.kafka.zk; - -import lombok.Getter; - -@Getter -public class ControllerChangeEvent extends BaseKafkaZKEvent { - public ControllerChangeEvent(Long eventTime, Long clusterPhyId) { - super(eventTime, clusterPhyId); - } -} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ReassignConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ReassignConverter.java index ee67b5c2..9d00febe 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ReassignConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ReassignConverter.java @@ -170,6 +170,7 @@ public class ReassignConverter { detail.setOriginalBrokerIdList(CommonUtils.string2IntList(subJobPO.getOriginalBrokerIds())); detail.setReassignBrokerIdList(CommonUtils.string2IntList(subJobPO.getReassignBrokerIds())); detail.setStatus(subJobPO.getStatus()); + detail.setOldReplicaNum(detail.getOriginalBrokerIdList().size()); ReassignSubJobExtendData extendData = ConvertUtil.str2ObjByJson(subJobPO.getExtendData(), ReassignSubJobExtendData.class); if (extendData != null) { @@ -217,6 +218,7 @@ public class ReassignConverter { topicDetail.setPresentReplicaNum(partitionDetailList.get(0).getPresentReplicaNum()); topicDetail.setNewReplicaNum(partitionDetailList.get(0).getNewReplicaNum()); + topicDetail.setOldReplicaNum(partitionDetailList.get(0).getOldReplicaNum()); topicDetail.setOriginalRetentionTimeUnitMs(partitionDetailList.get(0).getOriginalRetentionTimeUnitMs()); topicDetail.setReassignRetentionTimeUnitMs(partitionDetailList.get(0).getReassignRetentionTimeUnitMs()); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java index 2ba13738..43a0557c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java @@ -7,7 +7,9 @@ import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; @@ -106,6 +108,8 @@ public class ClusterPhyServiceImpl implements ClusterPhyService { log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator); + // 发布添加集群事件 + SpringTool.publish(new ClusterPhyAddedEvent(this, clusterPhyPO.getId())); return clusterPhyPO.getId(); } catch (DuplicateKeyException dke) { log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java index 83c4a7d2..eef02e6a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java @@ -60,4 +60,7 @@ public interface ReassignJobService { * 依据任务状态或者其中一个任务ID */ Long getOneRunningJobId(Long clusterPhyId); + + + Result preferredReplicaElection(Long jobId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java index 1cabf913..4fabb154 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java @@ -31,6 +31,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; +import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService; import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService; @@ -85,6 +86,9 @@ public class ReassignJobServiceImpl implements ReassignJobService { @Autowired private TopicConfigService topicConfigService; + @Autowired + private OpPartitionService opPartitionService; + @Override @Transactional public Result create(Long jobId, ReplaceReassignJob replaceReassignJob, String creator) { @@ -343,6 +347,7 @@ public class ReassignJobServiceImpl implements ReassignJobService { } @Override + @Transactional public Result verifyAndUpdateStatue(Long jobId) { if (jobId == null) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull()); @@ -379,7 +384,18 @@ public class ReassignJobServiceImpl implements ReassignJobService { } // 更新任务状态 - return this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); + Result result = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); + if (!result.hasData()){ + return Result.buildFromIgnoreData(result); + } + + //已完成 + rv = this.preferredReplicaElection(jobId); + if (rv.failed()){ + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + } + + return Result.buildSuc(); } @Override @@ -466,6 +482,37 @@ public class ReassignJobServiceImpl implements ReassignJobService { return subPOList.get(0).getJobId(); } + @Override + public Result preferredReplicaElection(Long jobId) { + // 获取任务 + ReassignJobPO jobPO = reassignJobDAO.selectById(jobId); + if (jobPO == null) { + // 任务不存在 + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId)); + } + if (!JobStatusEnum.isFinished(jobPO.getStatus())){ + return Result.buildSuc(); + } + + // 获取子任务 + List subJobPOList = this.getSubJobsByJobId(jobId); + List topicPartitions = new ArrayList<>(); + subJobPOList.stream().forEach(reassignPO -> { + Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0); + Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0); + //替换过leader的添加到优先副本选举任务列表 + if (!originalLeader.equals(targetLeader)){ + topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId())); + } + }); + + if (!topicPartitions.isEmpty()){ + return opPartitionService.preferredReplicaElection(jobPO.getClusterPhyId(), topicPartitions); + } + + return Result.buildSuc(); + } + /**************************************************** private method ****************************************************/ @@ -510,7 +557,8 @@ public class ReassignJobServiceImpl implements ReassignJobService { } - private Result checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) { + @Transactional + public Result checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) { long now = System.currentTimeMillis(); boolean existNotFinished = false; diff --git a/km-dist/helm/Chart.yaml b/km-dist/helm/Chart.yaml index d37c93e8..bd9b7d98 100644 --- a/km-dist/helm/Chart.yaml +++ b/km-dist/helm/Chart.yaml @@ -1,16 +1,16 @@ apiVersion: v2 name: knowstreaming-manager -description: A Helm chart for Kubernetes +description: knowstreaming-manager Helm chart type: application -version: 0.1.0 +version: 0.1.3 maintainers: - email: didicloud@didiglobal.com name: didicloud -appVersion: "1.0.0" +appVersion: "3.0.0-beta.1" dependencies: - name: knowstreaming-web diff --git a/km-dist/helm/charts/ksmysql/templates/statefulset.yaml b/km-dist/helm/charts/ksmysql/templates/statefulset.yaml index d61c506a..15ab2858 100644 --- a/km-dist/helm/charts/ksmysql/templates/statefulset.yaml +++ b/km-dist/helm/charts/ksmysql/templates/statefulset.yaml @@ -21,7 +21,7 @@ spec: {{- include "ksmysql.selectorLabels" . | nindent 8 }} spec: containers: - - image: knowstreaming/knowstreaming-mysql:latest + - image: knowstreaming/knowstreaming-mysql:0.1.0 name: {{ .Chart.Name }} env: - name: MYSQL_DATABASE diff --git a/km-dist/helm/values.yaml b/km-dist/helm/values.yaml index 293bd001..036f5cc8 100644 --- a/km-dist/helm/values.yaml +++ b/km-dist/helm/values.yaml @@ -3,7 +3,7 @@ replicaCount: 2 image: repository: knowstreaming/knowstreaming-manager pullPolicy: IfNotPresent - tag: "latest" + tag: "0.1.0" imagePullSecrets: [] nameOverride: "" @@ -73,7 +73,7 @@ knowstreaming-web: image: repository: knowstreaming/knowstreaming-ui pullPolicy: IfNotPresent - tag: "latest" + tag: "0.1.0" service: type: NodePort diff --git a/km-extends/km-account/src/main/java/com/xiaojukeji/know/streaming/km/account/KmAccountConfig.java b/km-extends/km-account/src/main/java/com/xiaojukeji/know/streaming/km/account/KmAccountConfig.java index a1acc73e..c21c0b3b 100644 --- a/km-extends/km-account/src/main/java/com/xiaojukeji/know/streaming/km/account/KmAccountConfig.java +++ b/km-extends/km-account/src/main/java/com/xiaojukeji/know/streaming/km/account/KmAccountConfig.java @@ -34,7 +34,7 @@ public class KmAccountConfig { /**************************************************** Ldap 用户注册到KM相关 ****************************************************/ - @Value(value = "${account.ldap.auth-user-registration-role:0,1,2}") + @Value(value = "${account.ldap.auth-user-registration-role:1677}") private String authUserRegistrationRole; // ldap自动注册的默认角色。请注意:它通常来说都是低权限角色 @Value(value = "${account.ldap.auth-user-registration:false}") diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index 7af15333..08cac4af 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -60,9 +60,15 @@ thread-pool: suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改 task: # 任务模块的配置 - heaven: # 采集任务配置 - thread-num: 20 # 采集任务线程池核心线程数 - queue-size: 1000 # 采集任务线程池队列大小 + metrics: # metrics采集任务配置 + thread-num: 18 # metrics采集任务线程池核心线程数 + queue-size: 180 # metrics采集任务线程池队列大小 + metadata: # metadata同步任务配置 + thread-num: 27 # metadata同步任务线程池核心线程数 + queue-size: 270 # metadata同步任务线程池队列大小 + common: # 剩余其他任务配置 + thread-num: 15 # 剩余其他任务线程池核心线程数 + queue-size: 150 # 剩余其他任务线程池队列大小 # 客户端池大小相关配置 diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractAsyncCommonDispatchTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractAsyncCommonDispatchTask.java new file mode 100644 index 00000000..0f8f5797 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractAsyncCommonDispatchTask.java @@ -0,0 +1,46 @@ +package com.xiaojukeji.know.streaming.km.task; + +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * other相关任务 + */ +public abstract class AbstractAsyncCommonDispatchTask extends AbstractClusterPhyDispatchTask { + private static final ILog log = LogFactory.getLog(AbstractAsyncCommonDispatchTask.class); + + public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception; + + @Autowired + private TaskThreadPoolService taskThreadPoolService; + + @Override + protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs); + } + + public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + taskThreadPoolService.submitCommonTask( + String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()), + this.timeoutUnitSec.intValue() * 1000, + () -> { + try { + TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs); + if (TaskResult.SUCCESS_CODE != tr.getCode()) { + log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr); + } else { + log.debug("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId()); + } + } catch (Exception e) { + log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e); + } + } + ); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractDispatchTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractDispatchTask.java index 0be382fe..6fce1bb3 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractDispatchTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractDispatchTask.java @@ -130,4 +130,12 @@ public abstract class AbstractDispatchTask this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs) - ); - - return TaskResult.SUCCESS; + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); } - private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { // 获取配置,<配置名,配置信息> Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); @@ -91,6 +81,8 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask { } catch (Exception e) { log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); } + + return TaskResult.SUCCESS; } private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/CommunityReassignJobTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/CommunityReassignJobTask.java index 179aa30a..3a01e80c 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/CommunityReassignJobTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/CommunityReassignJobTask.java @@ -8,7 +8,7 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; +import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask; import org.springframework.beans.factory.annotation.Autowired; @Task(name = "CommunityReassignJobTask", @@ -17,14 +17,14 @@ import org.springframework.beans.factory.annotation.Autowired; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 6 * 60) -public class CommunityReassignJobTask extends AbstractClusterPhyDispatchTask { +public class CommunityReassignJobTask extends AbstractAsyncCommonDispatchTask { private static final ILog log = LogFactory.getLog(CommunityReassignJobTask.class); @Autowired private ReassignJobService reassignJobService; @Override - protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { // 获取迁移中的任务 Long jobId = reassignJobService.getOneRunningJobId(clusterPhy.getId()); if (jobId == null) { diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/KMJobTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/KMJobTask.java index 3d3039b8..c0b64e63 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/KMJobTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/KMJobTask.java @@ -5,7 +5,7 @@ import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.core.service.job.JobService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; +import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask; import org.springframework.beans.factory.annotation.Autowired; @Task(name = "kmJobTask", @@ -14,13 +14,13 @@ import org.springframework.beans.factory.annotation.Autowired; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 6 * 60) -public class KMJobTask extends AbstractClusterPhyDispatchTask { +public class KMJobTask extends AbstractAsyncCommonDispatchTask { @Autowired private JobService jobService; @Override - protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { jobService.scheduleJobByClusterId(clusterPhy.getId()); return TaskResult.SUCCESS; } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/AbstractAsyncMetadataDispatchTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/AbstractAsyncMetadataDispatchTask.java new file mode 100644 index 00000000..668208c7 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/AbstractAsyncMetadataDispatchTask.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.task.metadata; + +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; +import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * 元数据同步相关任务 + */ +public abstract class AbstractAsyncMetadataDispatchTask extends AbstractClusterPhyDispatchTask { + private static final ILog log = LogFactory.getLog(AbstractAsyncMetadataDispatchTask.class); + + public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception; + + @Autowired + private TaskThreadPoolService taskThreadPoolService; + + @Override + protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs); + } + + public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + taskThreadPoolService.submitMetadataTask( + String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()), + this.timeoutUnitSec.intValue() * 1000, + () -> { + try { + TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs); + if (TaskResult.SUCCESS_CODE != tr.getCode()) { + log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr); + } else { + log.debug("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId()); + } + } catch (Exception e) { + log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e); + } + } + ); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerConfigDiffTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerConfigDiffTask.java index ea286d3e..60ffbd84 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerConfigDiffTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerConfigDiffTask.java @@ -13,7 +13,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerConfigPO; import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigDiffTypeEnum; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.springframework.beans.factory.annotation.Autowired; import java.util.*; @@ -25,12 +24,12 @@ import java.util.stream.Collectors; * @date 22/02/25 */ @Task(name = "SyncBrokerConfigDiffTask", - description = "Broker配置的Diff信息同步到DB,", + description = "Broker配置的Diff信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 6 * 60) -public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask { +public class SyncBrokerConfigDiffTask extends AbstractAsyncMetadataDispatchTask { protected static final ILog log = LogFactory.getLog(SyncBrokerConfigDiffTask.class); @Autowired @@ -40,7 +39,7 @@ public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask { private BrokerConfigService brokerConfigService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { // > Map> allConfigMap = new HashMap<>(); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerTask.java index 78ff9560..a4d1dc99 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerTask.java @@ -9,25 +9,24 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; @Task(name = "SyncBrokerTask", - description = "Broker信息同步到DB,", + description = "Broker信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncBrokerTask extends AbstractClusterPhyDispatchTask { +public class SyncBrokerTask extends AbstractAsyncMetadataDispatchTask { private static final ILog log = LogFactory.getLog(SyncBrokerTask.class); @Autowired private BrokerService brokerService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { Result> brokersResult = brokerService.listBrokersFromKafka(clusterPhy); if (brokersResult.failed()) { return new TaskResult(TaskResult.FAIL_CODE, brokersResult.getMessage()); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncControllerTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncControllerTask.java index e87122a6..971ef852 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncControllerTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncControllerTask.java @@ -9,24 +9,23 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.springframework.beans.factory.annotation.Autowired; @Task(name = "SyncControllerTask", - description = "Controller信息同步到DB,", + description = "Controller信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncControllerTask extends AbstractClusterPhyDispatchTask { +public class SyncControllerTask extends AbstractAsyncMetadataDispatchTask { private static final ILog log = LogFactory.getLog(SyncControllerTask.class); @Autowired private KafkaControllerService kafkaControllerService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { Result controllerResult = kafkaControllerService.getControllerFromKafka(clusterPhy); if (controllerResult.failed()) { return new TaskResult(TaskResult.FAIL_CODE, controllerResult.getMessage()); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaAclTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaAclTask.java index 58ee3dd4..8b87d36e 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaAclTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaAclTask.java @@ -11,7 +11,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter; import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService; import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.apache.kafka.common.acl.AclBinding; import org.springframework.beans.factory.annotation.Autowired; @@ -19,12 +18,12 @@ import java.util.List; import java.util.stream.Collectors; @Task(name = "SyncKafkaAclTask", - description = "KafkaAcl信息同步到DB,", + description = "KafkaAcl信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncKafkaAclTask extends AbstractClusterPhyDispatchTask { +public class SyncKafkaAclTask extends AbstractAsyncMetadataDispatchTask { private static final ILog log = LogFactory.getLog(SyncKafkaAclTask.class); @Autowired @@ -34,7 +33,7 @@ public class SyncKafkaAclTask extends AbstractClusterPhyDispatchTask { private OpKafkaAclService opKafkaAclService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { Result> aclBindingListResult = kafkaAclService.getAclFromKafka(clusterPhy.getId()); if (aclBindingListResult.failed()) { return TaskResult.FAIL; diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java index 8861d085..65b64b96 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java @@ -12,7 +12,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; @@ -22,19 +21,19 @@ import java.util.stream.Collectors; @Task(name = "SyncKafkaGroupTask", - description = "KafkaGroup信息同步到DB,", + description = "KafkaGroup信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncKafkaGroupTask extends AbstractClusterPhyDispatchTask { +public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { private static final ILog log = LogFactory.getLog(SyncKafkaGroupTask.class); @Autowired private GroupService groupService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { TaskResult tr = TaskResult.SUCCESS; List groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId()); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaUserTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaUserTask.java index e716d4ee..2e2fd63e 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaUserTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaUserTask.java @@ -9,26 +9,25 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; import java.util.stream.Collectors; @Task(name = "SyncKafkaUserTask", - description = "KafkaUser信息同步到DB,", + description = "KafkaUser信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncKafkaUserTask extends AbstractClusterPhyDispatchTask { +public class SyncKafkaUserTask extends AbstractAsyncMetadataDispatchTask { private static final ILog log = LogFactory.getLog(SyncKafkaUserTask.class); @Autowired private KafkaUserService kafkaUserService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { // 查询KafkaUser数据 Result> kafkaUserResult = kafkaUserService.getKafkaUserFromKafka(clusterPhy.getId()); if (kafkaUserResult.failed()) { diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncPartitionTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncPartitionTask.java index b1edc024..e4b07ec3 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncPartitionTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncPartitionTask.java @@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.po.partition.PartitionPO; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; @@ -19,19 +18,19 @@ import java.util.List; import java.util.Map; @Task(name = "SyncPartitionTask", - description = "Partition信息同步到DB,", + description = "Partition信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncPartitionTask extends AbstractClusterPhyDispatchTask { +public class SyncPartitionTask extends AbstractAsyncMetadataDispatchTask { private static final ILog log = LogFactory.getLog(SyncPartitionTask.class); @Autowired private PartitionService partitionService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { Result>> partitionsResult = partitionService.listPartitionsFromKafka(clusterPhy); if (partitionsResult.failed()) { return new TaskResult(TaskResult.FAIL_CODE, partitionsResult.getMessage()); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicConfigTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicConfigTask.java index 1940c6b8..69b71d06 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicConfigTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicConfigTask.java @@ -12,7 +12,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicConfig; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; @@ -27,12 +26,12 @@ import java.util.Map; * @date 22/02/25 */ @Task(name = "SyncTopicConfigTask", - description = "Topic保存时间配置同步DB,", + description = "Topic保存时间配置同步DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncTopicConfigTask extends AbstractClusterPhyDispatchTask { +public class SyncTopicConfigTask extends AbstractAsyncMetadataDispatchTask { protected static final ILog log = LogFactory.getLog(SyncTopicConfigTask.class); @Autowired @@ -42,7 +41,7 @@ public class SyncTopicConfigTask extends AbstractClusterPhyDispatchTask { private TopicConfigService kafkaConfigService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { boolean success = true; List topicConfigList = new ArrayList<>(); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicTask.java index 2b851566..4ed42c64 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncTopicTask.java @@ -9,25 +9,24 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; @Task(name = "SyncTopicTask", - description = "Topic信息同步到DB,", + description = "Topic信息同步到DB", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class SyncTopicTask extends AbstractClusterPhyDispatchTask { +public class SyncTopicTask extends AbstractAsyncMetadataDispatchTask { private static final ILog log = LogFactory.getLog(SyncTopicTask.class); @Autowired private TopicService topicService; @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { Result> topicsResult = topicService.listTopicsFromKafka(clusterPhy); if (topicsResult.failed()) { return new TaskResult(TaskResult.FAIL_CODE, topicsResult.getMessage()); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/AbstractAsyncMetricsDispatchTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/AbstractAsyncMetricsDispatchTask.java new file mode 100644 index 00000000..705c2301 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/AbstractAsyncMetricsDispatchTask.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.task.metrics; + +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; +import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * 指标采集相关任务 + */ +public abstract class AbstractAsyncMetricsDispatchTask extends AbstractClusterPhyDispatchTask { + private static final ILog log = LogFactory.getLog(AbstractAsyncMetricsDispatchTask.class); + + public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception; + + @Autowired + private TaskThreadPoolService taskThreadPoolService; + + @Override + protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs); + } + + public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + taskThreadPoolService.submitMetricsTask( + String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()), + this.timeoutUnitSec.intValue() * 1000, + () -> { + try { + TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs); + if (TaskResult.SUCCESS_CODE != tr.getCode()) { + log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr); + } else { + log.debug("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId()); + } + } catch (Exception e) { + log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e); + } + } + ); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/BrokerMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/BrokerMetricCollectorTask.java index 486b66b0..16334246 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/BrokerMetricCollectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/BrokerMetricCollectorTask.java @@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.collector.metric.BrokerMetricCollector; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; -import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import org.springframework.beans.factory.annotation.Autowired; /** * @author didi */ @Task(name = "BrokerMetricCollectorTask", - description = "Broker指标采集任务,", + description = "Broker指标采集任务", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class BrokerMetricCollectorTask extends AbstractClusterPhyDispatchTask { +public class BrokerMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { private static final ILog log = LogFactory.getLog(BrokerMetricCollectorTask.class); @Autowired private BrokerMetricCollector brokerMetricCollector; - @Autowired - private TaskThreadPoolService taskThreadPoolService; - @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - taskThreadPoolService.submitHeavenTask( - String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), - 100000, - () -> brokerMetricCollector.collectMetrics(clusterPhy) - ); + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + brokerMetricCollector.collectMetrics(clusterPhy); return TaskResult.SUCCESS; } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ClusterMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ClusterMetricCollectorTask.java index 2b6a3b3b..4abf0372 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ClusterMetricCollectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ClusterMetricCollectorTask.java @@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.collector.metric.ClusterMetricCollector; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; -import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import org.springframework.beans.factory.annotation.Autowired; /** * @author didi */ @Task(name = "ClusterMetricCollectorTask", - description = "Cluster指标采集任务,", + description = "Cluster指标采集任务", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class ClusterMetricCollectorTask extends AbstractClusterPhyDispatchTask { +public class ClusterMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { private static final ILog log = LogFactory.getLog(ClusterMetricCollectorTask.class); @Autowired private ClusterMetricCollector clusterMetricCollector; - @Autowired - private TaskThreadPoolService taskThreadPoolService; - @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - taskThreadPoolService.submitHeavenTask( - String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), - 100000, - () -> clusterMetricCollector.collectMetrics(clusterPhy) - ); + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + clusterMetricCollector.collectMetrics(clusterPhy); return TaskResult.SUCCESS; } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/GroupMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/GroupMetricCollectorTask.java index 5ca97906..595930b8 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/GroupMetricCollectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/GroupMetricCollectorTask.java @@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.collector.metric.GroupMetricCollector; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; -import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import org.springframework.beans.factory.annotation.Autowired; /** * @author didi */ @Task(name = "GroupMetricCollectorTask", - description = "Group指标采集任务,", + description = "Group指标采集任务", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class GroupMetricCollectorTask extends AbstractClusterPhyDispatchTask { +public class GroupMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { private static final ILog log = LogFactory.getLog(GroupMetricCollectorTask.class); @Autowired private GroupMetricCollector groupMetricCollector; - @Autowired - private TaskThreadPoolService taskThreadPoolService; - @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - taskThreadPoolService.submitHeavenTask( - String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), - 100000, - () -> groupMetricCollector.collectMetrics(clusterPhy) - ); + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + groupMetricCollector.collectMetrics(clusterPhy); return TaskResult.SUCCESS; } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/PartitionMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/PartitionMetricCollectorTask.java index 47b2fc60..21f9133b 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/PartitionMetricCollectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/PartitionMetricCollectorTask.java @@ -5,34 +5,25 @@ import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; import com.xiaojukeji.know.streaming.km.collector.metric.PartitionMetricCollector; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; -import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import org.springframework.beans.factory.annotation.Autowired; /** * @author didi */ @Task(name = "PartitionMetricCollectorTask", - description = "Partition指标采集任务,", + description = "Partition指标采集任务", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class PartitionMetricCollectorTask extends AbstractClusterPhyDispatchTask { +public class PartitionMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { @Autowired private PartitionMetricCollector partitionMetricCollector; - @Autowired - private TaskThreadPoolService taskThreadPoolService; - @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - taskThreadPoolService.submitHeavenTask( - String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), - 100000, - () -> partitionMetricCollector.collectMetrics(clusterPhy) - ); + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + partitionMetricCollector.collectMetrics(clusterPhy); return TaskResult.SUCCESS; } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java index 5e7a738b..6b93e324 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java @@ -5,8 +5,6 @@ import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; -import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -15,26 +13,19 @@ import org.springframework.beans.factory.annotation.Autowired; */ @Slf4j @Task(name = "ReplicaMetricCollectorTask", - description = "Replica指标采集任务,", + description = "Replica指标采集任务", cron = "0 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class ReplicaMetricCollectorTask extends AbstractClusterPhyDispatchTask { +public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { @Autowired private ReplicaMetricCollector replicaMetricCollector; - @Autowired - private TaskThreadPoolService taskThreadPoolService; - @Override - protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - taskThreadPoolService.submitHeavenTask( - String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), - 100000, - () -> replicaMetricCollector.collectMetrics(clusterPhy) - ); + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + replicaMetricCollector.collectMetrics(clusterPhy); return TaskResult.SUCCESS; } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/TopicMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/TopicMetricCollectorTask.java index 9c3b7863..9f8a5de7 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/TopicMetricCollectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/TopicMetricCollectorTask.java @@ -7,8 +7,6 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.collector.metric.TopicMetricCollector; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; -import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import org.springframework.beans.factory.annotation.Autowired; /** @@ -20,22 +18,15 @@ import org.springframework.beans.factory.annotation.Autowired; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class TopicMetricCollectorTask extends AbstractClusterPhyDispatchTask { +public class TopicMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { private static final ILog log = LogFactory.getLog(TopicMetricCollectorTask.class); @Autowired private TopicMetricCollector topicMetricCollector; - @Autowired - private TaskThreadPoolService taskThreadPoolService; - @Override - public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - taskThreadPoolService.submitHeavenTask( - String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), - 100000, - () -> topicMetricCollector.collectMetrics(clusterPhy) - ); + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + topicMetricCollector.collectMetrics(clusterPhy); return TaskResult.SUCCESS; } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java index 884a572d..f4bb6826 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java @@ -16,23 +16,72 @@ import javax.annotation.PostConstruct; @NoArgsConstructor public class TaskThreadPoolService { /** - * 较重任务,比如指标采集 + * metrics任务,比如指标采集 */ - private FutureNoWaitUtil heavenTaskThreadPool; + private FutureNoWaitUtil metricsTaskThreadPool; + + @Value(value = "${thread-pool.task.metrics.thread-num:18}") + private Integer metricsTaskThreadNum; + + @Value(value = "${thread-pool.task.metrics.queue-size:180}") + private Integer metricsTaskQueueSize; - @Value(value = "${thread-pool.task.heaven.thread-num:12}") - private Integer heavenTaskThreadNum; + /** + * metadata任务 + */ + private FutureNoWaitUtil metadataTaskThreadPool; - @Value(value = "${thread-pool.task.heaven.queue-size:1000}") - private Integer heavenTaskQueueSize; + @Value(value = "${thread-pool.task.metadata.thread-num:27}") + private Integer metadataTaskThreadNum; + + @Value(value = "${thread-pool.task.metadata.queue-size:270}") + private Integer metadataTaskQueueSize; + + /** + * common任务 + */ + private FutureNoWaitUtil commonTaskThreadPool; + + @Value(value = "${thread-pool.task.common.thread-num:15}") + private Integer commonTaskThreadNum; + + @Value(value = "${thread-pool.task.common.queue-size:150}") + private Integer commonTaskQueueSize; @PostConstruct private void init() { - heavenTaskThreadPool = FutureNoWaitUtil.init("heavenTaskThreadPool", heavenTaskThreadNum, heavenTaskThreadNum, heavenTaskQueueSize); + metricsTaskThreadPool = FutureNoWaitUtil.init( + "metricsTaskThreadPool", + metricsTaskThreadNum, + metricsTaskThreadNum, + metricsTaskQueueSize + ); + + metadataTaskThreadPool = FutureNoWaitUtil.init( + "metadataTaskThreadPool", + metadataTaskThreadNum, + metadataTaskThreadNum, + metadataTaskQueueSize + ); + + commonTaskThreadPool = FutureNoWaitUtil.init( + "commonTaskThreadPool", + commonTaskThreadNum, + commonTaskThreadNum, + commonTaskQueueSize + ); } - public void submitHeavenTask(String taskName, Integer timeoutUnisMs, Runnable runnable) { - heavenTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable); + public void submitMetricsTask(String taskName, Integer timeoutUnisMs, Runnable runnable) { + metricsTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable); + } + + public void submitMetadataTask(String taskName, Integer timeoutUnisMs, Runnable runnable) { + metadataTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable); + } + + public void submitCommonTask(String taskName, Integer timeoutUnisMs, Runnable runnable) { + commonTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable); } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java new file mode 100644 index 00000000..b0886754 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java @@ -0,0 +1,76 @@ +package com.xiaojukeji.know.streaming.km.task.service.listener; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; +import com.xiaojukeji.know.streaming.km.task.metadata.AbstractAsyncMetadataDispatchTask; +import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; + +@Service +public class TaskClusterAddedListener implements ApplicationListener { + private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class); + + @Override + public void onApplicationEvent(ClusterPhyAddedEvent event) { + LOGGER.info("class=TaskClusterAddedListener||method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId()); + Long now = System.currentTimeMillis(); + + // 交由KS自定义的线程池,异步执行任务 + FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now)); + } + + private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) { + ClusterPhy tempClusterPhy = null; + + // 120秒内无加载进来,则直接返回退出 + while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) { + tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); + if (tempClusterPhy != null) { + break; + } + + BackoffUtils.backoff(1000); + } + + if (tempClusterPhy == null) { + return; + } + + // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定 + BackoffUtils.backoff(5000); + final ClusterPhy clusterPhy = tempClusterPhy; + + // 集群执行集群元信息同步 + List metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values()); + for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) { + try { + dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); + } catch (Exception e) { + // ignore + } + } + + // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定 + BackoffUtils.backoff(5000); + + // 集群集群指标采集 + List metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values()); + for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) { + try { + dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); + } catch (Exception e) { + // ignore + } + } + } +}