[Optimize] Remove the code that reads and writes Replica metrics from/to ES (#862)

zengqiao
2023-01-09 14:47:18 +08:00
committed by EricZeng
parent a8b56fb613
commit f4a219ceef
15 changed files with 1 addition and 581 deletions

View File

@@ -1,114 +0,0 @@
package com.xiaojukeji.know.streaming.km.collector.metric.kafka;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_REPLICATION;
/**
* @author didi
*/
@Component
public class ReplicaMetricCollector extends AbstractKafkaMetricCollector<ReplicationMetrics> {
protected static final ILog LOGGER = LogFactory.getLog(ReplicaMetricCollector.class);
@Autowired
private VersionControlService versionControlService;
@Autowired
private ReplicaMetricService replicaMetricService;
@Autowired
private PartitionService partitionService;
@Override
public List<ReplicationMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
Long clusterPhyId = clusterPhy.getId();
List<Partition> partitions = partitionService.listPartitionFromCacheFirst(clusterPhyId);
List<VersionControlItem> items = versionControlService.listVersionControlItem(this.getClusterVersion(clusterPhy), collectorType().getCode());
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);
List<ReplicationMetrics> metricsList = new ArrayList<>();
for(Partition partition : partitions) {
for (Integer brokerId: partition.getAssignReplicaList()) {
ReplicationMetrics metrics = new ReplicationMetrics(clusterPhyId, partition.getTopicName(), brokerId, partition.getPartitionId());
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
metricsList.add(metrics);
future.runnableTask(
String.format("class=ReplicaMetricCollector||clusterPhyId=%d||brokerId=%d||topicName=%s||partitionId=%d",
clusterPhyId, brokerId, partition.getTopicName(), partition.getPartitionId()),
30000,
() -> collectMetrics(clusterPhyId, metrics, items)
);
}
}
future.waitExecute(30000);
publishMetric(new ReplicaMetricEvent(this, metricsList));
return metricsList;
}
@Override
public VersionItemTypeEnum collectorType() {
return METRIC_REPLICATION;
}
/**************************************************** private method ****************************************************/
private ReplicationMetrics collectMetrics(Long clusterPhyId, ReplicationMetrics metrics, List<VersionControlItem> items) {
long startTime = System.currentTimeMillis();
for(VersionControlItem v : items) {
try {
if (metrics.getMetrics().containsKey(v.getName())) {
continue;
}
Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafka(
clusterPhyId,
metrics.getTopic(),
metrics.getBrokerId(),
metrics.getPartitionId(),
v.getName()
);
if (null == ret || ret.failed() || null == ret.getData()) {
continue;
}
metrics.putMetric(ret.getData().getMetrics());
} catch (Exception e) {
LOGGER.error(
"method=collectMetrics||clusterPhyId={}||topicName={}||partition={}||metricName={}||errMsg=exception!",
clusterPhyId, metrics.getTopic(), metrics.getPartitionId(), v.getName(), e
);
}
}
// Record how long the collection took
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
return metrics;
}
}

View File

@@ -1,29 +0,0 @@
package com.xiaojukeji.know.streaming.km.collector.sink.kafka;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.sink.AbstractMetricESSender;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.REPLICATION_INDEX;
@Component
public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener<ReplicaMetricEvent> {
private static final ILog LOGGER = LogFactory.getLog(ReplicaMetricESSender.class);
@PostConstruct
public void init(){
LOGGER.info("method=init||msg=init finished");
}
@Override
public void onApplicationEvent(ReplicaMetricEvent event) {
send2es(REPLICATION_INDEX, ConvertUtil.list2List(event.getReplicationMetrics(), ReplicationMetricPO.class));
}
}

View File

@@ -1,20 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.event.metric;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics;
import lombok.Getter;
import java.util.List;
/**
* @author didi
*/
@Getter
public class ReplicaMetricEvent extends BaseMetricEvent{
private final List<ReplicationMetrics> replicationMetrics;
public ReplicaMetricEvent(Object source, List<ReplicationMetrics> replicationMetrics) {
super( source );
this.replicationMetrics = replicationMetrics;
}
}
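
The three classes above (collector, event, ES sender) are wired together through Spring's application-event mechanism: the collector publishes a ReplicaMetricEvent and the sender receives it as an ApplicationListener. For context, a minimal self-contained sketch of that publish/listen pattern, using hypothetical class names rather than the project's own:

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;

// Hypothetical event, mirroring the role of ReplicaMetricEvent
class DemoMetricEvent extends ApplicationEvent {
    private final List<String> metrics;
    public DemoMetricEvent(Object source, List<String> metrics) {
        super(source);
        this.metrics = metrics;
    }
    public List<String> getMetrics() { return metrics; }
}

// Hypothetical publisher, playing the role of the collector's publishMetric(...)
@Component
class DemoMetricPublisher {
    private final ApplicationEventPublisher publisher;
    public DemoMetricPublisher(ApplicationEventPublisher publisher) { this.publisher = publisher; }
    public void publish(List<String> metrics) {
        publisher.publishEvent(new DemoMetricEvent(this, metrics));
    }
}

// Hypothetical listener, playing the role of the removed ReplicaMetricESSender
@Component
class DemoMetricListener implements ApplicationListener<DemoMetricEvent> {
    @Override
    public void onApplicationEvent(DemoMetricEvent event) {
        // The removed sender converted the event payload to POs and called send2es(...) here
        System.out.println("received " + event.getMetrics().size() + " metrics");
    }
}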

View File

@@ -1,9 +1,7 @@
package com.xiaojukeji.know.streaming.km.core.service.replica;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import java.util.List;
@@ -14,13 +12,4 @@ public interface ReplicaMetricService {
*/
Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topic, Integer partitionId, Integer brokerId, String metric);
Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList);
/**
* Get the metrics from ES
*/
@Deprecated
Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto);
@Deprecated
Result<ReplicationMetrics> getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List<String> metricNames);
}

View File

@@ -2,7 +2,6 @@ package com.xiaojukeji.know.streaming.km.core.service.replica.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ReplicationMetricParam;
@@ -10,26 +9,21 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ReplicationMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_REPLICATION;
@@ -54,9 +48,6 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
@Autowired
private PartitionService partitionService;
@Autowired
private ReplicationMetricESDAO replicationMetricESDAO;
@Override
protected List<String> listMetricPOFields(){
return BeanUtil.listBeanFields(ReplicationMetricPO.class);
@@ -118,21 +109,6 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
}
}
@Override
public Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto) {
Map<String/*metric*/, MetricPointVO> metricPointMap = replicationMetricESDAO.getReplicationMetricsPoint(clusterPhyId, topicName, brokerId, partitionId,
dto.getMetricsNames(), dto.getAggType(), dto.getStartTime(), dto.getEndTime());
List<MetricPointVO> metricPoints = new ArrayList<>(metricPointMap.values());
return Result.buildSuc(metricPoints);
}
@Override
public Result<ReplicationMetrics> getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List<String> metricNames) {
ReplicationMetricPO metricPO = replicationMetricESDAO.getReplicationLatestMetrics(clusterPhyId, brokerId, topicName, partitionId, metricNames);
return Result.buildSuc(ConvertUtil.obj2Obj(metricPO, ReplicationMetrics.class));
}
/**************************************************** private method ****************************************************/
private Result<ReplicationMetrics> doNothing(VersionItemParam param) {
ReplicationMetricParam metricParam = (ReplicationMetricParam)param;

View File

@@ -1,65 +0,0 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_replication_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"brokerId" : {
"type" : "long"
},
"partitionId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"topic" : {
"type" : "keyword"
},
"metrics" : {
"properties" : {
"LogStartOffset" : {
"type" : "float"
},
"Messages" : {
"type" : "float"
},
"LogEndOffset" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}
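
This file is a legacy (ES 7.x style) index template: any index matching ks_kafka_replication_metric* picks up these settings and mappings at creation time. The project presumably registers it through its own ES client; purely as an illustration, a minimal standalone sketch of installing such a template with the JDK HTTP client (endpoint URL and file name are assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class TemplateInstallDemo {
    public static void main(String[] args) throws Exception {
        // Assumption: a local ES 7.x node, with the template JSON saved next to this class
        String templateJson = Files.readString(Path.of("ks_kafka_replication_metric_template.json"));
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:9200/_template/ks_kafka_replication_metric"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(templateJson))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}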

View File

@@ -56,9 +56,6 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener<
GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event;
sinkMetrics(groupMetric2SinkPoint(groupMetricEvent.getGroupMetrics()));
} else if(event instanceof ReplicaMetricEvent) {
ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event;
sinkMetrics(replicationMetric2SinkPoint(replicaMetricEvent.getReplicationMetrics()));
} else if(event instanceof ZookeeperMetricEvent) {
ZookeeperMetricEvent zookeeperMetricEvent = (ZookeeperMetricEvent)event;
sinkMetrics(zookeeperMetric2SinkPoint(zookeeperMetricEvent.getZookeeperMetrics()));

View File

@@ -1,95 +0,0 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao;
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.REPLICATION_INDEX;
/**
* @author didi
*/
@Component
public class ReplicationMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public void init() {
super.indexName = REPLICATION_INDEX;
checkCurrentDayIndexExist();
register(this);
}
/**
* Get the latest replication metrics for the given brokerId/topic/partitionId in cluster clusterPhyId
*/
public ReplicationMetricPO getReplicationLatestMetrics(Long clusterPhyId, Integer brokerId, String topic,
Integer partitionId, List<String> metricNames){
Long endTime = getLatestMetricTime();
Long startTime = endTime - FIVE_MIN;
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_REPLICATION_LATEST_METRICS, clusterPhyId, brokerId, topic, partitionId, startTime, endTime);
ReplicationMetricPO replicationMetricPO = esOpClient.performRequestAndTakeFirst(
realIndex(startTime, endTime), dsl, ReplicationMetricPO.class);
return (null == replicationMetricPO) ? new ReplicationMetricPO(clusterPhyId, topic, brokerId, partitionId)
: filterMetrics(replicationMetricPO, metricNames);
}
/**
* For cluster clusterPhyId, get the aggregated (avg, max) value of each metric for the given partitionId within the time range [startTime, endTime]
*/
public Map<String/*metric*/, MetricPointVO> getReplicationMetricsPoint(Long clusterPhyId, String topic,
Integer brokerId, Integer partitionId, List<String> metrics,
String aggType, Long startTime, Long endTime){
// 1. Work out which index to query for the given time range
String realIndex = realIndex(startTime, endTime);
// 2. Build the aggregation clause of the query
String aggDsl = buildAggsDSL(metrics, aggType);
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_REPLICATION_AGG_SINGLE_METRICS, clusterPhyId, brokerId,topic, partitionId, startTime, endTime, aggDsl);
return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl,
s -> handleSingleESQueryResponse(s, metrics, aggType), 3);
}
/**************************************************** private method ****************************************************/
private Map<String/*metric*/, MetricPointVO> handleSingleESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String/*metric*/, MetricPointVO> metricMap = new HashMap<>();
if(null == response || null == response.getAggs()){
return metricMap;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap) {
return metricMap;
}
for(String metric : metrics){
String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString();
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setValue(value);
metricPoint.setName(metric);
metricMap.put(metric, metricPoint);
}
return metricMap;
}
}

View File

@@ -62,11 +62,6 @@ public class DslConstant {
public static final String LIST_PARTITION_LATEST_METRICS_BY_TOPIC = "PartitionMetricESDAO/listPartitionLatestMetricsByTopic";
/**************************************************** REPLICATION ****************************************************/
public static final String GET_REPLICATION_AGG_SINGLE_METRICS = "ReplicationMetricESDAO/getAggSingleReplicationMetrics";
public static final String GET_REPLICATION_LATEST_METRICS = "ReplicationMetricESDAO/getReplicationLatestMetrics";
/**************************************************** Group ****************************************************/
public static final String GET_GROUP_TOPIC_PARTITION = "GroupMetricESDAO/getTopicPartitionOfGroup";

View File

@@ -9,7 +9,6 @@ public class TemplateConstant {
public static final String BROKER_INDEX = "ks_kafka_broker_metric";
public static final String PARTITION_INDEX = "ks_kafka_partition_metric";
public static final String GROUP_INDEX = "ks_kafka_group_metric";
public static final String REPLICATION_INDEX = "ks_kafka_replication_metric";
public static final String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric";
public static final String CONNECT_CLUSTER_INDEX = "ks_kafka_connect_cluster_metric";
public static final String CONNECT_CONNECTOR_INDEX = "ks_kafka_connect_connector_metric";

View File

@@ -1,48 +0,0 @@
{
"size":0,
"query":{
"bool":{
"must":[
{
"term":{
"clusterPhyId":{
"value":%d
}
}
},
{
"term":{
"brokerId":{
"value":%d
}
}
},
{
"term":{
"topic":{
"value":"%s"
}
}
},
{
"term":{
"partitionId":{
"value":%d
}
}
},
{
"range":{
"timestamp":{
"gte":%d,
"lte":%d
}
}
}
]
}
},
"aggs":{
%s
}
}
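
The %d and %s tokens above are printf-style placeholders; ReplicationMetricESDAO fills them positionally via dslLoaderUtil.getFormatDslByFileName(...) with (clusterPhyId, brokerId, topic, partitionId, startTime, endTime, aggDsl). A minimal sketch of that substitution, assuming plain String.format semantics and hypothetical values:

import java.nio.file.Files;
import java.nio.file.Path;

public class DslFormatDemo {
    public static void main(String[] args) throws Exception {
        // Assumption: the DSL file above is stored as-is and filled positionally
        String template = Files.readString(Path.of("getAggSingleReplicationMetrics.json"));
        long clusterPhyId = 2L;
        int brokerId = 1;
        String topic = "know-streaming-test-251";
        int partitionId = 1;
        long endTime = System.currentTimeMillis();
        long startTime = endTime - 4 * 60 * 60 * 1000L;
        // Example aggregation clause (avg of a single metric)
        String aggDsl = "\"LogEndOffset\":{\"avg\":{\"field\":\"metrics.LogEndOffset\"}}";
        String dsl = String.format(template, clusterPhyId, brokerId, topic, partitionId, startTime, endTime, aggDsl);
        System.out.println(dsl);
    }
}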

View File

@@ -1,52 +0,0 @@
{
"size": 1,
"query": {
"bool": {
"must": [
{
"term": {
"clusterPhyId": {
"value": %d
}
}
},
{
"term": {
"brokerId": {
"value": %d
}
}
},
{
"term": {
"topic": {
"value": "%s"
}
}
},
{
"term": {
"partitionId": {
"value": %d
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
]
}
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}

View File

@@ -1,65 +0,0 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_replication_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"brokerId" : {
"type" : "long"
},
"partitionId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"topic" : {
"type" : "keyword"
},
"metrics" : {
"properties" : {
"LogStartOffset" : {
"type" : "float"
},
"Messages" : {
"type" : "float"
},
"LogEndOffset" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -35,7 +35,7 @@ public class ReplicaMetricsController {
@PathVariable String topicName,
@PathVariable Integer partitionId,
@RequestBody MetricDTO dto) {
return replicationMetricService.getMetricPointsFromES(clusterPhyId, brokerId, topicName, partitionId, dto);
return Result.buildSuc();
}
@ApiOperation(value = "Replica指标-单个Replica")

View File

@@ -1,48 +0,0 @@
package com.xiaojukeji.know.streaming.km.persistence.es;
import com.xiaojukeji.know.streaming.km.KnowStreamApplicationTest;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ReplicationMetricESDAO;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest {
@Autowired
private ReplicationMetricESDAO replicationMetricESDAO;
@Test
public void getReplicationLatestMetricsTest(){
Long clusterPhyId = 2L;
Integer brokerId = 1;
String topic = "know-streaming-test-251";
Integer partitionId = 1;
ReplicationMetricPO replicationMetricPO = replicationMetricESDAO.getReplicationLatestMetrics(
clusterPhyId, brokerId, topic, partitionId, new ArrayList<>());
assert null != replicationMetricPO;
}
/**
* Test: for cluster clusterPhyId, get the aggregated (avg, max) value of each metric
* for the given partitionId within the time range [startTime, endTime]
*/
@Test
public void getReplicationMetricsPointTest(){
Long clusterPhyId = 2L;
Integer brokerId = 1;
String topic = "know-streaming-test-251";
Integer partitionId = 1;
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 4 * 60 * 60 * 1000;
Map<String, MetricPointVO> metricPointVOMap = replicationMetricESDAO.getReplicationMetricsPoint(
clusterPhyId, topic, brokerId, partitionId, Collections.emptyList(), "avg", startTime, endTime);
assert null != metricPointVOMap;
}
}