package com.xiaojukeji.know.streaming.km.task.connect;

import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.task.AbstractDispatchTask;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * Base dispatch task that fans a job out across every Connect cluster:
 * {@link #listAllTasks()} supplies the clusters, and {@link #processTask}
 * runs the subclass-supplied {@link #processSubTask} once per cluster.
 *
 * @author wyb
 * @date 2022/11/7
 */
public abstract class AbstractConnectClusterDispatchTask extends AbstractDispatchTask<ConnectCluster> {
    private static final ILog log = LogFactory.getLog(AbstractConnectClusterDispatchTask.class);

    @Autowired
    private ConnectClusterService connectClusterService;

    /**
     * Process a single Connect cluster.
     *
     * @param connectCluster    the cluster to process
     * @param triggerTimeUnitMs trigger time of this dispatch, in milliseconds
     * @return result of the sub-task
     * @throws Exception any failure; the dispatcher logs it and marks the run failed
     */
    protected abstract TaskResult processSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception;

    @Override
    protected List<ConnectCluster> listAllTasks() {
        return connectClusterService.listAllClusters();
    }

    @Override
    protected TaskResult processTask(List<ConnectCluster> subTaskList, long triggerTimeUnitMs) {
        boolean allSuccess = true;
        for (ConnectCluster elem : subTaskList) {
            try {
                log.debug("method=processTask||taskName={}||connectClusterId={}||msg=start", this.taskName, elem.getId());

                TaskResult tr = this.processSubTask(elem, triggerTimeUnitMs);
                if (TaskResult.SUCCESS.getCode() != tr.getCode()) {
                    log.warn("method=processTask||taskName={}||connectClusterId={}||msg=process failed", this.taskName, elem.getId());

                    allSuccess = false;
                    continue;
                }

                log.debug("method=processTask||taskName={}||connectClusterId={}||msg=finished", this.taskName, elem.getId());
            } catch (Exception e) {
                log.error("method=processTask||taskName={}||connectClusterId={}||errMsg=throw exception", this.taskName, elem.getId(), e);

                // Bugfix: previously an exception left allSuccess untouched, so a
                // throwing sub-task still reported overall SUCCESS.
                allSuccess = false;
            }
        }

        return allSuccess ? TaskResult.SUCCESS : TaskResult.FAIL;
    }
}
package com.xiaojukeji.know.streaming.km.task.connect.health;

import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
import com.xiaojukeji.know.streaming.km.task.connect.metrics.AbstractAsyncMetricsDispatchTask;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.*;

/**
 * Base class for Connect health-check tasks: loads the cluster's health-check
 * configs, runs the dimension-specific checker over every resource of that
 * dimension on a shared thread pool, and replaces the stored results in DB.
 *
 * @author wyb
 * @date 2022/11/8
 */
public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
    private static final ILog log = LogFactory.getLog(AbstractHealthCheckTask.class);

    @Autowired
    private HealthCheckResultService healthCheckResultService;

    @Autowired
    private CollectThreadPoolService collectThreadPoolService;

    /** @return the dimension-specific check service this task drives */
    public abstract AbstractHealthCheckService getCheckService();

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        return this.calAndUpdateHealthCheckResult(connectCluster, triggerTimeUnitMs);
    }

    /**
     * Runs all configured checks for one cluster and persists the results.
     * Always returns SUCCESS: a persistence failure is logged, not propagated.
     */
    private TaskResult calAndUpdateHealthCheckResult(ConnectCluster connectCluster, long triggerTimeUnitMs) {
        // Health-check configs for this cluster, keyed by config name. (获取配置,<配置名,配置信息>)
        Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(connectCluster.getId());

        // Resources of this dimension to be checked. (获取资源列表)
        List<ClusterParam> paramList = this.getCheckService().getResList(connectCluster.getId());

        // Check results; synchronized because pooled tasks below append concurrently.
        List<HealthCheckResult> checkResultList = Collections.synchronizedList(new ArrayList<>());
        if (ValidateUtils.isEmptyList(paramList)) {
            // No resource of this dimension: record passed placeholder results.
            checkResultList.addAll(this.getNoResResult(connectCluster.getId(), this.getCheckService(), healthConfigMap));
        }

        // Pick a suitable thread pool for this cluster.
        FutureWaitUtil<Void> futureWaitUtil = collectThreadPoolService.selectSuitableFutureUtil(connectCluster.getId());

        // Submit one check task per resource. (遍历资源)
        for (ClusterParam clusterParam : paramList) {
            futureWaitUtil.runnableTask(
                    String.format("class=%s||method=calAndUpdateHealthCheckResult||clusterId=%d||resData=%s", this.getCheckService().getClass().getSimpleName(), connectCluster.getId(), clusterParam),
                    30000,
                    () -> checkResultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap))
            );
        }

        futureWaitUtil.waitExecute(30000);

        try {
            healthCheckResultService.batchReplace(connectCluster.getId(), this.getCheckService().getHealthCheckDimensionEnum().getDimension(), checkResultList);
        } catch (Exception e) {
            // Bugfix: the template previously started with "class=%s" (String.format
            // style) inside an ILog "{}" template, so the class name was never
            // substituted and the remaining arguments were shifted.
            log.error(
                    "class={}||method=calAndUpdateHealthCheckResult||clusterId={}||errMsg=exception!",
                    this.getCheckService().getClass().getSimpleName(), connectCluster.getId(), e
            );
        }

        return TaskResult.SUCCESS;
    }

    /**
     * Builds "passed" placeholder results (resource id "-1") for every config of
     * this checker's dimension, used when the cluster has no such resource.
     */
    private List<HealthCheckResult> getNoResResult(Long connectClusterId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
        List<HealthCheckResult> resultList = new ArrayList<>();

        for (BaseClusterHealthConfig clusterHealthConfig : healthConfigMap.values()) {
            HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum();
            if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) {
                // Config belongs to another dimension; skip. (类型不匹配)
                continue;
            }

            HealthCheckResult checkResult = new HealthCheckResult(
                    dimensionEnum.getDimension(),
                    clusterHealthConfig.getCheckNameEnum().getConfigName(),
                    connectClusterId,
                    "-1"
            );
            checkResult.setPassed(Constant.YES);
            resultList.add(checkResult);
        }

        return resultList;
    }

    /**
     * Runs every configured check against one resource; null results (check not
     * applicable) are dropped.
     */
    private List<HealthCheckResult> checkAndGetResult(ClusterParam clusterParam,
                                                      Map<String, BaseClusterHealthConfig> healthConfigMap) {
        List<HealthCheckResult> resultList = new ArrayList<>();

        for (BaseClusterHealthConfig clusterHealthConfig : healthConfigMap.values()) {
            HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterParam, clusterHealthConfig);
            if (healthCheckResult == null) {
                continue;
            }

            resultList.add(healthCheckResult);
        }

        return resultList;
    }
}
b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/health/ConnectClusterHealthCheckTask.java @@ -0,0 +1,32 @@ +package com.xiaojukeji.know.streaming.km.task.connect.health; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.HealthCheckConnectClusterService; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author wyc + * @date 2022/11/9 + */ +@NoArgsConstructor +@AllArgsConstructor +@Task(name = "ConnectClusterHealthCheckTask", + description = "ConnectCluster健康检查", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class ConnectClusterHealthCheckTask extends AbstractHealthCheckTask { + + @Autowired + private HealthCheckConnectClusterService healthCheckConnectClusterService; + + @Override + public AbstractHealthCheckService getCheckService() { + return healthCheckConnectClusterService; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/health/ConnectorHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/health/ConnectorHealthCheckTask.java new file mode 100644 index 00000000..eebd201b --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/health/ConnectorHealthCheckTask.java @@ -0,0 +1,32 @@ +package com.xiaojukeji.know.streaming.km.task.connect.health; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; +import 
com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.HealthCheckConnectorService; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author wyb + * @date 2022/11/8 + */ +@NoArgsConstructor +@AllArgsConstructor +@Task(name = "ConnectorHealthCheckTask", + description = "Connector健康检查", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class ConnectorHealthCheckTask extends AbstractHealthCheckTask { + + @Autowired + HealthCheckConnectorService healthCheckConnectorService; + + @Override + public AbstractHealthCheckService getCheckService() { + return healthCheckConnectorService; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/AbstractAsyncMetadataDispatchTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/AbstractAsyncMetadataDispatchTask.java new file mode 100644 index 00000000..fa46be8e --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/AbstractAsyncMetadataDispatchTask.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.task.connect.metadata; + +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.task.connect.AbstractConnectClusterDispatchTask; +import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * 元数据同步相关任务 + */ +public abstract class AbstractAsyncMetadataDispatchTask extends AbstractConnectClusterDispatchTask { + private static final ILog log = LogFactory.getLog(AbstractAsyncMetadataDispatchTask.class); + + public abstract TaskResult 
processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception; + + @Autowired + private TaskThreadPoolService taskThreadPoolService; + + @Override + protected TaskResult processSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception { + return this.asyncProcessSubTask(connectCluster, triggerTimeUnitMs); + } + + public TaskResult asyncProcessSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) { + taskThreadPoolService.submitMetadataTask( + String.format("taskName=%s||clusterPhyId=%d", this.taskName, connectCluster.getId()), + this.timeoutUnitSec.intValue() * 1000, + () -> { + try { + TaskResult tr = this.processClusterTask(connectCluster, triggerTimeUnitMs); + if (TaskResult.SUCCESS_CODE != tr.getCode()) { + log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||connectClusterId={}||taskResult={}||msg=failed", this.taskName, connectCluster.getId(), tr); + } else { + log.debug("class=AbstractAsyncMetadataDispatchTask||taskName={}||connectClusterId={}||msg=success", this.taskName, connectCluster.getId()); + } + } catch (Exception e) { + log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||connectClusterId={}||errMsg=exception", this.taskName, connectCluster.getId(), e); + } + } + ); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java new file mode 100644 index 00000000..4d45e7c2 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java @@ -0,0 +1,60 @@ +package com.xiaojukeji.know.streaming.km.task.connect.metadata; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.didiglobal.logi.log.ILog; +import 
package com.xiaojukeji.know.streaming.km.task.connect.metadata;

import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

/**
 * Pulls the connector list from each Connect cluster, fetches the full info of
 * every connector, and replaces the snapshot in DB.
 */
@Task(name = "SyncConnectorTask",
        description = "Connector信息同步到DB",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask {
    private static final ILog LOGGER = LogFactory.getLog(SyncConnectorTask.class);

    @Autowired
    private ConnectorService connectorService;

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) {
        // Names of the connectors currently on the cluster.
        Result<List<String>> connectorNamesResult = connectorService.listConnectorsFromCluster(connectCluster.getId());
        if (connectorNamesResult.failed()) {
            return TaskResult.FAIL;
        }

        boolean success = true;
        List<KSConnector> fetchedConnectors = new ArrayList<>();

        for (String connectorName : connectorNamesResult.getData()) {
            Result<KSConnector> connectorResult = connectorService.getAllConnectorInfoFromCluster(connectCluster.getId(), connectorName);
            if (connectorResult.failed()) {
                // A single failed connector does not stop the sync; it only
                // downgrades the overall task result.
                LOGGER.error(
                        "class=SyncConnectorTask||method=processClusterTask||connectClusterId={}||connectorName={}||result={}",
                        connectCluster.getId(), connectorName, connectorResult
                );

                success = false;
                continue;
            }

            fetchedConnectors.add(connectorResult.getData());
        }

        // Replace the DB snapshot; the name set lets the service drop stale rows.
        connectorService.batchReplace(connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), fetchedConnectors, new HashSet<>(connectorNamesResult.getData()));

        return success ? TaskResult.SUCCESS : TaskResult.FAIL;
    }
}
package com.xiaojukeji.know.streaming.km.task.connect.metadata;

import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.List;

/**
 * Syncs worker-connector assignments: for every connector already known in DB,
 * fetches its worker placement from the cluster and replaces the DB snapshot.
 *
 * @author wyb
 * @date 2022/11/14
 */
@Task(name = "SyncWorkerConnectorTask",
        description = "WorkerConnector信息同步到DB",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class SyncWorkerConnectorTask extends AbstractAsyncMetadataDispatchTask {

    private static final ILog LOGGER = LogFactory.getLog(SyncWorkerConnectorTask.class);

    // Consistency fix: field was package-private; the other injected services
    // in this class are private.
    @Autowired
    private ConnectorService connectorService;

    @Autowired
    private GroupService groupService;

    @Autowired
    private WorkerConnectorService workerConnectorService;

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        List<ConnectorPO> connectorPOList = connectorService.listByConnectClusterIdFromDB(connectCluster.getId());
        if (connectorPOList.isEmpty()) {
            // Nothing known in DB yet for this cluster -> nothing to sync.
            return TaskResult.SUCCESS;
        }

        // 获取集群信息
        ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(connectCluster.getKafkaClusterPhyId());
        if (clusterPhy == null) {
            LOGGER.error(
                    "class=SyncWorkerConnectorTask||method=processClusterTask||connectClusterId={}||clusterPhyId={}||errMsg=clusterPhy not exist!.",
                    connectCluster.getId(), connectCluster.getKafkaClusterPhyId()
            );
            return TaskResult.FAIL;

        }
        // NOTE(review): ksGroupDescription is only used in the error log below —
        // confirm the remote call is still wanted when nothing fails.
        KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, connectCluster.getGroupName());

        // 获取workerConnector列表
        try {
            List<WorkerConnector> workerConnectorList = new ArrayList<>();
            for (ConnectorPO connectorPO : connectorPOList) {
                workerConnectorList.addAll(workerConnectorService.getWorkerConnectorListFromCluster(connectCluster, connectorPO.getConnectorName()));
            }
            workerConnectorService.batchReplaceInDB(connectCluster.getId(), workerConnectorList);
        } catch (Exception e) {
            LOGGER.error(
                    "class=SyncWorkerConnectorTask||method=processClusterTask||connectClusterId={}||ksGroupDescription={}||errMsg=exception.",
                    connectCluster.getId(), ksGroupDescription, e
            );

            return TaskResult.FAIL;
        }

        return TaskResult.SUCCESS;
    }
}
a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/AbstractAsyncMetricsDispatchTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/AbstractAsyncMetricsDispatchTask.java new file mode 100644 index 00000000..72fa1430 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/AbstractAsyncMetricsDispatchTask.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.task.connect.metrics; + +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.task.connect.AbstractConnectClusterDispatchTask; +import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Metrics相关任务 + */ +public abstract class AbstractAsyncMetricsDispatchTask extends AbstractConnectClusterDispatchTask { + private static final ILog log = LogFactory.getLog(AbstractAsyncMetricsDispatchTask.class); + + public abstract TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception; + + @Autowired + private TaskThreadPoolService taskThreadPoolService; + + @Override + protected TaskResult processSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception { + return this.asyncProcessSubTask(connectCluster, triggerTimeUnitMs); + } + + public TaskResult asyncProcessSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) { + taskThreadPoolService.submitMetricsTask( + String.format("taskName=%s||clusterPhyId=%d", this.taskName, connectCluster.getId()), + this.timeoutUnitSec.intValue() * 1000, + () -> { + try { + TaskResult tr = this.processClusterTask(connectCluster, triggerTimeUnitMs); + if (TaskResult.SUCCESS_CODE != tr.getCode()) { + 
log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||connectClusterId={}||taskResult={}||msg=failed", this.taskName, connectCluster.getId(), tr); + } else { + log.debug("class=AbstractAsyncMetricsDispatchTask||taskName={}||connectClusterId={}||msg=success", this.taskName, connectCluster.getId()); + } + } catch (Exception e) { + log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||connectClusterId={}||errMsg=exception", this.taskName, connectCluster.getId(), e); + } + } + ); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/ConnectClusterMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/ConnectClusterMetricCollectorTask.java new file mode 100644 index 00000000..1fa1672c --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/ConnectClusterMetricCollectorTask.java @@ -0,0 +1,31 @@ +package com.xiaojukeji.know.streaming.km.task.connect.metrics; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.xiaojukeji.know.streaming.km.collector.metric.connect.ConnectClusterMetricCollector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author wyb + * @date 2022/11/7 + */ +@Task(name = "ConnectClusterMetricCollectorTask", + description = "ConnectCluster指标采集任务", + cron = "0 0/1 * * * ? 
*", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class ConnectClusterMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { + + @Autowired + private ConnectClusterMetricCollector connectClusterMetricCollector; + + @Override + public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception { + connectClusterMetricCollector.collectMetrics(connectCluster); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/ConnectorMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/ConnectorMetricCollectorTask.java new file mode 100644 index 00000000..d3422f6f --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metrics/ConnectorMetricCollectorTask.java @@ -0,0 +1,31 @@ +package com.xiaojukeji.know.streaming.km.task.connect.metrics; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.xiaojukeji.know.streaming.km.collector.metric.connect.ConnectConnectorMetricCollector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author wyb + * @date 2022/11/7 + */ +@Task(name = "ConnectorMetricCollectorTask", + description = "Connector指标采集任务", + cron = "0 0/1 * * * ? 
*", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class ConnectorMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { + + @Autowired + private ConnectConnectorMetricCollector connectConnectorMetricCollector; + + @Override + public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception { + connectConnectorMetricCollector.collectMetrics(connectCluster); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncConnectClusterAndWorkerTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncConnectClusterAndWorkerTask.java new file mode 100644 index 00000000..76fb2b99 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncConnectClusterAndWorkerTask.java @@ -0,0 +1,166 @@ +package com.xiaojukeji.know.streaming.km.task.kafka.metadata; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConnectAssignment; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; 
package com.xiaojukeji.know.streaming.km.task.kafka.metadata;

import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConnectAssignment;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum.CONNECT_CLUSTER_PROTOCOL_TYPE;

/**
 * Discovers Connect clusters via their Kafka consumer groups and syncs both the
 * cluster metadata and its worker members into DB.
 */
// NOTE(review): registered task name "SyncClusterAndWorkerTask" does not match
// the class name; left unchanged because it is the scheduler-visible identity.
@Task(name = "SyncClusterAndWorkerTask",
        description = "Connect-Cluster&Worker信息同步到DB",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class SyncConnectClusterAndWorkerTask extends AbstractAsyncMetadataDispatchTask {
    private static final ILog LOGGER = LogFactory.getLog(SyncConnectClusterAndWorkerTask.class);

    @Autowired
    private GroupService groupService;

    @Autowired
    private WorkerService workerService;

    @Autowired
    private WorkerConnectorService workerConnectorService;

    @Autowired
    private ConnectClusterService connectClusterService;

    @Override
    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
        boolean allSuccess = true;

        // Groups of the Connect-cluster type are candidate Connect clusters. (获取connect集群)
        List<Group> groupList = groupService.listClusterGroups(clusterPhy.getId()).stream().filter(elem -> elem.getType() == GroupTypeEnum.CONNECT_CLUSTER).collect(Collectors.toList());
        for (Group group : groupList) {

            try {
                KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, group.getName());
                if (!ksGroupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {
                    // Not actually a Connect group; skip.
                    continue;
                }

                Result<Long> rl = this.handleConnectClusterMetadata(clusterPhy.getId(), group.getName(), ksGroupDescription);
                if (rl.failed()) {
                    allSuccess = false;
                    continue;
                }

                Result<Void> rv = this.handleWorkerMetadata(rl.getData(), ksGroupDescription);
                if (rv.failed()) {
                    allSuccess = false;
                }

            } catch (Exception e) {
                // Bugfix: log tag said class=SyncClusterAndWorkerTask, which is not
                // this class's name.
                LOGGER.error(
                        "class=SyncConnectClusterAndWorkerTask||method=processClusterTask||clusterPhyId={}||groupName={}||errMsg=exception.",
                        clusterPhy.getId(), group.getName(), e
                );

                allSuccess = false;
            }
        }

        return allSuccess ? TaskResult.SUCCESS : TaskResult.FAIL;
    }

    /**
     * Replaces the worker rows of one Connect cluster from the group's member
     * descriptions. Members without an assignment get empty url/leader fields.
     */
    private Result<Void> handleWorkerMetadata(Long connectClusterId, KSGroupDescription ksGroupDescription) {
        try {
            List<ConnectWorker> workerList = new ArrayList<>();
            ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId);

            for (KSMemberDescription memberDescription : ksGroupDescription.members()) {
                KSMemberConnectAssignment assignment = (KSMemberConnectAssignment) memberDescription.assignment();
                if (assignment != null) {
                    workerList.add(new ConnectWorker(
                            connectCluster.getKafkaClusterPhyId(),
                            connectClusterId,
                            memberDescription.consumerId(),
                            // substring(1) — presumably strips a leading "/" from the host; TODO confirm
                            memberDescription.host().substring(1),
                            Constant.INVALID_CODE,
                            assignment.getWorkerState().url(),
                            assignment.getAssignment().leaderUrl(),
                            memberDescription.consumerId().equals(assignment.getAssignment().leader()) ? Constant.YES : Constant.NO
                    ));
                } else {
                    workerList.add(new ConnectWorker(
                            connectCluster.getKafkaClusterPhyId(),
                            connectClusterId,
                            memberDescription.consumerId(),
                            memberDescription.host().substring(1),
                            Constant.INVALID_CODE,
                            "",
                            "",
                            Constant.NO
                    ));
                }
            }

            workerService.batchReplaceInDB(connectClusterId, workerList);
        } catch (Exception e) {
            // Bugfix: log tag said class=SyncClusterAndWorkerTask.
            LOGGER.error(
                    "class=SyncConnectClusterAndWorkerTask||method=handleWorkerMetadata||connectClusterId={}||ksGroupDescription={}||errMsg=exception.",
                    connectClusterId, ksGroupDescription, e
            );

            return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
        }

        return Result.buildSuc();
    }

    /**
     * Upserts the Connect-cluster row derived from the group description and
     * returns its DB id. Only the first member is inspected — presumably every
     * member reports the same leader url (TODO confirm); an empty group yields
     * a KAFKA_OPERATE_FAILED result.
     */
    private Result<Long> handleConnectClusterMetadata(Long clusterPhyId, String groupName, KSGroupDescription ksGroupDescription) {
        try {
            for (KSMemberDescription memberDescription : ksGroupDescription.members()) {
                KSMemberConnectAssignment assignment = (KSMemberConnectAssignment) memberDescription.assignment();

                ConnectClusterMetadata metadata = new ConnectClusterMetadata(
                        clusterPhyId,
                        groupName,
                        GroupStateEnum.getByRawState(ksGroupDescription.state()),
                        assignment == null ? "" : assignment.getAssignment().leaderUrl()
                );

                Long connectClusterId = connectClusterService.replaceAndReturnIdInDB(metadata);

                return Result.buildSuc(connectClusterId);
            }
        } catch (Exception e) {
            // Bugfix: log tag said class=SyncClusterAndWorkerTask.
            LOGGER.error(
                    "class=SyncConnectClusterAndWorkerTask||method=handleConnectClusterMetadata||clusterPhyId={}||groupName={}||ksGroupDescription={}||errMsg=exception.",
                    clusterPhyId, groupName, ksGroupDescription, e
            );

            return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
        }

        return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, "消费组无成员");
    }
}