mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-02 18:32:08 +08:00
Merge branch 'dev' of github.com:kingdomrushing/LogiKM into dev_v2.6.0
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.component;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.NetUtils;
|
||||
@@ -29,7 +28,7 @@ import java.util.concurrent.*;
|
||||
* @date 20/8/10
|
||||
*/
|
||||
public abstract class AbstractScheduledTask<E extends Comparable> implements SchedulingConfigurer {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledTask.class);
|
||||
|
||||
@Autowired
|
||||
private HeartbeatDao heartbeatDao;
|
||||
@@ -73,18 +72,16 @@ public abstract class AbstractScheduledTask<E extends Comparable> implements Sch
|
||||
LOGGER.info("init custom scheduled finished, scheduledName:{} scheduledCron:{}.", scheduledName, scheduledCron);
|
||||
}
|
||||
|
||||
private boolean checkAndModifyCron(String scheduledName, String scheduledCron, boolean existIfIllegal) {
|
||||
private boolean checkAndModifyCron(String scheduledName, String scheduledCron, boolean isInit) {
|
||||
if (scheduledCron.matches(ScheduledTaskConstant.CRON_REG_EX)) {
|
||||
this.scheduledCron = scheduledCron;
|
||||
LOGGER.info("modify scheduledCron success, scheduledName:{} scheduledCron:{}."
|
||||
, scheduledName, scheduledCron);
|
||||
LOGGER.info("{} scheduledCron success, scheduledName:{} scheduledCron:{}.", isInit? "init": "modify", scheduledName, scheduledCron);
|
||||
return true;
|
||||
}
|
||||
|
||||
LOGGER.error("modify scheduledCron failed, format invalid, scheduledName:{} scheduledCron:{}."
|
||||
, scheduledName, scheduledCron);
|
||||
if (existIfIllegal) {
|
||||
System.exit(0);
|
||||
LOGGER.error("modify scheduledCron failed, format invalid, scheduledName:{} scheduledCron:{}.", scheduledName, scheduledCron);
|
||||
if (isInit) {
|
||||
throw new UnsupportedOperationException(String.format("scheduledName:%s scheduledCron:%s format invalid", scheduledName, scheduledCron));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@@ -130,7 +127,8 @@ public abstract class AbstractScheduledTask<E extends Comparable> implements Sch
|
||||
LOGGER.info("customScheduled task finished, empty selected task, scheduledName:{}.", scheduledName);
|
||||
return;
|
||||
}
|
||||
LOGGER.info("customScheduled task running, selected tasks, IP:{} selectedTasks:{}.",
|
||||
|
||||
LOGGER.debug("customScheduled task running, selected tasks, IP:{} selectedTasks:{}.",
|
||||
NetUtils.localIp(), JsonUtils.toJSONString(selectTasks)
|
||||
);
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.task.component;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -9,11 +8,11 @@ import org.slf4j.LoggerFactory;
|
||||
* @date 20/8/10
|
||||
*/
|
||||
public class BaseBizTask<E extends Comparable> implements Runnable {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledTask.class);
|
||||
|
||||
private E task;
|
||||
private final E task;
|
||||
|
||||
private AbstractScheduledTask scheduledTask;
|
||||
private final AbstractScheduledTask scheduledTask;
|
||||
|
||||
public BaseBizTask(E task, AbstractScheduledTask scheduledTask) {
|
||||
this.task = task;
|
||||
@@ -30,6 +29,7 @@ public class BaseBizTask<E extends Comparable> implements Runnable {
|
||||
} catch (Throwable t) {
|
||||
LOGGER.error("scheduled task scheduleName:{} execute failed, task:{}", scheduledTask.getScheduledName(), task, t);
|
||||
}
|
||||
|
||||
LOGGER.info("scheduled task scheduleName:{} finished, cost-time:{}ms.", scheduledTask.getScheduledName(), System.currentTimeMillis() - startTime);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.task.component;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.NetUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.HeartbeatDao;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.HeartbeatDO;
|
||||
@@ -18,7 +17,7 @@ import java.util.Date;
|
||||
*/
|
||||
@Component
|
||||
public class Heartbeat {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Heartbeat.class);
|
||||
|
||||
@Autowired
|
||||
private HeartbeatDao heartbeatDao;
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
|
||||
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.task.config.TopicBillConfig;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
|
||||
@@ -30,7 +28,7 @@ import java.util.*;
|
||||
*/
|
||||
@CustomScheduled(name = "calKafkaBill", cron = "0 0 1 * * *", threadNum = 1)
|
||||
public class CalKafkaTopicBill extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CalKafkaTopicBill.class);
|
||||
|
||||
@Autowired
|
||||
private AppService appService;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
|
||||
@@ -30,7 +29,7 @@ import java.util.Map;
|
||||
*/
|
||||
@CustomScheduled(name = "calTopicStatistics", cron = "0 0 0/4 * * ?", threadNum = 5)
|
||||
public class CalTopicStatistics extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CalTopicStatistics.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.DBStatusEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerDO;
|
||||
@@ -27,7 +26,7 @@ import java.util.*;
|
||||
*/
|
||||
@CustomScheduled(name = "flushBrokerTable", cron = "0 0 0/1 * * ?", threadNum = 1)
|
||||
public class FlushBrokerTable extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FlushBrokerTable.class);
|
||||
|
||||
@Autowired
|
||||
private BrokerService brokerService;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
|
||||
@@ -32,7 +31,7 @@ import java.util.Map;
|
||||
*/
|
||||
@CustomScheduled(name = "flushExpiredTopic", cron = "0 0 0/5 * * ?", threadNum = 1)
|
||||
public class FlushExpiredTopic extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FlushExpiredTopic.class);
|
||||
|
||||
@Autowired
|
||||
private TopicExpiredDao topicExpiredDao;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterTaskDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.kcm.ClusterTaskService;
|
||||
@@ -24,7 +23,7 @@ import java.util.List;
|
||||
@CustomScheduled(name = "syncClusterTaskState", cron = "0 0/1 * * * ?", threadNum = 1)
|
||||
@ConditionalOnProperty(prefix = "kcm", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class SyncClusterTaskState extends AbstractScheduledTask<EmptyEntry> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SyncClusterTaskState.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterTaskService clusterTaskService;
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.JmxService;
|
||||
import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
|
||||
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
|
||||
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Broker指标信息收集
|
||||
* @author zengqiao
|
||||
* @date 20/5/7
|
||||
*/
|
||||
@CustomScheduled(name = "collectAndPublishBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2)
|
||||
@ConditionalOnProperty(prefix = "task.metrics.collect", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class CollectAndPublishBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CollectAndPublishBrokerMetrics.class);
|
||||
|
||||
@Autowired
|
||||
private JmxService jmxService;
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Autowired
|
||||
private AbstractHealthScoreStrategy healthScoreStrategy;
|
||||
|
||||
@Override
|
||||
protected List<ClusterDO> listAllTasks() {
|
||||
return clusterService.list();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processTask(ClusterDO clusterDO) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
SpringTool.publish(new BatchBrokerMetricsCollectedEvent(
|
||||
this,
|
||||
clusterDO.getId(),
|
||||
startTime,
|
||||
this.getBrokerMetrics(clusterDO.getId()))
|
||||
);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("collect broker-metrics failed, physicalClusterId:{}.", clusterDO.getId(), e);
|
||||
}
|
||||
|
||||
LOGGER.info("collect broker-metrics finished, physicalClusterId:{} costTime:{}", clusterDO.getId(), System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
private List<BrokerMetrics> getBrokerMetrics(Long clusterId) {
|
||||
List<BrokerMetrics> metricsList = new ArrayList<>();
|
||||
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
|
||||
BrokerMetrics metrics = jmxService.getBrokerMetrics(
|
||||
clusterId,
|
||||
brokerId,
|
||||
KafkaMetricsCollections.BROKER_TO_DB_METRICS
|
||||
);
|
||||
|
||||
if (ValidateUtils.isNull(metrics)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
metrics.getMetricsMap().put(
|
||||
JmxConstant.HEALTH_SCORE,
|
||||
healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics)
|
||||
);
|
||||
|
||||
metricsList.add(metrics);
|
||||
}
|
||||
|
||||
if (ValidateUtils.isEmptyList(metricsList)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
return metricsList;
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroup;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
@@ -34,7 +33,7 @@ import java.util.concurrent.FutureTask;
|
||||
*/
|
||||
@CustomScheduled(name = "newCollectAndPublishCGData", cron = "30 0/1 * * * *", threadNum = 10)
|
||||
public class CollectAndPublishCGData extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CollectAndPublishCGData.class);
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.*;
|
||||
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
|
||||
@@ -22,7 +21,7 @@ import java.util.List;
|
||||
*/
|
||||
@CustomScheduled(name = "deleteMetrics", cron = "0 0/2 * * * ?", threadNum = 1)
|
||||
public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DeleteMetrics.class);
|
||||
|
||||
@Autowired
|
||||
private TopicMetricsDao topicMetricsDao;
|
||||
@@ -42,25 +41,25 @@ public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
@Autowired
|
||||
private TopicThrottledMetricsDao topicThrottledMetricsDao;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.delete-limit-size:1000}")
|
||||
@Value(value = "${task.metrics.delete.delete-limit-size:1000}")
|
||||
private Integer deleteLimitSize;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.cluster-metrics-save-days:14}")
|
||||
@Value(value = "${task.metrics.delete.cluster-metrics-save-days:14}")
|
||||
private Integer clusterMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.broker-metrics-save-days:14}")
|
||||
@Value(value = "${task.metrics.delete.broker-metrics-save-days:14}")
|
||||
private Integer brokerMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-metrics-save-days:7}")
|
||||
@Value(value = "${task.metrics.delete.topic-metrics-save-days:7}")
|
||||
private Integer topicMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-request-time-metrics-save-days:7}")
|
||||
@Value(value = "${task.metrics.delete.topic-request-time-metrics-save-days:7}")
|
||||
private Integer topicRequestTimeMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-throttled-metrics-save-days:7}")
|
||||
@Value(value = "${task.metrics.delete.topic-throttled-metrics-save-days:7}")
|
||||
private Integer topicThrottledMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.app-topic-metrics-save-days:7}")
|
||||
@Value(value = "${task.metrics.delete.app-topic-metrics-save-days:7}")
|
||||
private Integer appTopicMetricsSaveDays;
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,146 +0,0 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||
import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao;
|
||||
import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.JmxService;
|
||||
import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
|
||||
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
|
||||
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Broker指标信息存DB, Broker流量, 集群流量
|
||||
* @author zengqiao
|
||||
* @date 20/5/7
|
||||
*/
|
||||
@CustomScheduled(name = "storeBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2)
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private JmxService jmxService;
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Autowired
|
||||
private BrokerMetricsDao brokerMetricsDao;
|
||||
|
||||
@Autowired
|
||||
private ClusterMetricsDao clusterMetricsDao;
|
||||
|
||||
@Autowired
|
||||
private AbstractHealthScoreStrategy healthScoreStrategy;
|
||||
|
||||
@Override
|
||||
protected List<ClusterDO> listAllTasks() {
|
||||
return clusterService.list();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processTask(ClusterDO clusterDO) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<ClusterMetrics> clusterMetricsList = new ArrayList<>();
|
||||
|
||||
try {
|
||||
List<BrokerMetrics> brokerMetricsList = getAndBatchAddMetrics(startTime, clusterDO.getId());
|
||||
clusterMetricsList.add(supplyAndConvert2ClusterMetrics(
|
||||
clusterDO.getId(),
|
||||
MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList))
|
||||
);
|
||||
} catch (Exception t) {
|
||||
LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t);
|
||||
}
|
||||
long endTime = System.currentTimeMillis();
|
||||
LOGGER.info("collect finish, clusterId:{} costTime:{}", clusterDO.getId(), endTime - startTime);
|
||||
|
||||
List<ClusterMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList(
|
||||
startTime,
|
||||
clusterMetricsList
|
||||
);
|
||||
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
clusterMetricsDao.batchAdd(doList);
|
||||
}
|
||||
|
||||
private List<BrokerMetrics> getAndBatchAddMetrics(Long startTime, Long clusterId) {
|
||||
List<BrokerMetrics> metricsList = new ArrayList<>();
|
||||
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
|
||||
BrokerMetrics metrics = jmxService.getBrokerMetrics(
|
||||
clusterId,
|
||||
brokerId,
|
||||
KafkaMetricsCollections.BROKER_TO_DB_METRICS
|
||||
);
|
||||
if (ValidateUtils.isNull(metrics)) {
|
||||
continue;
|
||||
}
|
||||
metrics.getMetricsMap().put(
|
||||
JmxConstant.HEALTH_SCORE,
|
||||
healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics)
|
||||
);
|
||||
metricsList.add(metrics);
|
||||
}
|
||||
if (ValidateUtils.isEmptyList(metricsList)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
List<BrokerMetricsDO> doList =
|
||||
MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList);
|
||||
int i = 0;
|
||||
do {
|
||||
List<BrokerMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
|
||||
if (ValidateUtils.isEmptyList(subDOList)) {
|
||||
break;
|
||||
}
|
||||
|
||||
brokerMetricsDao.batchAdd(subDOList);
|
||||
i += Constant.BATCH_INSERT_SIZE;
|
||||
} while (i < doList.size());
|
||||
|
||||
return metricsList;
|
||||
}
|
||||
|
||||
private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) {
|
||||
ClusterMetrics metrics = new ClusterMetrics(clusterId);
|
||||
Map<String, Object> metricsMap = metrics.getMetricsMap();
|
||||
metricsMap.putAll(baseMetrics.getMetricsMap());
|
||||
metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size());
|
||||
metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size());
|
||||
Integer partitionNum = 0;
|
||||
for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
|
||||
TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
|
||||
if (ValidateUtils.isNull(topicMetaData)) {
|
||||
continue;
|
||||
}
|
||||
partitionNum += topicMetaData.getPartitionNum();
|
||||
}
|
||||
metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum);
|
||||
return metrics;
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao;
|
||||
@@ -28,7 +27,7 @@ import java.util.*;
|
||||
@CustomScheduled(name = "storeDiDiAppTopicMetrics", cron = "41 0/1 * * * ?", threadNum = 5)
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "app-topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(StoreDiDiAppTopicMetrics.class);
|
||||
|
||||
@Autowired
|
||||
private JmxService jmxService;
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao;
|
||||
@@ -28,7 +27,7 @@ import java.util.*;
|
||||
@CustomScheduled(name = "storeDiDiTopicRequestTimeMetrics", cron = "51 0/1 * * * ?", threadNum = 5)
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-request-time-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(StoreDiDiTopicRequestTimeMetrics.class);
|
||||
|
||||
@Autowired
|
||||
private JmxService jmxService;
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.xiaojukeji.kafka.manager.bpm.OrderService;
|
||||
import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum;
|
||||
import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
@@ -39,7 +38,7 @@ import java.util.Properties;
|
||||
@CustomScheduled(name = "autoHandleTopicOrder", cron = "0 0/1 * * * ?", threadNum = 1)
|
||||
@ConditionalOnProperty(prefix = "task.op.order-auto-exec", name = "topic-enabled", havingValue = "true", matchIfMissing = false)
|
||||
public class AutoHandleTopicOrder extends AbstractScheduledTask<EmptyEntry> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AutoHandleTopicOrder.class);
|
||||
|
||||
@Autowired
|
||||
private ConfigService configService;
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.xiaojukeji.kafka.manager.bpm.OrderService;
|
||||
import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum;
|
||||
import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.bpm.common.handle.OrderHandleBaseDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
|
||||
@@ -34,7 +33,7 @@ import java.util.*;
|
||||
@CustomScheduled(name = "automatedHandleOrder", cron = "0 0/1 * * * ?", threadNum = 1)
|
||||
@ConditionalOnProperty(prefix = "task.op.order-auto-exec", name = "app-enabled", havingValue = "true", matchIfMissing = false)
|
||||
public class AutomatedHandleOrder extends AbstractScheduledTask<EmptyEntry> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AutomatedHandleOrder.class);
|
||||
|
||||
@Autowired
|
||||
private OrderService orderService;
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.op;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusReassignEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
@@ -36,7 +35,7 @@ import java.util.*;
|
||||
@Component
|
||||
@CustomScheduled(name = "flushReassignment", cron = "0 0/1 * * * ?", threadNum = 1)
|
||||
public class FlushReassignment extends AbstractScheduledTask<EmptyEntry> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FlushReassignment.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.op;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
||||
@@ -39,7 +38,7 @@ import java.util.stream.Collectors;
|
||||
@CustomScheduled(name = "syncTopic2DB", cron = "0 0/2 * * * ?", threadNum = 1)
|
||||
@ConditionalOnProperty(prefix = "task.op", name = "sync-topic-enabled", havingValue = "true", matchIfMissing = false)
|
||||
public class SyncTopic2DB extends AbstractScheduledTask<EmptyEntry> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SyncTopic2DB.class);
|
||||
|
||||
private static final String SYNC_TOPIC_2_DB_CONFIG_KEY = "SYNC_TOPIC_2_DB_CONFIG_KEY";
|
||||
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.db;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 22/01/17
|
||||
*/
|
||||
@Component
|
||||
@ConditionalOnProperty(prefix = "task.metrics.sink.broker-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class SinkBrokerMetrics2DB implements ApplicationListener<BatchBrokerMetricsCollectedEvent> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SinkBrokerMetrics2DB.class);
|
||||
|
||||
@Autowired
|
||||
private BrokerMetricsDao metricsDao;
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) {
|
||||
logger.debug("sink broker-metrics to db start, event:{}.", event);
|
||||
|
||||
List<BrokerMetrics> metricsList = event.getMetricsList();
|
||||
if (ValidateUtils.isEmptyList(metricsList)) {
|
||||
logger.warn("sink broker-metrics to db finished, without need sink, event:{}.", event);
|
||||
return;
|
||||
}
|
||||
|
||||
List<BrokerMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(event.getCollectTime(), metricsList);
|
||||
int i = 0;
|
||||
while (i < doList.size()) {
|
||||
List<BrokerMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
|
||||
if (ValidateUtils.isEmptyList(subDOList)) {
|
||||
break;
|
||||
}
|
||||
|
||||
metricsDao.batchAdd(subDOList);
|
||||
i += Constant.BATCH_INSERT_SIZE;
|
||||
}
|
||||
|
||||
logger.debug("sink broker-metrics to db finished, event:{}.", event);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.db;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||
import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 22/01/17
|
||||
*/
|
||||
@Component
|
||||
@ConditionalOnProperty(prefix = "task.metrics.sink.cluster-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class SinkClusterMetrics2DB implements ApplicationListener<BatchBrokerMetricsCollectedEvent> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SinkClusterMetrics2DB.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterMetricsDao clusterMetricsDao;
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) {
|
||||
logger.debug("sink cluster-metrics to db start, event:{}.", event);
|
||||
|
||||
List<BrokerMetrics> metricsList = event.getMetricsList();
|
||||
if (ValidateUtils.isEmptyList(metricsList)) {
|
||||
logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event);
|
||||
return;
|
||||
}
|
||||
|
||||
List<ClusterMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList(
|
||||
event.getCollectTime(),
|
||||
// 合并broker-metrics为cluster-metrics
|
||||
Arrays.asList(supplyAndConvert2ClusterMetrics(event.getPhysicalClusterId(), MetricsConvertUtils.merge2BaseMetricsByAdd(event.getMetricsList())))
|
||||
);
|
||||
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event);
|
||||
return;
|
||||
}
|
||||
|
||||
clusterMetricsDao.batchAdd(doList);
|
||||
|
||||
logger.debug("sink cluster-metrics to db finished, event:{}.", event);
|
||||
}
|
||||
|
||||
private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) {
|
||||
ClusterMetrics metrics = new ClusterMetrics(clusterId);
|
||||
Map<String, Object> metricsMap = metrics.getMetricsMap();
|
||||
metricsMap.putAll(baseMetrics.getMetricsMap());
|
||||
metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size());
|
||||
metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size());
|
||||
Integer partitionNum = 0;
|
||||
for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
|
||||
TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
|
||||
if (ValidateUtils.isNull(topicMetaData)) {
|
||||
continue;
|
||||
}
|
||||
partitionNum += topicMetaData.getPartitionNum();
|
||||
}
|
||||
metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum);
|
||||
return metrics;
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener;
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.db;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.events.TopicMetricsCollectedEvent;
|
||||
@@ -25,7 +24,7 @@ import java.util.List;
|
||||
@Component("storeCommunityTopicMetrics2DB")
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreCommunityTopicMetrics2DB implements ApplicationListener<TopicMetricsCollectedEvent> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(StoreCommunityTopicMetrics2DB.class);
|
||||
|
||||
@Autowired
|
||||
private TopicMetricsDao topicMetricsDao;
|
||||
@@ -1,8 +1,7 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener;
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.db;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicThrottledMetricsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
@@ -24,7 +23,7 @@ import java.util.*;
|
||||
@Component("storeTopicThrottledMetrics2DB")
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreTopicThrottledMetrics2DB implements ApplicationListener<TopicThrottledMetricsCollectedEvent> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(StoreTopicThrottledMetrics2DB.class);
|
||||
|
||||
@Autowired
|
||||
private ThrottleService throttleService;
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener;
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.kafka;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.config.TopicNameConfig;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaTopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
@@ -27,7 +26,7 @@ import java.util.List;
|
||||
*/
|
||||
@Component("sinkCommunityTopicMetrics2Kafka")
|
||||
public class SinkCommunityTopicMetrics2Kafka implements ApplicationListener<TopicMetricsCollectedEvent> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SinkCommunityTopicMetrics2Kafka.class);
|
||||
|
||||
@Autowired
|
||||
private ConfigService configService;
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener;
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.kafka;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.config.TopicNameConfig;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaConsumerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaConsumerMetricsElem;
|
||||
@@ -27,7 +26,7 @@ import java.util.Map;
|
||||
*/
|
||||
@Component("produceConsumerMetrics")
|
||||
public class SinkConsumerMetrics2Kafka implements ApplicationListener<ConsumerMetricsCollectedEvent> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SinkConsumerMetrics2Kafka.class);
|
||||
|
||||
@Autowired
|
||||
private ConfigService configService;
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener;
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.monitor.common.entry.sink.MonitorTopicSinkTag;
|
||||
@@ -33,7 +32,7 @@ import java.util.List;
|
||||
@ConditionalOnProperty(prefix = "monitor", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
@CustomScheduled(name = "sinkCommunityTopicMetrics2Monitor", cron = "1 0/1 * * * ?", threadNum = 5)
|
||||
public class SinkCommunityTopicMetrics2Monitor extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SinkCommunityTopicMetrics2Monitor.class);
|
||||
|
||||
@Autowired
|
||||
private AbstractMonitorService abstractMonitor;
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener;
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
@@ -32,7 +31,7 @@ import java.util.*;
|
||||
@Component("sinkConsumerMetrics2Monitor")
|
||||
@ConditionalOnProperty(prefix = "monitor", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class SinkConsumerMetrics2Monitor implements ApplicationListener<ConsumerMetricsCollectedEvent> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SinkConsumerMetrics2Monitor.class);
|
||||
|
||||
@Autowired
|
||||
private LogicalClusterMetadataManager logicalClusterMetadataManager;
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.xiaojukeji.kafka.manager.task.listener;
|
||||
package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
|
||||
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.schedule;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
@@ -22,7 +21,7 @@ import java.util.*;
|
||||
*/
|
||||
@Component
|
||||
public class FlushTopicMetrics {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FlushTopicMetrics.class);
|
||||
|
||||
@Autowired
|
||||
private JmxService jmxService;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.task.schedule.metadata;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetadata;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
@@ -25,7 +24,7 @@ import java.util.*;
|
||||
*/
|
||||
@Component
|
||||
public class FlushBKConsumerGroupMetadata {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FlushBKConsumerGroupMetadata.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.task.schedule.metadata;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils;
|
||||
@@ -22,7 +21,7 @@ import java.util.Properties;
|
||||
*/
|
||||
@Component
|
||||
public class FlushTopicProperties {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FlushTopicProperties.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.task.schedule.metadata;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetadata;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
|
||||
@@ -27,7 +26,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
@Component
|
||||
public class FlushZKConsumerGroupMetadata {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FlushZKConsumerGroupMetadata.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
Reference in New Issue
Block a user