diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/JmxClientLegalFlusher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/JmxClientLegalFlusher.java new file mode 100644 index 00000000..40a55d47 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/JmxClientLegalFlusher.java @@ -0,0 +1,44 @@ +package com.xiaojukeji.know.streaming.km.core.flusher; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; +import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +/** + * JMX连接检查 + */ +@Service +public class JmxClientLegalFlusher { + private static final ILog LOGGER = LogFactory.getLog(JmxClientLegalFlusher.class); + + @Autowired + private BrokerService brokerService; + + @Autowired + private KafkaJMXClient kafkaJMXClient; + + @Scheduled(cron="0 0/1 * * * ?") + public void checkJmxClient() { + for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) { + FutureUtil.quickStartupFutureUtil.submitTask( + () -> { + try { + kafkaJMXClient.checkAndRemoveIfIllegal( + clusterPhy.getId(), + brokerService.listAliveBrokersFromDB(clusterPhy.getId()) + ); + } catch (Exception e) { + LOGGER.error("method=checkJmxClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); + } + } + ); + } + } +} diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index 291c6741..efbafef5 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -31,8 +31,9 @@ spring: init-sql: true init-thread-num: 20 max-thread-num: 50 - log-expire: 3 # 日志保存天数,以天为单位 + log-expire: 3 # 日志保存天数,以天为单位 app-name: know-streaming + enable: true # true表示开启job任务, false表关闭。KS在部署上可以考虑部署两套服务,一套处理前端请求,一套执行job任务,此时可以通过该字段进行控制 claim-strategy: com.didiglobal.logi.job.core.consensual.RandomConsensual logi-security: # know-streaming 依赖的 logi-security 模块的数据库的配置,默认与 know-streaming 的数据库配置保持一致即可 jdbc-url: jdbc:mariadb://127.0.0.1:3306/know_streaming?useUnicode=true&characterEncoding=utf8&jdbcCompliantTruncation=true&allowMultiQueries=true&useSSL=false&alwaysAutoGeneratedKeys=true&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/client/CheckJmxClientTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/client/CheckJmxClientTask.java deleted file mode 100644 index 2e67fcaa..00000000 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/client/CheckJmxClientTask.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.xiaojukeji.know.streaming.km.task.kafka.client; - -import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; -import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.job.core.job.Job; -import com.didiglobal.logi.job.core.job.JobContext; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; -import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; -import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; -import org.springframework.beans.factory.annotation.Autowired; - -@Task(name = "CheckJmxClientTask", - description = "检查Jmx客户端", - cron = "0 0/1 * * * ? *", - autoRegister = true, - timeout = 2 * 60, - consensual = ConsensualEnum.BROADCAST) -public class CheckJmxClientTask implements Job { - private static final ILog log = LogFactory.getLog(CheckJmxClientTask.class); - - @Autowired - private BrokerService brokerService; - - @Autowired - private KafkaJMXClient kafkaJMXClient; - - @Override - public TaskResult execute(JobContext jobContext) { - boolean status = true; - for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) { - if (this.checkJmxClient(clusterPhy)) { - continue; - } - - status = false; - } - - return status? TaskResult.SUCCESS: TaskResult.FAIL; - } - - private boolean checkJmxClient(ClusterPhy clusterPhy) { - try { - kafkaJMXClient.checkAndRemoveIfIllegal( - clusterPhy.getId(), - brokerService.listAliveBrokersFromDB(clusterPhy.getId()) - ); - - return true; - } catch (Exception e) { - log.error("method=checkJmxClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); - } - - return false; - } - -}