diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/AbstractMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/AbstractMetricCollector.java index 7d0e85f6..7b6bce9a 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/AbstractMetricCollector.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/AbstractMetricCollector.java @@ -1,25 +1,53 @@ package com.xiaojukeji.know.streaming.km.collector.metric; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService; +import com.xiaojukeji.know.streaming.km.common.utils.LoggerUtil; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BaseMetricEvent; import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import org.springframework.beans.factory.annotation.Autowired; +import java.util.List; + /** * @author didi */ public abstract class AbstractMetricCollector { - public abstract void collectMetrics(ClusterPhy clusterPhy); + protected static final ILog LOGGER = LogFactory.getLog(AbstractMetricCollector.class); + + protected static final ILog METRIC_COLLECTED_LOGGER = LoggerUtil.getMetricCollectedLogger(); + + public abstract List collectKafkaMetrics(ClusterPhy clusterPhy); public abstract VersionItemTypeEnum collectorType(); @Autowired private CollectThreadPoolService collectThreadPoolService; + public void collectMetrics(ClusterPhy clusterPhy) { + long startTime = System.currentTimeMillis(); + + // 采集指标 + List metricsList = this.collectKafkaMetrics(clusterPhy); + + // 输出耗时信息 + LOGGER.info( + "metricType={}||clusterPhyId={}||costTimeUnitMs={}", + this.collectorType().getMessage(), clusterPhy.getId(), System.currentTimeMillis() - startTime + ); + + // 输出采集到的指标信息 + METRIC_COLLECTED_LOGGER.debug("metricType={}||clusterPhyId={}||metrics={}!", + this.collectorType().getMessage(), clusterPhy.getId(), ConvertUtil.obj2Json(metricsList) + ); + } + protected FutureWaitUtil getFutureUtilByClusterPhyId(Long clusterPhyId) { return collectThreadPoolService.selectSuitableFutureUtil(clusterPhyId * 1000L + this.collectorType().getCode()); } diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/BrokerMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/BrokerMetricCollector.java index 18f36192..1753a875 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/BrokerMetricCollector.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/BrokerMetricCollector.java @@ -1,6 +1,5 @@ package com.xiaojukeji.know.streaming.km.collector.metric.kafka; -import com.alibaba.fastjson.JSON; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.collector.metric.AbstractMetricCollector; @@ -12,7 +11,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionContro import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent; import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; -import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; @@ -30,7 +28,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT */ @Component public class BrokerMetricCollector extends AbstractMetricCollector { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(BrokerMetricCollector.class); @Autowired private VersionControlService versionControlService; @@ -42,8 +40,7 @@ public class BrokerMetricCollector extends AbstractMetricCollector collectKafkaMetrics(ClusterPhy clusterPhy) { Long clusterPhyId = clusterPhy.getId(); List brokers = brokerService.listAliveBrokersFromDB(clusterPhy.getId()); @@ -51,23 +48,23 @@ public class BrokerMetricCollector extends AbstractMetricCollector future = this.getFutureUtilByClusterPhyId(clusterPhyId); - List brokerMetrics = new ArrayList<>(); + List metricsList = new ArrayList<>(); for(Broker broker : brokers) { BrokerMetrics metrics = new BrokerMetrics(clusterPhyId, broker.getBrokerId(), broker.getHost(), broker.getPort()); - brokerMetrics.add(metrics); + metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); + metricsList.add(metrics); future.runnableTask( - String.format("method=BrokerMetricCollector||clusterPhyId=%d||brokerId=%d", clusterPhyId, broker.getBrokerId()), + String.format("class=BrokerMetricCollector||clusterPhyId=%d||brokerId=%d", clusterPhyId, broker.getBrokerId()), 30000, () -> collectMetrics(clusterPhyId, metrics, items) ); } future.waitExecute(30000); - this.publishMetric(new BrokerMetricEvent(this, brokerMetrics)); + this.publishMetric(new BrokerMetricEvent(this, metricsList)); - LOGGER.info("method=BrokerMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.", - clusterPhyId, startTime, System.currentTimeMillis() - startTime); + return metricsList; } @Override @@ -79,7 +76,6 @@ public class BrokerMetricCollector extends AbstractMetricCollector items) { long startTime = System.currentTimeMillis(); - metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); for(VersionControlItem v : items) { try { @@ -93,14 +89,11 @@ public class BrokerMetricCollector extends AbstractMetricCollector { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); +public class ClusterMetricCollector extends AbstractMetricCollector { + protected static final ILog LOGGER = LogFactory.getLog(ClusterMetricCollector.class); @Autowired private VersionControlService versionControlService; @@ -38,35 +35,37 @@ public class ClusterMetricCollector extends AbstractMetricCollector collectKafkaMetrics(ClusterPhy clusterPhy) { Long startTime = System.currentTimeMillis(); Long clusterPhyId = clusterPhy.getId(); List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); ClusterMetrics metrics = new ClusterMetrics(clusterPhyId, clusterPhy.getKafkaVersion()); + metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); FutureWaitUtil future = this.getFutureUtilByClusterPhyId(clusterPhyId); for(VersionControlItem v : items) { future.runnableTask( - String.format("method=ClusterMetricCollector||clusterPhyId=%d||metricName=%s", clusterPhyId, v.getName()), + String.format("class=ClusterMetricCollector||clusterPhyId=%d", clusterPhyId), 30000, () -> { try { - if(null != metrics.getMetrics().get(v.getName())){return null;} + if(null != metrics.getMetrics().get(v.getName())){ + return null; + } Result ret = clusterMetricService.collectClusterMetricsFromKafka(clusterPhyId, v.getName()); - if(null == ret || ret.failed() || null == ret.getData()){return null;} + if(null == ret || ret.failed() || null == ret.getData()){ + return null; + } metrics.putMetric(ret.getData().getMetrics()); - - if(!EnvUtil.isOnline()){ - LOGGER.info("method=ClusterMetricCollector||clusterPhyId={}||metricName={}||metricValue={}", - clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics())); - } } catch (Exception e){ - LOGGER.error("method=ClusterMetricCollector||clusterPhyId={}||metricName={}||errMsg=exception!", - clusterPhyId, v.getName(), e); + LOGGER.error( + "method=collectKafkaMetrics||clusterPhyId={}||metricName={}||errMsg=exception!", + clusterPhyId, v.getName(), e + ); } return null; @@ -77,10 +76,9 @@ public class ClusterMetricCollector extends AbstractMetricCollector> { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); +public class GroupMetricCollector extends AbstractMetricCollector { + protected static final ILog LOGGER = LogFactory.getLog(GroupMetricCollector.class); @Autowired private VersionControlService versionControlService; @@ -46,40 +41,38 @@ public class GroupMetricCollector extends AbstractMetricCollector collectKafkaMetrics(ClusterPhy clusterPhy) { Long clusterPhyId = clusterPhy.getId(); - List groups = new ArrayList<>(); + List groupNameList = new ArrayList<>(); try { - groups = groupService.listGroupsFromKafka(clusterPhyId); + groupNameList = groupService.listGroupsFromKafka(clusterPhyId); } catch (Exception e) { - LOGGER.error("method=GroupMetricCollector||clusterPhyId={}||msg=exception!", clusterPhyId, e); + LOGGER.error("method=collectKafkaMetrics||clusterPhyId={}||msg=exception!", clusterPhyId, e); } - if(CollectionUtils.isEmpty(groups)){return;} + if(ValidateUtils.isEmptyList(groupNameList)) { + return Collections.emptyList(); + } List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); FutureWaitUtil future = this.getFutureUtilByClusterPhyId(clusterPhyId); Map> metricsMap = new ConcurrentHashMap<>(); - for(String groupName : groups) { + for(String groupName : groupNameList) { future.runnableTask( - String.format("method=GroupMetricCollector||clusterPhyId=%d||groupName=%s", clusterPhyId, groupName), + String.format("class=GroupMetricCollector||clusterPhyId=%d||groupName=%s", clusterPhyId, groupName), 30000, () -> collectMetrics(clusterPhyId, groupName, metricsMap, items)); } future.waitResult(30000); - List metricsList = new ArrayList<>(); - metricsMap.values().forEach(elem -> metricsList.addAll(elem)); + List metricsList = metricsMap.values().stream().collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll); publishMetric(new GroupMetricEvent(this, metricsList)); - - LOGGER.info("method=GroupMetricCollector||clusterPhyId={}||startTime={}||cost={}||msg=collect finished.", - clusterPhyId, startTime, System.currentTimeMillis() - startTime); + return metricsList; } @Override @@ -92,9 +85,7 @@ public class GroupMetricCollector extends AbstractMetricCollector> metricsMap, List items) { long startTime = System.currentTimeMillis(); - List groupMetricsList = new ArrayList<>(); - - Map tpGroupPOMap = new HashMap<>(); + Map subMetricMap = new HashMap<>(); GroupMetrics groupMetrics = new GroupMetrics(clusterPhyId, groupName, true); groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); @@ -108,38 +99,31 @@ public class GroupMetricCollector extends AbstractMetricCollector { + ret.getData().forEach(metrics -> { if (metrics.isBGroupMetric()) { groupMetrics.putMetric(metrics.getMetrics()); - } else { - String topicName = metrics.getTopic(); - Integer partitionId = metrics.getPartitionId(); - String tpGroupKey = genTopicPartitionGroupKey(topicName, partitionId); - - tpGroupPOMap.putIfAbsent(tpGroupKey, new GroupMetrics(clusterPhyId, partitionId, topicName, groupName, false)); - tpGroupPOMap.get(tpGroupKey).putMetric(metrics.getMetrics()); + return; } - }); - if(!EnvUtil.isOnline()){ - LOGGER.info("method=GroupMetricCollector||clusterPhyId={}||groupName={}||metricName={}||metricValue={}", - clusterPhyId, groupName, metricName, JSON.toJSONString(ret.getData())); - } - }catch (Exception e){ - LOGGER.error("method=GroupMetricCollector||clusterPhyId={}||groupName={}||errMsg=exception!", clusterPhyId, groupName, e); + TopicPartition tp = new TopicPartition(metrics.getTopic(), metrics.getPartitionId()); + subMetricMap.putIfAbsent(tp, new GroupMetrics(clusterPhyId, metrics.getPartitionId(), metrics.getTopic(), groupName, false)); + subMetricMap.get(tp).putMetric(metrics.getMetrics()); + }); + } catch (Exception e) { + LOGGER.error( + "method=collectMetrics||clusterPhyId={}||groupName={}||errMsg=exception!", + clusterPhyId, groupName, e + ); } } - groupMetricsList.add(groupMetrics); - groupMetricsList.addAll(tpGroupPOMap.values()); + List metricsList = new ArrayList<>(); + metricsList.add(groupMetrics); + metricsList.addAll(subMetricMap.values()); // 记录采集性能 groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); - metricsMap.put(groupName, groupMetricsList); - } - - private String genTopicPartitionGroupKey(String topic, Integer partitionId){ - return topic + "@" + partitionId; + metricsMap.put(groupName, metricsList); } } diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/PartitionMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/PartitionMetricCollector.java index 0b5debfa..fbb710b9 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/PartitionMetricCollector.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/PartitionMetricCollector.java @@ -10,8 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; -import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; @@ -29,7 +27,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT */ @Component public class PartitionMetricCollector extends AbstractMetricCollector { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + protected static final ILog LOGGER = LogFactory.getLog(PartitionMetricCollector.class); @Autowired private VersionControlService versionControlService; @@ -41,14 +39,11 @@ public class PartitionMetricCollector extends AbstractMetricCollector collectKafkaMetrics(ClusterPhy clusterPhy) { Long clusterPhyId = clusterPhy.getId(); List topicList = topicService.listTopicsFromCacheFirst(clusterPhyId); List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); - // 获取集群所有分区 - FutureWaitUtil future = this.getFutureUtilByClusterPhyId(clusterPhyId); Map> metricsMap = new ConcurrentHashMap<>(); @@ -56,9 +51,9 @@ public class PartitionMetricCollector extends AbstractMetricCollector()); future.runnableTask( - String.format("method=PartitionMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()), + String.format("class=PartitionMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()), 30000, - () -> collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap.get(topic.getTopicName()), items) + () -> this.collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap.get(topic.getTopicName()), items) ); } @@ -69,10 +64,7 @@ public class PartitionMetricCollector extends AbstractMetricCollector { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + protected static final ILog LOGGER = LogFactory.getLog(ReplicaMetricCollector.class); @Autowired private VersionControlService versionControlService; @@ -42,12 +40,10 @@ public class ReplicaMetricCollector extends AbstractMetricCollector collectKafkaMetrics(ClusterPhy clusterPhy) { Long clusterPhyId = clusterPhy.getId(); List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); - - List partitions = partitionService.listPartitionByCluster(clusterPhyId); + List partitions = partitionService.listPartitionFromCacheFirst(clusterPhyId); FutureWaitUtil future = this.getFutureUtilByClusterPhyId(clusterPhyId); @@ -55,10 +51,11 @@ public class ReplicaMetricCollector extends AbstractMetricCollector collectMetrics(clusterPhyId, metrics, items) @@ -70,8 +67,7 @@ public class ReplicaMetricCollector extends AbstractMetricCollector items) { long startTime = System.currentTimeMillis(); - metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); - for(VersionControlItem v : items) { try { if (metrics.getMetrics().containsKey(v.getName())) { @@ -105,15 +99,11 @@ public class ReplicaMetricCollector extends AbstractMetricCollector> { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); +public class TopicMetricCollector extends AbstractMetricCollector { + protected static final ILog LOGGER = LogFactory.getLog(TopicMetricCollector.class); @Autowired private VersionControlService versionControlService; @@ -47,8 +45,7 @@ public class TopicMetricCollector extends AbstractMetricCollector collectKafkaMetrics(ClusterPhy clusterPhy) { Long clusterPhyId = clusterPhy.getId(); List topics = topicService.listTopicsFromCacheFirst(clusterPhyId); List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); @@ -65,7 +62,7 @@ public class TopicMetricCollector extends AbstractMetricCollector collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap, items) ); @@ -78,8 +75,7 @@ public class TopicMetricCollector extends AbstractMetricCollector { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); +public class ZookeeperMetricCollector extends AbstractMetricCollector { + protected static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricCollector.class); @Autowired private VersionControlService versionControlService; @@ -52,7 +50,7 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector collectKafkaMetrics(ClusterPhy clusterPhy) { Long startTime = System.currentTimeMillis(); Long clusterPhyId = clusterPhy.getId(); List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); @@ -62,11 +60,11 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector ret = zookeeperMetricService.collectMetricsFromZookeeper(param); @@ -91,16 +90,9 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector esExecutor = FutureUtil.init( + "MetricsESSender", 10, 20, - 6000, - TimeUnit.MILLISECONDS, - new LinkedBlockingDeque<>(1000), - new NamedThreadFactory("KM-Collect-MetricESSender-ES"), - (r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount()) + 10000 ); /** * 根据不同监控维度来发送 */ - protected boolean send2es(String index, List statsList){ + protected boolean send2es(String index, List statsList) { + LOGGER.info("method=send2es||indexName={}||metricsSize={}||msg=send metrics to es", index, statsList.size()); + if (CollectionUtils.isEmpty(statsList)) { return true; } - if (!EnvUtil.isOnline()) { - LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}", - index, statsList.size()); - } - BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index); - if (Objects.isNull( baseMetricESDao )) { - LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index); + if (Objects.isNull(baseMetricESDao)) { + LOGGER.error("method=send2es||indexName={}||errMsg=find dao failed", index); return false; } - int size = statsList.size(); - int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1); + for (int i = 0; i < statsList.size(); i += THRESHOLD) { + final int idxStart = i; - if (size < THRESHOLD) { - esExecutor.execute( - () -> baseMetricESDao.batchInsertStats(statsList) - ); - return true; - } - - for (int i = 1; i < num + 1; i++) { - int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD); - int start = (i - 1) * THRESHOLD; - - esExecutor.execute( - () -> baseMetricESDao.batchInsertStats(statsList.subList(start, end)) + // 异步发送 + esExecutor.submitTask( + () -> baseMetricESDao.batchInsertStats(statsList.subList(idxStart, Math.min(idxStart + THRESHOLD, statsList.size()))) ); } diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/BrokerMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/BrokerMetricESSender.java index 6708ba38..57d43ad8 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/BrokerMetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/BrokerMetricESSender.java @@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.B @Component public class BrokerMetricESSender extends AbstractMetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(BrokerMetricESSender.class); @PostConstruct public void init(){ - LOGGER.info("class=BrokerMetricESSender||method=init||msg=init finished"); + LOGGER.info("method=init||msg=init finished"); } @Override diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ClusterMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ClusterMetricESSender.java index 94091748..478a27bd 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ClusterMetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ClusterMetricESSender.java @@ -15,11 +15,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.C @Component public class ClusterMetricESSender extends AbstractMetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(ClusterMetricESSender.class); @PostConstruct public void init(){ - LOGGER.info("class=ClusterMetricESSender||method=init||msg=init finished"); + LOGGER.info("method=init||msg=init finished"); } @Override diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/GroupMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/GroupMetricESSender.java index cd7a2242..e7e622c5 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/GroupMetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/GroupMetricESSender.java @@ -15,11 +15,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.G @Component public class GroupMetricESSender extends AbstractMetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(GroupMetricESSender.class); @PostConstruct public void init(){ - LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished"); + LOGGER.info("method=init||msg=init finished"); } @Override diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/PartitionMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/PartitionMetricESSender.java index ce108835..460d5e92 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/PartitionMetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/PartitionMetricESSender.java @@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.P @Component public class PartitionMetricESSender extends AbstractMetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(PartitionMetricESSender.class); @PostConstruct public void init(){ - LOGGER.info("class=PartitionMetricESSender||method=init||msg=init finished"); + LOGGER.info("method=init||msg=init finished"); } @Override diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ReplicaMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ReplicaMetricESSender.java index 76b2aa2a..9b39f3af 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ReplicaMetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ReplicaMetricESSender.java @@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.R @Component public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(ReplicaMetricESSender.class); @PostConstruct public void init(){ - LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished"); + LOGGER.info("method=init||msg=init finished"); } @Override diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/TopicMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/TopicMetricESSender.java index eebd82aa..311a26fa 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/TopicMetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/TopicMetricESSender.java @@ -15,11 +15,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.T @Component public class TopicMetricESSender extends AbstractMetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(TopicMetricESSender.class); @PostConstruct public void init(){ - LOGGER.info("class=TopicMetricESSender||method=init||msg=init finished"); + LOGGER.info("method=init||msg=init finished"); } @Override diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java index 4f9dad53..f2f254d6 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java @@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.Z @Component public class ZookeeperMetricESSender extends AbstractMetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + private static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricESSender.class); @PostConstruct public void init(){ - LOGGER.info("class=ZookeeperMetricESSender||method=init||msg=init finished"); + LOGGER.info("method=init||msg=init finished"); } @Override diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/LoggerUtil.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/LoggerUtil.java new file mode 100644 index 00000000..d8de462e --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/LoggerUtil.java @@ -0,0 +1,21 @@ +package com.xiaojukeji.know.streaming.km.common.utils; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; + +public class LoggerUtil { + private static final ILog MetricCollectedLogger = LogFactory.getLog("METRIC_COLLECTED_LOGGER"); + + private static final ILog ESLogger = LogFactory.getLog("ES_LOGGER"); + + public static ILog getMetricCollectedLogger() { + return MetricCollectedLogger; + } + + public static ILog getESLogger() { + return ESLogger; + } + + private LoggerUtil() { + } +} diff --git a/km-rest/src/main/resources/logback-spring.xml b/km-rest/src/main/resources/logback-spring.xml index 91c3af7f..09073029 100644 --- a/km-rest/src/main/resources/logback-spring.xml +++ b/km-rest/src/main/resources/logback-spring.xml @@ -149,14 +149,14 @@ - - ${log.path}/metric/metrics.log + + ${log.path}/metric/metric_collected.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n UTF-8 - ${log.path}/metric/metrics_%d{yyyy-MM-dd}.%i.log + ${log.path}/metric/metric_collected_%d{yyyy-MM-dd}.%i.log 100MB @@ -197,8 +197,8 @@ - - + +