mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-05 13:08:48 +08:00
[Feature]支持拆分API服务和Job服务部署(#829)
1、JMX检查功能是每一个KS都必须要有的,因此从Task模块移动到Core模块; 2、application.yml中补充Task模块任务的整体开关字段;
This commit is contained in:
@@ -0,0 +1,44 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.core.flusher;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JMX连接检查
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class JmxClientLegalFlusher {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(JmxClientLegalFlusher.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private BrokerService brokerService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private KafkaJMXClient kafkaJMXClient;
|
||||||
|
|
||||||
|
@Scheduled(cron="0 0/1 * * * ?")
|
||||||
|
public void checkJmxClient() {
|
||||||
|
for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) {
|
||||||
|
FutureUtil.quickStartupFutureUtil.submitTask(
|
||||||
|
() -> {
|
||||||
|
try {
|
||||||
|
kafkaJMXClient.checkAndRemoveIfIllegal(
|
||||||
|
clusterPhy.getId(),
|
||||||
|
brokerService.listAliveBrokersFromDB(clusterPhy.getId())
|
||||||
|
);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("method=checkJmxClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -31,8 +31,9 @@ spring:
|
|||||||
init-sql: true
|
init-sql: true
|
||||||
init-thread-num: 20
|
init-thread-num: 20
|
||||||
max-thread-num: 50
|
max-thread-num: 50
|
||||||
log-expire: 3 # 日志保存天数,以天为单位
|
log-expire: 3 # 日志保存天数,以天为单位
|
||||||
app-name: know-streaming
|
app-name: know-streaming
|
||||||
|
enable: true # true表示开启job任务, false表关闭。KS在部署上可以考虑部署两套服务,一套处理前端请求,一套执行job任务,此时可以通过该字段进行控制
|
||||||
claim-strategy: com.didiglobal.logi.job.core.consensual.RandomConsensual
|
claim-strategy: com.didiglobal.logi.job.core.consensual.RandomConsensual
|
||||||
logi-security: # know-streaming 依赖的 logi-security 模块的数据库的配置,默认与 know-streaming 的数据库配置保持一致即可
|
logi-security: # know-streaming 依赖的 logi-security 模块的数据库的配置,默认与 know-streaming 的数据库配置保持一致即可
|
||||||
jdbc-url: jdbc:mariadb://127.0.0.1:3306/know_streaming?useUnicode=true&characterEncoding=utf8&jdbcCompliantTruncation=true&allowMultiQueries=true&useSSL=false&alwaysAutoGeneratedKeys=true&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
|
jdbc-url: jdbc:mariadb://127.0.0.1:3306/know_streaming?useUnicode=true&characterEncoding=utf8&jdbcCompliantTruncation=true&allowMultiQueries=true&useSSL=false&alwaysAutoGeneratedKeys=true&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
|
||||||
|
|||||||
@@ -1,60 +0,0 @@
|
|||||||
package com.xiaojukeji.know.streaming.km.task.kafka.client;
|
|
||||||
|
|
||||||
import com.didiglobal.logi.job.annotation.Task;
|
|
||||||
import com.didiglobal.logi.job.common.TaskResult;
|
|
||||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
|
||||||
import com.didiglobal.logi.job.core.job.Job;
|
|
||||||
import com.didiglobal.logi.job.core.job.JobContext;
|
|
||||||
import com.didiglobal.logi.log.ILog;
|
|
||||||
import com.didiglobal.logi.log.LogFactory;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
|
|
||||||
@Task(name = "CheckJmxClientTask",
|
|
||||||
description = "检查Jmx客户端",
|
|
||||||
cron = "0 0/1 * * * ? *",
|
|
||||||
autoRegister = true,
|
|
||||||
timeout = 2 * 60,
|
|
||||||
consensual = ConsensualEnum.BROADCAST)
|
|
||||||
public class CheckJmxClientTask implements Job {
|
|
||||||
private static final ILog log = LogFactory.getLog(CheckJmxClientTask.class);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private BrokerService brokerService;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private KafkaJMXClient kafkaJMXClient;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TaskResult execute(JobContext jobContext) {
|
|
||||||
boolean status = true;
|
|
||||||
for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) {
|
|
||||||
if (this.checkJmxClient(clusterPhy)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
status = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return status? TaskResult.SUCCESS: TaskResult.FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean checkJmxClient(ClusterPhy clusterPhy) {
|
|
||||||
try {
|
|
||||||
kafkaJMXClient.checkAndRemoveIfIllegal(
|
|
||||||
clusterPhy.getId(),
|
|
||||||
brokerService.listAliveBrokersFromDB(clusterPhy.getId())
|
|
||||||
);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("method=checkJmxClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user