[Optimize]日志统一格式&优化输出内容-part2(#800)

This commit is contained in:
zengqiao
2022-12-02 15:01:24 +08:00
parent 175b8d643a
commit 7a52cf67b0
18 changed files with 180 additions and 222 deletions

View File

@@ -1,25 +1,53 @@
package com.xiaojukeji.know.streaming.km.collector.metric; package com.xiaojukeji.know.streaming.km.collector.metric;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService; import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService;
import com.xiaojukeji.know.streaming.km.common.utils.LoggerUtil;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BaseMetricEvent; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BaseMetricEvent;
import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
/** /**
* @author didi * @author didi
*/ */
public abstract class AbstractMetricCollector<T> { public abstract class AbstractMetricCollector<T> {
public abstract void collectMetrics(ClusterPhy clusterPhy); protected static final ILog LOGGER = LogFactory.getLog(AbstractMetricCollector.class);
protected static final ILog METRIC_COLLECTED_LOGGER = LoggerUtil.getMetricCollectedLogger();
public abstract List<T> collectKafkaMetrics(ClusterPhy clusterPhy);
public abstract VersionItemTypeEnum collectorType(); public abstract VersionItemTypeEnum collectorType();
@Autowired @Autowired
private CollectThreadPoolService collectThreadPoolService; private CollectThreadPoolService collectThreadPoolService;
public void collectMetrics(ClusterPhy clusterPhy) {
long startTime = System.currentTimeMillis();
// 采集指标
List<T> metricsList = this.collectKafkaMetrics(clusterPhy);
// 输出耗时信息
LOGGER.info(
"metricType={}||clusterPhyId={}||costTimeUnitMs={}",
this.collectorType().getMessage(), clusterPhy.getId(), System.currentTimeMillis() - startTime
);
// 输出采集到的指标信息
METRIC_COLLECTED_LOGGER.debug("metricType={}||clusterPhyId={}||metrics={}!",
this.collectorType().getMessage(), clusterPhy.getId(), ConvertUtil.obj2Json(metricsList)
);
}
protected FutureWaitUtil<Void> getFutureUtilByClusterPhyId(Long clusterPhyId) { protected FutureWaitUtil<Void> getFutureUtilByClusterPhyId(Long clusterPhyId) {
return collectThreadPoolService.selectSuitableFutureUtil(clusterPhyId * 1000L + this.collectorType().getCode()); return collectThreadPoolService.selectSuitableFutureUtil(clusterPhyId * 1000L + this.collectorType().getCode());
} }

View File

@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.collector.metric.kafka; package com.xiaojukeji.know.streaming.km.collector.metric.kafka;
import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.metric.AbstractMetricCollector; import com.xiaojukeji.know.streaming.km.collector.metric.AbstractMetricCollector;
@@ -12,7 +11,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionContro
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
@@ -30,7 +28,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
*/ */
@Component @Component
public class BrokerMetricCollector extends AbstractMetricCollector<BrokerMetrics> { public class BrokerMetricCollector extends AbstractMetricCollector<BrokerMetrics> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(BrokerMetricCollector.class);
@Autowired @Autowired
private VersionControlService versionControlService; private VersionControlService versionControlService;
@@ -42,8 +40,7 @@ public class BrokerMetricCollector extends AbstractMetricCollector<BrokerMetrics
private BrokerService brokerService; private BrokerService brokerService;
@Override @Override
public void collectMetrics(ClusterPhy clusterPhy) { public List<BrokerMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId(); Long clusterPhyId = clusterPhy.getId();
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterPhy.getId()); List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterPhy.getId());
@@ -51,23 +48,23 @@ public class BrokerMetricCollector extends AbstractMetricCollector<BrokerMetrics
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId); FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);
List<BrokerMetrics> brokerMetrics = new ArrayList<>(); List<BrokerMetrics> metricsList = new ArrayList<>();
for(Broker broker : brokers) { for(Broker broker : brokers) {
BrokerMetrics metrics = new BrokerMetrics(clusterPhyId, broker.getBrokerId(), broker.getHost(), broker.getPort()); BrokerMetrics metrics = new BrokerMetrics(clusterPhyId, broker.getBrokerId(), broker.getHost(), broker.getPort());
brokerMetrics.add(metrics); metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
metricsList.add(metrics);
future.runnableTask( future.runnableTask(
String.format("method=BrokerMetricCollector||clusterPhyId=%d||brokerId=%d", clusterPhyId, broker.getBrokerId()), String.format("class=BrokerMetricCollector||clusterPhyId=%d||brokerId=%d", clusterPhyId, broker.getBrokerId()),
30000, 30000,
() -> collectMetrics(clusterPhyId, metrics, items) () -> collectMetrics(clusterPhyId, metrics, items)
); );
} }
future.waitExecute(30000); future.waitExecute(30000);
this.publishMetric(new BrokerMetricEvent(this, brokerMetrics)); this.publishMetric(new BrokerMetricEvent(this, metricsList));
LOGGER.info("method=BrokerMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.", return metricsList;
clusterPhyId, startTime, System.currentTimeMillis() - startTime);
} }
@Override @Override
@@ -79,7 +76,6 @@ public class BrokerMetricCollector extends AbstractMetricCollector<BrokerMetrics
private void collectMetrics(Long clusterPhyId, BrokerMetrics metrics, List<VersionControlItem> items) { private void collectMetrics(Long clusterPhyId, BrokerMetrics metrics, List<VersionControlItem> items) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
for(VersionControlItem v : items) { for(VersionControlItem v : items) {
try { try {
@@ -93,14 +89,11 @@ public class BrokerMetricCollector extends AbstractMetricCollector<BrokerMetrics
} }
metrics.putMetric(ret.getData().getMetrics()); metrics.putMetric(ret.getData().getMetrics());
if(!EnvUtil.isOnline()){
LOGGER.info("method=BrokerMetricCollector||clusterId={}||brokerId={}||metric={}||metric={}!",
clusterPhyId, metrics.getBrokerId(), v.getName(), JSON.toJSONString(ret.getData().getMetrics()));
}
} catch (Exception e){ } catch (Exception e){
LOGGER.error("method=BrokerMetricCollector||clusterId={}||brokerId={}||metric={}||errMsg=exception!", LOGGER.error(
clusterPhyId, metrics.getBrokerId(), v.getName(), e); "method=collectMetrics||clusterPhyId={}||brokerId={}||metricName={}||errMsg=exception!",
clusterPhyId, metrics.getBrokerId(), v.getName(), e
);
} }
} }

View File

@@ -8,18 +8,15 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetric
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ClusterMetricEvent; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ClusterMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService; import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Arrays; import java.util.Collections;
import java.util.List; import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CLUSTER; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CLUSTER;
@@ -28,8 +25,8 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
* @author didi * @author didi
*/ */
@Component @Component
public class ClusterMetricCollector extends AbstractMetricCollector<ClusterMetricPO> { public class ClusterMetricCollector extends AbstractMetricCollector<ClusterMetrics> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); protected static final ILog LOGGER = LogFactory.getLog(ClusterMetricCollector.class);
@Autowired @Autowired
private VersionControlService versionControlService; private VersionControlService versionControlService;
@@ -38,35 +35,37 @@ public class ClusterMetricCollector extends AbstractMetricCollector<ClusterMetri
private ClusterMetricService clusterMetricService; private ClusterMetricService clusterMetricService;
@Override @Override
public void collectMetrics(ClusterPhy clusterPhy) { public List<ClusterMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis(); Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId(); Long clusterPhyId = clusterPhy.getId();
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
ClusterMetrics metrics = new ClusterMetrics(clusterPhyId, clusterPhy.getKafkaVersion()); ClusterMetrics metrics = new ClusterMetrics(clusterPhyId, clusterPhy.getKafkaVersion());
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId); FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);
for(VersionControlItem v : items) { for(VersionControlItem v : items) {
future.runnableTask( future.runnableTask(
String.format("method=ClusterMetricCollector||clusterPhyId=%d||metricName=%s", clusterPhyId, v.getName()), String.format("class=ClusterMetricCollector||clusterPhyId=%d", clusterPhyId),
30000, 30000,
() -> { () -> {
try { try {
if(null != metrics.getMetrics().get(v.getName())){return null;} if(null != metrics.getMetrics().get(v.getName())){
return null;
}
Result<ClusterMetrics> ret = clusterMetricService.collectClusterMetricsFromKafka(clusterPhyId, v.getName()); Result<ClusterMetrics> ret = clusterMetricService.collectClusterMetricsFromKafka(clusterPhyId, v.getName());
if(null == ret || ret.failed() || null == ret.getData()){return null;} if(null == ret || ret.failed() || null == ret.getData()){
return null;
}
metrics.putMetric(ret.getData().getMetrics()); metrics.putMetric(ret.getData().getMetrics());
if(!EnvUtil.isOnline()){
LOGGER.info("method=ClusterMetricCollector||clusterPhyId={}||metricName={}||metricValue={}",
clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics()));
}
} catch (Exception e){ } catch (Exception e){
LOGGER.error("method=ClusterMetricCollector||clusterPhyId={}||metricName={}||errMsg=exception!", LOGGER.error(
clusterPhyId, v.getName(), e); "method=collectKafkaMetrics||clusterPhyId={}||metricName={}||errMsg=exception!",
clusterPhyId, v.getName(), e
);
} }
return null; return null;
@@ -77,10 +76,9 @@ public class ClusterMetricCollector extends AbstractMetricCollector<ClusterMetri
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
publishMetric(new ClusterMetricEvent(this, Arrays.asList(metrics))); publishMetric(new ClusterMetricEvent(this, Collections.singletonList(metrics)));
LOGGER.info("method=ClusterMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=msg=collect finished.", return Collections.singletonList(metrics);
clusterPhyId, startTime, System.currentTimeMillis() - startTime);
} }
@Override @Override

View File

@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.collector.metric.kafka; package com.xiaojukeji.know.streaming.km.collector.metric.kafka;
import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.metric.AbstractMetricCollector; import com.xiaojukeji.know.streaming.km.collector.metric.AbstractMetricCollector;
@@ -11,20 +10,16 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionContro
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.GroupMetricEvent; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.GroupMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService; import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.apache.commons.collections.CollectionUtils; import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_GROUP; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_GROUP;
@@ -33,8 +28,8 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
* @author didi * @author didi
*/ */
@Component @Component
public class GroupMetricCollector extends AbstractMetricCollector<List<GroupMetrics>> { public class GroupMetricCollector extends AbstractMetricCollector<GroupMetrics> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); protected static final ILog LOGGER = LogFactory.getLog(GroupMetricCollector.class);
@Autowired @Autowired
private VersionControlService versionControlService; private VersionControlService versionControlService;
@@ -46,40 +41,38 @@ public class GroupMetricCollector extends AbstractMetricCollector<List<GroupMetr
private GroupService groupService; private GroupService groupService;
@Override @Override
public void collectMetrics(ClusterPhy clusterPhy) { public List<GroupMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId(); Long clusterPhyId = clusterPhy.getId();
List<String> groups = new ArrayList<>(); List<String> groupNameList = new ArrayList<>();
try { try {
groups = groupService.listGroupsFromKafka(clusterPhyId); groupNameList = groupService.listGroupsFromKafka(clusterPhyId);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("method=GroupMetricCollector||clusterPhyId={}||msg=exception!", clusterPhyId, e); LOGGER.error("method=collectKafkaMetrics||clusterPhyId={}||msg=exception!", clusterPhyId, e);
} }
if(CollectionUtils.isEmpty(groups)){return;} if(ValidateUtils.isEmptyList(groupNameList)) {
return Collections.emptyList();
}
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId); FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);
Map<String, List<GroupMetrics>> metricsMap = new ConcurrentHashMap<>(); Map<String, List<GroupMetrics>> metricsMap = new ConcurrentHashMap<>();
for(String groupName : groups) { for(String groupName : groupNameList) {
future.runnableTask( future.runnableTask(
String.format("method=GroupMetricCollector||clusterPhyId=%d||groupName=%s", clusterPhyId, groupName), String.format("class=GroupMetricCollector||clusterPhyId=%d||groupName=%s", clusterPhyId, groupName),
30000, 30000,
() -> collectMetrics(clusterPhyId, groupName, metricsMap, items)); () -> collectMetrics(clusterPhyId, groupName, metricsMap, items));
} }
future.waitResult(30000); future.waitResult(30000);
List<GroupMetrics> metricsList = new ArrayList<>(); List<GroupMetrics> metricsList = metricsMap.values().stream().collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
metricsMap.values().forEach(elem -> metricsList.addAll(elem));
publishMetric(new GroupMetricEvent(this, metricsList)); publishMetric(new GroupMetricEvent(this, metricsList));
return metricsList;
LOGGER.info("method=GroupMetricCollector||clusterPhyId={}||startTime={}||cost={}||msg=collect finished.",
clusterPhyId, startTime, System.currentTimeMillis() - startTime);
} }
@Override @Override
@@ -92,9 +85,7 @@ public class GroupMetricCollector extends AbstractMetricCollector<List<GroupMetr
private void collectMetrics(Long clusterPhyId, String groupName, Map<String, List<GroupMetrics>> metricsMap, List<VersionControlItem> items) { private void collectMetrics(Long clusterPhyId, String groupName, Map<String, List<GroupMetrics>> metricsMap, List<VersionControlItem> items) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
List<GroupMetrics> groupMetricsList = new ArrayList<>(); Map<TopicPartition, GroupMetrics> subMetricMap = new HashMap<>();
Map<String, GroupMetrics> tpGroupPOMap = new HashMap<>();
GroupMetrics groupMetrics = new GroupMetrics(clusterPhyId, groupName, true); GroupMetrics groupMetrics = new GroupMetrics(clusterPhyId, groupName, true);
groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
@@ -108,38 +99,31 @@ public class GroupMetricCollector extends AbstractMetricCollector<List<GroupMetr
continue; continue;
} }
ret.getData().stream().forEach(metrics -> { ret.getData().forEach(metrics -> {
if (metrics.isBGroupMetric()) { if (metrics.isBGroupMetric()) {
groupMetrics.putMetric(metrics.getMetrics()); groupMetrics.putMetric(metrics.getMetrics());
} else { return;
String topicName = metrics.getTopic();
Integer partitionId = metrics.getPartitionId();
String tpGroupKey = genTopicPartitionGroupKey(topicName, partitionId);
tpGroupPOMap.putIfAbsent(tpGroupKey, new GroupMetrics(clusterPhyId, partitionId, topicName, groupName, false));
tpGroupPOMap.get(tpGroupKey).putMetric(metrics.getMetrics());
} }
});
if(!EnvUtil.isOnline()){ TopicPartition tp = new TopicPartition(metrics.getTopic(), metrics.getPartitionId());
LOGGER.info("method=GroupMetricCollector||clusterPhyId={}||groupName={}||metricName={}||metricValue={}", subMetricMap.putIfAbsent(tp, new GroupMetrics(clusterPhyId, metrics.getPartitionId(), metrics.getTopic(), groupName, false));
clusterPhyId, groupName, metricName, JSON.toJSONString(ret.getData())); subMetricMap.get(tp).putMetric(metrics.getMetrics());
} });
}catch (Exception e){ } catch (Exception e) {
LOGGER.error("method=GroupMetricCollector||clusterPhyId={}||groupName={}||errMsg=exception!", clusterPhyId, groupName, e); LOGGER.error(
"method=collectMetrics||clusterPhyId={}||groupName={}||errMsg=exception!",
clusterPhyId, groupName, e
);
} }
} }
groupMetricsList.add(groupMetrics); List<GroupMetrics> metricsList = new ArrayList<>();
groupMetricsList.addAll(tpGroupPOMap.values()); metricsList.add(groupMetrics);
metricsList.addAll(subMetricMap.values());
// 记录采集性能 // 记录采集性能
groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
metricsMap.put(groupName, groupMetricsList); metricsMap.put(groupName, metricsList);
}
private String genTopicPartitionGroupKey(String topic, Integer partitionId){
return topic + "@" + partitionId;
} }
} }

View File

@@ -10,8 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
@@ -29,7 +27,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
*/ */
@Component @Component
public class PartitionMetricCollector extends AbstractMetricCollector<PartitionMetrics> { public class PartitionMetricCollector extends AbstractMetricCollector<PartitionMetrics> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); protected static final ILog LOGGER = LogFactory.getLog(PartitionMetricCollector.class);
@Autowired @Autowired
private VersionControlService versionControlService; private VersionControlService versionControlService;
@@ -41,14 +39,11 @@ public class PartitionMetricCollector extends AbstractMetricCollector<PartitionM
private TopicService topicService; private TopicService topicService;
@Override @Override
public void collectMetrics(ClusterPhy clusterPhy) { public List<PartitionMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId(); Long clusterPhyId = clusterPhy.getId();
List<Topic> topicList = topicService.listTopicsFromCacheFirst(clusterPhyId); List<Topic> topicList = topicService.listTopicsFromCacheFirst(clusterPhyId);
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
// 获取集群所有分区
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId); FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);
Map<String, Map<Integer, PartitionMetrics>> metricsMap = new ConcurrentHashMap<>(); Map<String, Map<Integer, PartitionMetrics>> metricsMap = new ConcurrentHashMap<>();
@@ -56,9 +51,9 @@ public class PartitionMetricCollector extends AbstractMetricCollector<PartitionM
metricsMap.put(topic.getTopicName(), new ConcurrentHashMap<>()); metricsMap.put(topic.getTopicName(), new ConcurrentHashMap<>());
future.runnableTask( future.runnableTask(
String.format("method=PartitionMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()), String.format("class=PartitionMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()),
30000, 30000,
() -> collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap.get(topic.getTopicName()), items) () -> this.collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap.get(topic.getTopicName()), items)
); );
} }
@@ -69,10 +64,7 @@ public class PartitionMetricCollector extends AbstractMetricCollector<PartitionM
this.publishMetric(new PartitionMetricEvent(this, metricsList)); this.publishMetric(new PartitionMetricEvent(this, metricsList));
LOGGER.info( return metricsList;
"method=PartitionMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.",
clusterPhyId, startTime, System.currentTimeMillis() - startTime
);
} }
@Override @Override
@@ -110,17 +102,9 @@ public class PartitionMetricCollector extends AbstractMetricCollector<PartitionM
PartitionMetrics allMetrics = metricsMap.get(subMetrics.getPartitionId()); PartitionMetrics allMetrics = metricsMap.get(subMetrics.getPartitionId());
allMetrics.putMetric(subMetrics.getMetrics()); allMetrics.putMetric(subMetrics.getMetrics());
} }
if (!EnvUtil.isOnline()) {
LOGGER.info(
"class=PartitionMetricCollector||method=collectMetrics||clusterPhyId={}||topicName={}||metricName={}||metricValue={}!",
clusterPhyId, topicName, v.getName(), ConvertUtil.obj2Json(ret.getData())
);
}
} catch (Exception e) { } catch (Exception e) {
LOGGER.info( LOGGER.info(
"class=PartitionMetricCollector||method=collectMetrics||clusterPhyId={}||topicName={}||metricName={}||errMsg=exception", "method=collectMetrics||clusterPhyId={}||topicName={}||metricName={}||errMsg=exception",
clusterPhyId, topicName, v.getName(), e clusterPhyId, topicName, v.getName(), e
); );
} }

View File

@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.collector.metric.kafka; package com.xiaojukeji.know.streaming.km.collector.metric.kafka;
import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.metric.AbstractMetricCollector; import com.xiaojukeji.know.streaming.km.collector.metric.AbstractMetricCollector;
@@ -12,7 +11,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionContro
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService; import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
@@ -30,7 +28,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
*/ */
@Component @Component
public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationMetrics> { public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationMetrics> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); protected static final ILog LOGGER = LogFactory.getLog(ReplicaMetricCollector.class);
@Autowired @Autowired
private VersionControlService versionControlService; private VersionControlService versionControlService;
@@ -42,12 +40,10 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationM
private PartitionService partitionService; private PartitionService partitionService;
@Override @Override
public void collectMetrics(ClusterPhy clusterPhy) { public List<ReplicationMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId(); Long clusterPhyId = clusterPhy.getId();
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
List<Partition> partitions = partitionService.listPartitionFromCacheFirst(clusterPhyId);
List<Partition> partitions = partitionService.listPartitionByCluster(clusterPhyId);
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId); FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);
@@ -55,10 +51,11 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationM
for(Partition partition : partitions) { for(Partition partition : partitions) {
for (Integer brokerId: partition.getAssignReplicaList()) { for (Integer brokerId: partition.getAssignReplicaList()) {
ReplicationMetrics metrics = new ReplicationMetrics(clusterPhyId, partition.getTopicName(), brokerId, partition.getPartitionId()); ReplicationMetrics metrics = new ReplicationMetrics(clusterPhyId, partition.getTopicName(), brokerId, partition.getPartitionId());
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
metricsList.add(metrics); metricsList.add(metrics);
future.runnableTask( future.runnableTask(
String.format("method=ReplicaMetricCollector||clusterPhyId=%d||brokerId=%d||topicName=%s||partitionId=%d", String.format("class=ReplicaMetricCollector||clusterPhyId=%d||brokerId=%d||topicName=%s||partitionId=%d",
clusterPhyId, brokerId, partition.getTopicName(), partition.getPartitionId()), clusterPhyId, brokerId, partition.getTopicName(), partition.getPartitionId()),
30000, 30000,
() -> collectMetrics(clusterPhyId, metrics, items) () -> collectMetrics(clusterPhyId, metrics, items)
@@ -70,8 +67,7 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationM
publishMetric(new ReplicaMetricEvent(this, metricsList)); publishMetric(new ReplicaMetricEvent(this, metricsList));
LOGGER.info("method=ReplicaMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.", return metricsList;
clusterPhyId, startTime, System.currentTimeMillis() - startTime);
} }
@Override @Override
@@ -84,8 +80,6 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationM
private ReplicationMetrics collectMetrics(Long clusterPhyId, ReplicationMetrics metrics, List<VersionControlItem> items) { private ReplicationMetrics collectMetrics(Long clusterPhyId, ReplicationMetrics metrics, List<VersionControlItem> items) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
for(VersionControlItem v : items) { for(VersionControlItem v : items) {
try { try {
if (metrics.getMetrics().containsKey(v.getName())) { if (metrics.getMetrics().containsKey(v.getName())) {
@@ -105,15 +99,11 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationM
} }
metrics.putMetric(ret.getData().getMetrics()); metrics.putMetric(ret.getData().getMetrics());
if (!EnvUtil.isOnline()) {
LOGGER.info("method=ReplicaMetricCollector||clusterPhyId={}||topicName={}||partitionId={}||metricName={}||metricValue={}",
clusterPhyId, metrics.getTopic(), metrics.getPartitionId(), v.getName(), JSON.toJSONString(ret.getData().getMetrics()));
}
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("method=ReplicaMetricCollector||clusterPhyId={}||topicName={}||partition={}||metricName={}||errMsg=exception!", LOGGER.error(
clusterPhyId, metrics.getTopic(), metrics.getPartitionId(), v.getName(), e); "method=collectMetrics||clusterPhyId={}||topicName={}||partition={}||metricName={}||errMsg=exception!",
clusterPhyId, metrics.getTopic(), metrics.getPartitionId(), v.getName(), e
);
} }
} }

View File

@@ -11,8 +11,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionContro
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.TopicMetricEvent; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.TopicMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
@@ -32,8 +30,8 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
* @author didi * @author didi
*/ */
@Component @Component
public class TopicMetricCollector extends AbstractMetricCollector<List<TopicMetrics>> { public class TopicMetricCollector extends AbstractMetricCollector<TopicMetrics> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); protected static final ILog LOGGER = LogFactory.getLog(TopicMetricCollector.class);
@Autowired @Autowired
private VersionControlService versionControlService; private VersionControlService versionControlService;
@@ -47,8 +45,7 @@ public class TopicMetricCollector extends AbstractMetricCollector<List<TopicMetr
private static final Integer AGG_METRICS_BROKER_ID = -10000; private static final Integer AGG_METRICS_BROKER_ID = -10000;
@Override @Override
public void collectMetrics(ClusterPhy clusterPhy) { public List<TopicMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId(); Long clusterPhyId = clusterPhy.getId();
List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterPhyId); List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterPhyId);
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
@@ -65,7 +62,7 @@ public class TopicMetricCollector extends AbstractMetricCollector<List<TopicMetr
allMetricsMap.put(topic.getTopicName(), metricsMap); allMetricsMap.put(topic.getTopicName(), metricsMap);
future.runnableTask( future.runnableTask(
String.format("method=TopicMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()), String.format("class=TopicMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()),
30000, 30000,
() -> collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap, items) () -> collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap, items)
); );
@@ -78,8 +75,7 @@ public class TopicMetricCollector extends AbstractMetricCollector<List<TopicMetr
this.publishMetric(new TopicMetricEvent(this, metricsList)); this.publishMetric(new TopicMetricEvent(this, metricsList));
LOGGER.info("method=TopicMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.", return metricsList;
clusterPhyId, startTime, System.currentTimeMillis() - startTime);
} }
@Override @Override
@@ -119,14 +115,9 @@ public class TopicMetricCollector extends AbstractMetricCollector<List<TopicMetr
metricsMap.get(metrics.getBrokerId()).putMetric(metrics.getMetrics()); metricsMap.get(metrics.getBrokerId()).putMetric(metrics.getMetrics());
} }
}); });
if (!EnvUtil.isOnline()) {
LOGGER.info("method=TopicMetricCollector||clusterPhyId={}||topicName={}||metricName={}||metricValue={}.",
clusterPhyId, topicName, v.getName(), ConvertUtil.obj2Json(ret.getData())
);
}
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("method=TopicMetricCollector||clusterPhyId={}||topicName={}||metricName={}||errMsg=exception!", LOGGER.error(
"method=collectMetrics||clusterPhyId={}||topicName={}||metricName={}||errMsg=exception!",
clusterPhyId, topicName, v.getName(), e clusterPhyId, topicName, v.getName(), e
); );
} }

View File

@@ -15,10 +15,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetric
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService; import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
@@ -26,7 +24,7 @@ import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Arrays; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -36,8 +34,8 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
* @author didi * @author didi
*/ */
@Component @Component
public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperMetricPO> { public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperMetrics> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); protected static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricCollector.class);
@Autowired @Autowired
private VersionControlService versionControlService; private VersionControlService versionControlService;
@@ -52,7 +50,7 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperM
private KafkaControllerService kafkaControllerService; private KafkaControllerService kafkaControllerService;
@Override @Override
public void collectMetrics(ClusterPhy clusterPhy) { public List<ZookeeperMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis(); Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId(); Long clusterPhyId = clusterPhy.getId();
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
@@ -62,11 +60,11 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperM
.collect(Collectors.toList()); .collect(Collectors.toList());
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
ZookeeperMetrics metrics = ZookeeperMetrics.initWithMetric(clusterPhyId, Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (float)Constant.INVALID_CODE); ZookeeperMetrics metrics = ZookeeperMetrics.initWithMetric(clusterPhyId, Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
if (ValidateUtils.isEmptyList(aliveZKList)) { if (ValidateUtils.isEmptyList(aliveZKList)) {
// 没有存活的ZK时发布事件然后直接返回 // 没有存活的ZK时发布事件然后直接返回
publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics))); publishMetric(new ZookeeperMetricEvent(this, Collections.singletonList(metrics)));
return; return Collections.singletonList(metrics);
} }
// 构造参数 // 构造参数
@@ -83,6 +81,7 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperM
if(null != metrics.getMetrics().get(v.getName())) { if(null != metrics.getMetrics().get(v.getName())) {
continue; continue;
} }
param.setMetricName(v.getName()); param.setMetricName(v.getName());
Result<ZookeeperMetrics> ret = zookeeperMetricService.collectMetricsFromZookeeper(param); Result<ZookeeperMetrics> ret = zookeeperMetricService.collectMetricsFromZookeeper(param);
@@ -91,16 +90,9 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperM
} }
metrics.putMetric(ret.getData().getMetrics()); metrics.putMetric(ret.getData().getMetrics());
if(!EnvUtil.isOnline()){
LOGGER.info(
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||metricValue={}",
clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics())
);
}
} catch (Exception e){ } catch (Exception e){
LOGGER.error( LOGGER.error(
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||errMsg=exception!", "method=collectMetrics||clusterPhyId={}||metricName={}||errMsg=exception!",
clusterPhyId, v.getName(), e clusterPhyId, v.getName(), e
); );
} }
@@ -108,12 +100,9 @@ public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperM
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics))); this.publishMetric(new ZookeeperMetricEvent(this, Collections.singletonList(metrics)));
LOGGER.info( return Collections.singletonList(metrics);
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||startTime={}||costTime={}||msg=msg=collect finished.",
clusterPhyId, startTime, System.currentTimeMillis() - startTime
);
} }
@Override @Override

View File

@@ -3,67 +3,47 @@ package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public abstract class AbstractMetricESSender { public abstract class AbstractMetricESSender {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(AbstractMetricESSender.class);
private static final int THRESHOLD = 100; private static final int THRESHOLD = 100;
private static final ThreadPoolExecutor esExecutor = new ThreadPoolExecutor( private static final FutureUtil<Void> esExecutor = FutureUtil.init(
"MetricsESSender",
10, 10,
20, 20,
6000, 10000
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(1000),
new NamedThreadFactory("KM-Collect-MetricESSender-ES"),
(r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount())
); );
/** /**
* 根据不同监控维度来发送 * 根据不同监控维度来发送
*/ */
protected boolean send2es(String index, List<? extends BaseESPO> statsList){ protected boolean send2es(String index, List<? extends BaseESPO> statsList) {
LOGGER.info("method=send2es||indexName={}||metricsSize={}||msg=send metrics to es", index, statsList.size());
if (CollectionUtils.isEmpty(statsList)) { if (CollectionUtils.isEmpty(statsList)) {
return true; return true;
} }
if (!EnvUtil.isOnline()) {
LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
index, statsList.size());
}
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index); BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
if (Objects.isNull( baseMetricESDao )) { if (Objects.isNull(baseMetricESDao)) {
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index); LOGGER.error("method=send2es||indexName={}||errMsg=find dao failed", index);
return false; return false;
} }
int size = statsList.size(); for (int i = 0; i < statsList.size(); i += THRESHOLD) {
int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1); final int idxStart = i;
if (size < THRESHOLD) { // 异步发送
esExecutor.execute( esExecutor.submitTask(
() -> baseMetricESDao.batchInsertStats(statsList) () -> baseMetricESDao.batchInsertStats(statsList.subList(idxStart, Math.min(idxStart + THRESHOLD, statsList.size())))
);
return true;
}
for (int i = 1; i < num + 1; i++) {
int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD);
int start = (i - 1) * THRESHOLD;
esExecutor.execute(
() -> baseMetricESDao.batchInsertStats(statsList.subList(start, end))
); );
} }

View File

@@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.B
@Component @Component
public class BrokerMetricESSender extends AbstractMetricESSender implements ApplicationListener<BrokerMetricEvent> { public class BrokerMetricESSender extends AbstractMetricESSender implements ApplicationListener<BrokerMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(BrokerMetricESSender.class);
@PostConstruct @PostConstruct
public void init(){ public void init(){
LOGGER.info("class=BrokerMetricESSender||method=init||msg=init finished"); LOGGER.info("method=init||msg=init finished");
} }
@Override @Override

View File

@@ -15,11 +15,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.C
@Component @Component
public class ClusterMetricESSender extends AbstractMetricESSender implements ApplicationListener<ClusterMetricEvent> { public class ClusterMetricESSender extends AbstractMetricESSender implements ApplicationListener<ClusterMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(ClusterMetricESSender.class);
@PostConstruct @PostConstruct
public void init(){ public void init(){
LOGGER.info("class=ClusterMetricESSender||method=init||msg=init finished"); LOGGER.info("method=init||msg=init finished");
} }
@Override @Override

View File

@@ -15,11 +15,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.G
@Component @Component
public class GroupMetricESSender extends AbstractMetricESSender implements ApplicationListener<GroupMetricEvent> { public class GroupMetricESSender extends AbstractMetricESSender implements ApplicationListener<GroupMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(GroupMetricESSender.class);
@PostConstruct @PostConstruct
public void init(){ public void init(){
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished"); LOGGER.info("method=init||msg=init finished");
} }
@Override @Override

View File

@@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.P
@Component @Component
public class PartitionMetricESSender extends AbstractMetricESSender implements ApplicationListener<PartitionMetricEvent> { public class PartitionMetricESSender extends AbstractMetricESSender implements ApplicationListener<PartitionMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(PartitionMetricESSender.class);
@PostConstruct @PostConstruct
public void init(){ public void init(){
LOGGER.info("class=PartitionMetricESSender||method=init||msg=init finished"); LOGGER.info("method=init||msg=init finished");
} }
@Override @Override

View File

@@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.R
@Component @Component
public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener<ReplicaMetricEvent> { public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener<ReplicaMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(ReplicaMetricESSender.class);
@PostConstruct @PostConstruct
public void init(){ public void init(){
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished"); LOGGER.info("method=init||msg=init finished");
} }
@Override @Override

View File

@@ -15,11 +15,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.T
@Component @Component
public class TopicMetricESSender extends AbstractMetricESSender implements ApplicationListener<TopicMetricEvent> { public class TopicMetricESSender extends AbstractMetricESSender implements ApplicationListener<TopicMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(TopicMetricESSender.class);
@PostConstruct @PostConstruct
public void init(){ public void init(){
LOGGER.info("class=TopicMetricESSender||method=init||msg=init finished"); LOGGER.info("method=init||msg=init finished");
} }
@Override @Override

View File

@@ -14,11 +14,11 @@ import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.Z
@Component @Component
public class ZookeeperMetricESSender extends AbstractMetricESSender implements ApplicationListener<ZookeeperMetricEvent> { public class ZookeeperMetricESSender extends AbstractMetricESSender implements ApplicationListener<ZookeeperMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); private static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricESSender.class);
@PostConstruct @PostConstruct
public void init(){ public void init(){
LOGGER.info("class=ZookeeperMetricESSender||method=init||msg=init finished"); LOGGER.info("method=init||msg=init finished");
} }
@Override @Override

View File

@@ -0,0 +1,21 @@
package com.xiaojukeji.know.streaming.km.common.utils;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
public class LoggerUtil {
private static final ILog MetricCollectedLogger = LogFactory.getLog("METRIC_COLLECTED_LOGGER");
private static final ILog ESLogger = LogFactory.getLog("ES_LOGGER");
public static ILog getMetricCollectedLogger() {
return MetricCollectedLogger;
}
public static ILog getESLogger() {
return ESLogger;
}
private LoggerUtil() {
}
}

View File

@@ -149,14 +149,14 @@
</appender> </appender>
<!-- Metrics信息收集日志 --> <!-- Metrics信息收集日志 -->
<appender name="METRIC_LOGGER" class="ch.qos.logback.core.rolling.RollingFileAppender"> <appender name="METRIC_COLLECTED_LOGGER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/metric/metrics.log</file> <file>${log.path}/metric/metric_collected.log</file>
<encoder> <encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/metric/metrics_%d{yyyy-MM-dd}.%i.log</fileNamePattern> <fileNamePattern>${log.path}/metric/metric_collected_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize> <maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy> </timeBasedFileNamingAndTriggeringPolicy>
@@ -197,8 +197,8 @@
<appender-ref ref="ES_LOGGER"/> <appender-ref ref="ES_LOGGER"/>
</logger> </logger>
<logger name="METRIC_LOGGER" level="ERROR" additivity="false"> <logger name="METRIC_COLLECTED_LOGGER" level="ERROR" additivity="false">
<appender-ref ref="METRIC_LOGGER"/> <appender-ref ref="METRIC_COLLECTED_LOGGER"/>
</logger> </logger>
<logger name="TASK_LOGGER" level="ERROR" additivity="false"> <logger name="TASK_LOGGER" level="ERROR" additivity="false">