增加Connect相关任务

This commit is contained in:
zengqiao
2022-12-06 19:42:04 +08:00
committed by EricZeng
parent 186dcd07e0
commit 6aefc16fa0
11 changed files with 704 additions and 0 deletions

View File

@@ -0,0 +1,51 @@
package com.xiaojukeji.know.streaming.km.task.connect;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.task.AbstractDispatchTask;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
/**
 * Base class for tasks dispatched per Connect cluster: lists all Connect
 * clusters and lets subclasses process each cluster as one sub-task.
 *
 * @author wyb
 * @date 2022/11/7
 */
public abstract class AbstractConnectClusterDispatchTask extends AbstractDispatchTask<ConnectCluster> {
    private static final ILog log = LogFactory.getLog(AbstractConnectClusterDispatchTask.class);

    @Autowired
    private ConnectClusterService connectClusterService;

    /**
     * Process a single Connect cluster.
     *
     * @param connectCluster    the cluster to process
     * @param triggerTimeUnitMs trigger time of this run, in milliseconds
     * @return result of the sub-task
     * @throws Exception if the sub-task fails unexpectedly
     */
    protected abstract TaskResult processSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception;

    @Override
    protected List<ConnectCluster> listAllTasks() {
        // every known Connect cluster becomes one sub-task
        return connectClusterService.listAllClusters();
    }

    @Override
    protected TaskResult processTask(List<ConnectCluster> subTaskList, long triggerTimeUnitMs) {
        boolean allSuccess = true;
        for (ConnectCluster elem : subTaskList) {
            try {
                log.debug("method=processTask||taskName={}||connectClusterId={}||msg=start", this.taskName, elem.getId());

                TaskResult tr = this.processSubTask(elem, triggerTimeUnitMs);
                if (TaskResult.SUCCESS.getCode() != tr.getCode()) {
                    log.warn("method=processTask||taskName={}||connectClusterId={}||msg=process failed", this.taskName, elem.getId());
                    allSuccess = false;
                    continue;
                }

                log.debug("method=processTask||taskName={}||connectClusterId={}||msg=finished", this.taskName, elem.getId());
            } catch (Exception e) {
                log.error("method=processTask||taskName={}||connectClusterId={}||errMsg=throw exception", this.taskName, elem.getId(), e);

                // Fix: an exception previously left allSuccess untouched, so a
                // throwing sub-task could still report overall SUCCESS.
                allSuccess = false;
            }
        }
        return allSuccess ? TaskResult.SUCCESS : TaskResult.FAIL;
    }
}

View File

@@ -0,0 +1,124 @@
package com.xiaojukeji.know.streaming.km.task.connect.health;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
import com.xiaojukeji.know.streaming.km.task.connect.metrics.AbstractAsyncMetricsDispatchTask;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
 * Base class for Connect health-check tasks: runs every configured health
 * check of one dimension against a Connect cluster's resources and persists
 * the results.
 *
 * @author wyb
 * @date 2022/11/8
 */
public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
    private static final ILog log = LogFactory.getLog(AbstractHealthCheckTask.class);

    // Max wait for one health-check batch, in milliseconds (was an inline 30000)
    private static final int CHECK_TIMEOUT_MS = 30000;

    @Autowired
    private HealthCheckResultService healthCheckResultService;

    @Autowired
    private CollectThreadPoolService collectThreadPoolService;

    /**
     * @return the dimension-specific health-check service driving this task
     */
    public abstract AbstractHealthCheckService getCheckService();

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        return this.calAndUpdateHealthCheckResult(connectCluster, triggerTimeUnitMs);
    }

    /**
     * Checks every resource of this dimension against the configured health
     * rules and replaces the stored results for the cluster.
     */
    private TaskResult calAndUpdateHealthCheckResult(ConnectCluster connectCluster, long triggerTimeUnitMs){
        // health-check configs, keyed by config name
        Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(connectCluster.getId());

        // resources belonging to this check's dimension
        List<ClusterParam> paramList = this.getCheckService().getResList(connectCluster.getId());

        // results are appended concurrently by the pooled check tasks
        List<HealthCheckResult> checkResultList = Collections.synchronizedList(new ArrayList<>());

        if (ValidateUtils.isEmptyList(paramList)) {
            // No resource of this dimension: record pass results directly.
            // Fix: the original fell through and iterated paramList afterwards,
            // which would NPE when getResList returned null.
            checkResultList.addAll(this.getNoResResult(connectCluster.getId(), this.getCheckService(), healthConfigMap));
        } else {
            // pick a suitable thread pool for this cluster
            FutureWaitUtil<Void> futureWaitUtil = collectThreadPoolService.selectSuitableFutureUtil(connectCluster.getId());

            // check every resource asynchronously
            for (ClusterParam clusterParam: paramList) {
                futureWaitUtil.runnableTask(
                        String.format("class=%s||method=calAndUpdateHealthCheckResult||clusterId=%d||resData=%s", this.getCheckService().getClass().getSimpleName(), connectCluster.getId(), clusterParam),
                        CHECK_TIMEOUT_MS,
                        () -> checkResultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap))
                );
            }

            futureWaitUtil.waitExecute(CHECK_TIMEOUT_MS);
        }

        try {
            healthCheckResultService.batchReplace(connectCluster.getId(), this.getCheckService().getHealthCheckDimensionEnum().getDimension(), checkResultList);
        } catch (Exception e) {
            // Fix: the original format string mixed %s with {} placeholders
            // ("class=%s..."), so the class name was never substituted.
            log.error(
                    "class={}||method=calAndUpdateHealthCheckResult||clusterId={}||errMsg=exception!",
                    this.getCheckService().getClass().getSimpleName(), connectCluster.getId(), e
            );
        }

        // persistence failure is best-effort: the task itself still reports success
        return TaskResult.SUCCESS;
    }

    /**
     * Builds "passed" results for every config of this dimension when the
     * cluster has no resource of that dimension at all.
     */
    private List<HealthCheckResult> getNoResResult(Long connectClusterId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
        List<HealthCheckResult> resultList = new ArrayList<>();
        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
            HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum();
            if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) {
                // config belongs to a different dimension
                continue;
            }

            // "-1" marks the virtual resource id used when no resource exists
            HealthCheckResult checkResult = new HealthCheckResult(
                    dimensionEnum.getDimension(),
                    clusterHealthConfig.getCheckNameEnum().getConfigName(),
                    connectClusterId,
                    "-1"
            );
            checkResult.setPassed(Constant.YES);
            resultList.add(checkResult);
        }
        return resultList;
    }

    /**
     * Runs every matching health rule against one resource and collects the
     * non-null results.
     */
    private List<HealthCheckResult> checkAndGetResult(ClusterParam clusterParam,
                                                      Map<String, BaseClusterHealthConfig> healthConfigMap) {
        List<HealthCheckResult> resultList = new ArrayList<>();
        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
            HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterParam, clusterHealthConfig);
            if (healthCheckResult == null) {
                // rule not applicable to this resource
                continue;
            }
            resultList.add(healthCheckResult);
        }
        return resultList;
    }
}

View File

@@ -0,0 +1,32 @@
package com.xiaojukeji.know.streaming.km.task.connect.health;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.HealthCheckConnectClusterService;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Broadcast task that runs the ConnectCluster-dimension health checks once a
 * minute on every Connect cluster.
 *
 * @author wyc
 * @date 2022/11/9
 */
@NoArgsConstructor
@AllArgsConstructor
@Task(name = "ConnectClusterHealthCheckTask",
        description = "ConnectCluster健康检查",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class ConnectClusterHealthCheckTask extends AbstractHealthCheckTask {
    @Autowired
    private HealthCheckConnectClusterService connectClusterCheckService;

    /** The cluster-dimension checker backs this task. */
    @Override
    public AbstractHealthCheckService getCheckService() {
        return connectClusterCheckService;
    }
}

View File

@@ -0,0 +1,32 @@
package com.xiaojukeji.know.streaming.km.task.connect.health;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.HealthCheckConnectorService;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Broadcast task that runs the Connector-dimension health checks once a
 * minute on every Connect cluster.
 *
 * @author wyb
 * @date 2022/11/8
 */
@NoArgsConstructor
@AllArgsConstructor
@Task(name = "ConnectorHealthCheckTask",
        description = "Connector健康检查",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class ConnectorHealthCheckTask extends AbstractHealthCheckTask {
    // Fix: field made private for consistency with the sibling task classes
    @Autowired
    private HealthCheckConnectorService healthCheckConnectorService;

    /** The connector-dimension checker backs this task. */
    @Override
    public AbstractHealthCheckService getCheckService() {
        return healthCheckConnectorService;
    }
}

View File

@@ -0,0 +1,47 @@
package com.xiaojukeji.know.streaming.km.task.connect.metadata;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.task.connect.AbstractConnectClusterDispatchTask;
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Base class for metadata-sync tasks: each Connect-cluster sub-task is handed
 * to the shared metadata thread pool and processed asynchronously.
 */
public abstract class AbstractAsyncMetadataDispatchTask extends AbstractConnectClusterDispatchTask {
    private static final ILog log = LogFactory.getLog(AbstractAsyncMetadataDispatchTask.class);

    @Autowired
    private TaskThreadPoolService taskThreadPoolService;

    /**
     * Process one Connect cluster; executed inside the metadata thread pool.
     *
     * @param connectCluster    the cluster to process
     * @param triggerTimeUnitMs trigger time of this run, in milliseconds
     */
    public abstract TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception;

    @Override
    protected TaskResult processSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        return this.asyncProcessSubTask(connectCluster, triggerTimeUnitMs);
    }

    public TaskResult asyncProcessSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) {
        taskThreadPoolService.submitMetadataTask(
                // Fix: the id here is the Connect cluster id, not a Kafka
                // clusterPhyId — the misleading label is corrected.
                String.format("taskName=%s||connectClusterId=%d", this.taskName, connectCluster.getId()),
                this.timeoutUnitSec.intValue() * 1000,
                () -> {
                    try {
                        TaskResult tr = this.processClusterTask(connectCluster, triggerTimeUnitMs);
                        if (TaskResult.SUCCESS_CODE != tr.getCode()) {
                            log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||connectClusterId={}||taskResult={}||msg=failed", this.taskName, connectCluster.getId(), tr);
                        } else {
                            log.debug("class=AbstractAsyncMetadataDispatchTask||taskName={}||connectClusterId={}||msg=success", this.taskName, connectCluster.getId());
                        }
                    } catch (Exception e) {
                        log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||connectClusterId={}||errMsg=exception", this.taskName, connectCluster.getId(), e);
                    }
                }
        );

        // submission is fire-and-forget: the dispatch layer always sees SUCCESS
        return TaskResult.SUCCESS;
    }
}

View File

@@ -0,0 +1,60 @@
package com.xiaojukeji.know.streaming.km.task.connect.metadata;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
/**
 * Broadcast task that pulls the connector list of every Connect cluster,
 * fetches each connector's full info, and replaces the DB snapshot.
 */
@Task(name = "SyncConnectorTask",
        description = "Connector信息同步到DB",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask {
    private static final ILog LOGGER = LogFactory.getLog(SyncConnectorTask.class);

    @Autowired
    private ConnectorService connectorService;

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) {
        Result<List<String>> connectorNamesResult = connectorService.listConnectorsFromCluster(connectCluster.getId());
        if (connectorNamesResult.failed()) {
            // cannot even list connectors — give up on this cluster
            return TaskResult.FAIL;
        }

        boolean fetchedAll = true;
        List<KSConnector> fetchedConnectors = new ArrayList<>();
        for (String connectorName : connectorNamesResult.getData()) {
            Result<KSConnector> infoResult = connectorService.getAllConnectorInfoFromCluster(connectCluster.getId(), connectorName);
            if (!infoResult.failed()) {
                fetchedConnectors.add(infoResult.getData());
                continue;
            }

            LOGGER.error(
                    "class=SyncConnectorTask||method=processClusterTask||connectClusterId={}||connectorName={}||result={}",
                    connectCluster.getId(), connectorName, infoResult
            );
            fetchedAll = false;
        }

        // replace the DB snapshot even on partial fetch; the name set marks survivors
        connectorService.batchReplace(connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), fetchedConnectors, new HashSet<>(connectorNamesResult.getData()));

        return fetchedAll ? TaskResult.SUCCESS : TaskResult.FAIL;
    }
}

View File

@@ -0,0 +1,83 @@
package com.xiaojukeji.know.streaming.km.task.connect.metadata;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
/**
 * Broadcast task that syncs the WorkerConnector info of every Connect cluster
 * to the DB.
 *
 * @author wyb
 * @date 2022/11/14
 */
@Task(name = "SyncWorkerConnectorTask",
        description = "WorkerConnector信息同步到DB",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class SyncWorkerConnectorTask extends AbstractAsyncMetadataDispatchTask {
    private static final ILog LOGGER = LogFactory.getLog(SyncWorkerConnectorTask.class);

    // Fix: field made private for consistency with the sibling task classes
    @Autowired
    private ConnectorService connectorService;

    @Autowired
    private GroupService groupService;

    @Autowired
    private WorkerConnectorService workerConnectorService;

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        List<ConnectorPO> connectorPOList = connectorService.listByConnectClusterIdFromDB(connectCluster.getId());
        if (connectorPOList.isEmpty()) {
            // nothing to sync for this cluster
            return TaskResult.SUCCESS;
        }

        // resolve the Kafka cluster hosting this Connect cluster
        ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(connectCluster.getKafkaClusterPhyId());
        if (clusterPhy == null) {
            LOGGER.error(
                    "class=SyncWorkerConnectorTask||method=processClusterTask||connectClusterId={}||clusterPhyId={}||errMsg=clusterPhy not exist!.",
                    connectCluster.getId(), connectCluster.getKafkaClusterPhyId()
            );
            return TaskResult.FAIL;
        }

        // NOTE(review): the group description is only used as error context below
        KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, connectCluster.getGroupName());

        // fetch the worker-connector list and replace the DB snapshot
        try {
            List<WorkerConnector> workerConnectorList = new ArrayList<>();
            for (ConnectorPO connectorPO : connectorPOList) {
                workerConnectorList.addAll(workerConnectorService.getWorkerConnectorListFromCluster(connectCluster, connectorPO.getConnectorName()));
            }
            workerConnectorService.batchReplaceInDB(connectCluster.getId(), workerConnectorList);
        } catch (Exception e) {
            LOGGER.error(
                    "class=SyncWorkerConnectorTask||method=processClusterTask||connectClusterId={}||ksGroupDescription={}||errMsg=exception.",
                    connectCluster.getId(), ksGroupDescription, e
            );
            return TaskResult.FAIL;
        }
        return TaskResult.SUCCESS;
    }
}

View File

@@ -0,0 +1,47 @@
package com.xiaojukeji.know.streaming.km.task.connect.metrics;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.task.connect.AbstractConnectClusterDispatchTask;
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Base class for metrics-collection tasks: each Connect-cluster sub-task is
 * handed to the shared metrics thread pool and processed asynchronously.
 */
public abstract class AbstractAsyncMetricsDispatchTask extends AbstractConnectClusterDispatchTask {
    private static final ILog log = LogFactory.getLog(AbstractAsyncMetricsDispatchTask.class);

    @Autowired
    private TaskThreadPoolService taskThreadPoolService;

    /**
     * Process one Connect cluster; executed inside the metrics thread pool.
     *
     * @param connectCluster    the cluster to process
     * @param triggerTimeUnitMs trigger time of this run, in milliseconds
     */
    public abstract TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception;

    @Override
    protected TaskResult processSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        return this.asyncProcessSubTask(connectCluster, triggerTimeUnitMs);
    }

    public TaskResult asyncProcessSubTask(ConnectCluster connectCluster, long triggerTimeUnitMs) {
        taskThreadPoolService.submitMetricsTask(
                // Fix: the id here is the Connect cluster id, not a Kafka
                // clusterPhyId — the misleading label is corrected.
                String.format("taskName=%s||connectClusterId=%d", this.taskName, connectCluster.getId()),
                this.timeoutUnitSec.intValue() * 1000,
                () -> {
                    try {
                        TaskResult tr = this.processClusterTask(connectCluster, triggerTimeUnitMs);
                        if (TaskResult.SUCCESS_CODE != tr.getCode()) {
                            log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||connectClusterId={}||taskResult={}||msg=failed", this.taskName, connectCluster.getId(), tr);
                        } else {
                            log.debug("class=AbstractAsyncMetricsDispatchTask||taskName={}||connectClusterId={}||msg=success", this.taskName, connectCluster.getId());
                        }
                    } catch (Exception e) {
                        log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||connectClusterId={}||errMsg=exception", this.taskName, connectCluster.getId(), e);
                    }
                }
        );

        // submission is fire-and-forget: the dispatch layer always sees SUCCESS
        return TaskResult.SUCCESS;
    }
}

View File

@@ -0,0 +1,31 @@
package com.xiaojukeji.know.streaming.km.task.connect.metrics;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.xiaojukeji.know.streaming.km.collector.metric.connect.ConnectClusterMetricCollector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Broadcast task that collects ConnectCluster-level metrics every minute.
 *
 * @author wyb
 * @date 2022/11/7
 */
@Task(name = "ConnectClusterMetricCollectorTask",
        description = "ConnectCluster指标采集任务",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class ConnectClusterMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
    @Autowired
    private ConnectClusterMetricCollector clusterMetricCollector;

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        // delegate the actual collection to the cluster-level collector
        clusterMetricCollector.collectMetrics(connectCluster);
        return TaskResult.SUCCESS;
    }
}

View File

@@ -0,0 +1,31 @@
package com.xiaojukeji.know.streaming.km.task.connect.metrics;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.xiaojukeji.know.streaming.km.collector.metric.connect.ConnectConnectorMetricCollector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Broadcast task that collects Connector-level metrics every minute.
 *
 * @author wyb
 * @date 2022/11/7
 */
@Task(name = "ConnectorMetricCollectorTask",
        description = "Connector指标采集任务",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class ConnectorMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
    @Autowired
    private ConnectConnectorMetricCollector connectorMetricCollector;

    @Override
    public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
        // delegate the actual collection to the connector-level collector
        connectorMetricCollector.collectMetrics(connectCluster);
        return TaskResult.SUCCESS;
    }
}

View File

@@ -0,0 +1,166 @@
package com.xiaojukeji.know.streaming.km.task.kafka.metadata;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConnectAssignment;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum.CONNECT_CLUSTER_PROTOCOL_TYPE;
/**
 * Broadcast task that discovers Connect clusters through their Kafka consumer
 * groups and syncs the cluster metadata plus per-worker membership to the DB.
 *
 * NOTE(review): despite the package (task.kafka.metadata) this extends an
 * AbstractAsyncMetadataDispatchTask whose processClusterTask takes a
 * ClusterPhy — presumably a Kafka-side twin of the Connect dispatch base;
 * confirm against that class.
 */
@Task(name = "SyncClusterAndWorkerTask",
        description = "Connect-Cluster&Worker信息同步到DB",
        cron = "0 0/1 * * * ? *",
        autoRegister = true,
        consensual = ConsensualEnum.BROADCAST,
        timeout = 2 * 60)
public class SyncConnectClusterAndWorkerTask extends AbstractAsyncMetadataDispatchTask {
    private static final ILog LOGGER = LogFactory.getLog(SyncConnectClusterAndWorkerTask.class);

    @Autowired
    private GroupService groupService;

    @Autowired
    private WorkerService workerService;

    // NOTE(review): injected but not referenced in this class — verify whether it can be removed
    @Autowired
    private WorkerConnectorService workerConnectorService;

    @Autowired
    private ConnectClusterService connectClusterService;

    @Override
    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
        boolean allSuccess = true;

        // Connect clusters register themselves as Kafka groups of type CONNECT_CLUSTER
        List<Group> groupList = groupService.listClusterGroups(clusterPhy.getId()).stream().filter(elem->elem.getType()==GroupTypeEnum.CONNECT_CLUSTER).collect(Collectors.toList());
        for (Group group: groupList) {
            try {
                KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, group.getName());
                if (!ksGroupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {
                    // group is not actually using the Connect protocol — skip it
                    continue;
                }

                // upsert the cluster metadata first; its DB id is needed for the workers
                Result<Long> rl = this.handleConnectClusterMetadata(clusterPhy.getId(), group.getName(), ksGroupDescription);
                if (rl.failed()) {
                    allSuccess = false;
                    continue;
                }

                Result<Void> rv = this.handleWorkerMetadata(rl.getData(), ksGroupDescription);
                if (rv.failed()) {
                    allSuccess = false;
                }
            } catch (Exception e) {
                LOGGER.error(
                        "class=SyncClusterAndWorkerTask||method=processClusterTask||clusterPhyId={}||groupName={}||errMsg=exception.",
                        clusterPhy.getId(), group.getName(), e
                );
                allSuccess = false;
            }
        }
        return allSuccess? TaskResult.SUCCESS: TaskResult.FAIL;
    }

    /**
     * Converts every group member into a ConnectWorker row and replaces the
     * cluster's worker list in the DB.
     */
    private Result<Void> handleWorkerMetadata(Long connectClusterId, KSGroupDescription ksGroupDescription) {
        try {
            List<ConnectWorker> workerList = new ArrayList<>();
            ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId);
            for (KSMemberDescription memberDescription: ksGroupDescription.members()) {
                KSMemberConnectAssignment assignment = (KSMemberConnectAssignment) memberDescription.assignment();
                if (assignment != null) {
                    workerList.add(new ConnectWorker(
                            connectCluster.getKafkaClusterPhyId(),
                            connectClusterId,
                            memberDescription.consumerId(),
                            // substring(1) — presumably strips a leading '/' from the host; verify
                            memberDescription.host().substring(1),
                            Constant.INVALID_CODE,
                            assignment.getWorkerState().url(),
                            assignment.getAssignment().leaderUrl(),
                            // a member is the leader when its id matches the assignment's leader
                            memberDescription.consumerId().equals(assignment.getAssignment().leader()) ? Constant.YES : Constant.NO
                    ));
                } else {
                    // member without an assignment: record it with empty URLs, not a leader
                    workerList.add(new ConnectWorker(
                            connectCluster.getKafkaClusterPhyId(),
                            connectClusterId,
                            memberDescription.consumerId(),
                            memberDescription.host().substring(1),
                            Constant.INVALID_CODE,
                            "",
                            "",
                            Constant.NO
                    ));
                }
            }
            workerService.batchReplaceInDB(connectClusterId, workerList);
        } catch (Exception e) {
            LOGGER.error(
                    "class=SyncClusterAndWorkerTask||method=handleWorkerMetadata||connectClusterId={}||ksGroupDescription={}||errMsg=exception.",
                    connectClusterId, ksGroupDescription, e
            );
            return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
        }
        return Result.buildSuc();
    }

    /**
     * Upserts the Connect cluster metadata derived from the group description
     * and returns the cluster's DB id.
     *
     * The loop returns on its first iteration: only the first member is used
     * to read the leader URL. If the group has no members at all, the method
     * falls through to the KAFKA_OPERATE_FAILED result below.
     */
    private Result<Long> handleConnectClusterMetadata(Long clusterPhyId, String groupName, KSGroupDescription ksGroupDescription) {
        try {
            for (KSMemberDescription memberDescription: ksGroupDescription.members()) {
                KSMemberConnectAssignment assignment = (KSMemberConnectAssignment) memberDescription.assignment();
                ConnectClusterMetadata metadata = new ConnectClusterMetadata(
                        clusterPhyId,
                        groupName,
                        GroupStateEnum.getByRawState(ksGroupDescription.state()),
                        assignment == null? "": assignment.getAssignment().leaderUrl()
                );
                Long connectClusterId = connectClusterService.replaceAndReturnIdInDB(metadata);
                return Result.buildSuc(connectClusterId);
            }
        } catch (Exception e) {
            LOGGER.error(
                    "class=SyncClusterAndWorkerTask||method=handleConnectClusterMetadata||clusterPhyId={}||groupName={}||ksGroupDescription={}||errMsg=exception.",
                    clusterPhyId, groupName, ksGroupDescription, e
            );
            return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
        }
        return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, "消费组无成员");
    }
}