mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[ISSUE #677] 重启会导致部分信息采集抛出空指针
This commit is contained in:
@@ -207,11 +207,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
|
||||
}
|
||||
|
||||
for(String metric : metrics){
|
||||
String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString();
|
||||
Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE);
|
||||
if(null == value){continue;}
|
||||
|
||||
MetricPointVO metricPoint = new MetricPointVO();
|
||||
metricPoint.setAggType(aggType);
|
||||
metricPoint.setValue(value);
|
||||
metricPoint.setValue(value.toString());
|
||||
metricPoint.setName(metric);
|
||||
|
||||
metricMap.put(metric, metricPoint);
|
||||
@@ -243,12 +244,13 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
|
||||
try {
|
||||
if (null != esBucket.getUnusedMap().get(KEY)) {
|
||||
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
|
||||
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
|
||||
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
|
||||
if(null == value){return;}
|
||||
|
||||
MetricPointVO metricPoint = new MetricPointVO();
|
||||
metricPoint.setAggType(aggType);
|
||||
metricPoint.setTimeStamp(timestamp);
|
||||
metricPoint.setValue(value);
|
||||
metricPoint.setValue(value.toString());
|
||||
metricPoint.setName(metric);
|
||||
|
||||
metricPoints.add(metricPoint);
|
||||
@@ -290,13 +292,14 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
|
||||
try {
|
||||
if (null != esBucket.getUnusedMap().get(KEY)) {
|
||||
Long brokerId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
|
||||
Double value = Double.valueOf(esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap()
|
||||
.get(metric).getUnusedMap().get(VALUE).toString());
|
||||
Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap()
|
||||
.get(metric).getUnusedMap().get(VALUE);
|
||||
if(null == value){return;}
|
||||
|
||||
List<Tuple<Long, Double>> brokerValue = (null == metricBrokerValueMap.get(metric)) ?
|
||||
new ArrayList<>() : metricBrokerValueMap.get(metric);
|
||||
|
||||
brokerValue.add(new Tuple<>(brokerId, value));
|
||||
brokerValue.add(new Tuple<>(brokerId, Double.valueOf(value.toString())));
|
||||
metricBrokerValueMap.put(metric, brokerValue);
|
||||
}
|
||||
}catch (Exception e){
|
||||
|
||||
@@ -169,11 +169,12 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
|
||||
}
|
||||
|
||||
for(String metric : metrics){
|
||||
String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString();
|
||||
Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE);
|
||||
if(null == value){continue;}
|
||||
|
||||
MetricPointVO metricPoint = new MetricPointVO();
|
||||
metricPoint.setAggType(aggType);
|
||||
metricPoint.setValue(value);
|
||||
metricPoint.setValue(value.toString());
|
||||
metricPoint.setName(metric);
|
||||
|
||||
metricMap.put(metric, metricPoint);
|
||||
@@ -194,12 +195,13 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
|
||||
try {
|
||||
if (null != esBucket.getUnusedMap().get(KEY)) {
|
||||
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
|
||||
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
|
||||
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
|
||||
if(null == value){return;}
|
||||
|
||||
MetricPointVO metricPoint = new MetricPointVO();
|
||||
metricPoint.setAggType(aggType);
|
||||
metricPoint.setTimeStamp(timestamp);
|
||||
metricPoint.setValue(value);
|
||||
metricPoint.setValue(value.toString());
|
||||
metricPoint.setName(metric);
|
||||
|
||||
metricPoints.add(metricPoint);
|
||||
|
||||
@@ -173,8 +173,9 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
|
||||
}
|
||||
|
||||
for(String metric : metrics){
|
||||
String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString();
|
||||
groupMetricPO.getMetrics().put(metric, Float.valueOf(value));
|
||||
Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE);
|
||||
if(value == null){continue;}
|
||||
groupMetricPO.getMetrics().put(metric, Float.parseFloat(value.toString()));
|
||||
}
|
||||
|
||||
return groupMetricPO;
|
||||
@@ -192,12 +193,13 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
|
||||
try {
|
||||
if (null != esBucket.getUnusedMap().get(KEY)) {
|
||||
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
|
||||
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
|
||||
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
|
||||
if(value == null){return;}
|
||||
|
||||
MetricPointVO metricPoint = new MetricPointVO();
|
||||
metricPoint.setAggType(aggType);
|
||||
metricPoint.setTimeStamp(timestamp);
|
||||
metricPoint.setValue(value);
|
||||
metricPoint.setValue(value.toString());
|
||||
metricPoint.setName(metric);
|
||||
|
||||
metricPoints.add(metricPoint);
|
||||
|
||||
@@ -337,12 +337,13 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
|
||||
try {
|
||||
if (null != esBucket.getUnusedMap().get(KEY)) {
|
||||
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
|
||||
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
|
||||
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
|
||||
if(value == null){return;}
|
||||
|
||||
MetricPointVO metricPoint = new MetricPointVO();
|
||||
metricPoint.setAggType(aggType);
|
||||
metricPoint.setTimeStamp(timestamp);
|
||||
metricPoint.setValue(value);
|
||||
metricPoint.setValue(value.toString());
|
||||
metricPoint.setName(metric);
|
||||
|
||||
metricPoints.add(metricPoint);
|
||||
|
||||
@@ -5,13 +5,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchPage;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchRange;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ClusterMetricESDAO;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
|
||||
public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
|
||||
@@ -28,6 +28,53 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
clusterMetricESDAO.listClusterMetricsByClusterIds(metrics, "avg", clusterIds, startTime, endTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试
|
||||
* 获取集群 clusterPhyId 中每个 metric 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值
|
||||
*/
|
||||
@Test
|
||||
public void getClusterMetricsPointTest(){
|
||||
Long clusterId = 1L;
|
||||
List<String> metrics = Arrays.asList(
|
||||
"Connections", "BytesIn_min_15", "PartitionURP",
|
||||
"HealthScore_Topics", "EventQueueSize", "ActiveControllerCount",
|
||||
"GroupDeads", "BytesIn_min_5", "HealthCheckTotal_Topics",
|
||||
"Partitions", "BytesOut", "Groups",
|
||||
"BytesOut_min_15", "TotalRequestQueueSize", "HealthCheckPassed_Groups",
|
||||
"TotalProduceRequests", "HealthCheckPassed", "TotalLogSize",
|
||||
"GroupEmptys", "PartitionNoLeader", "HealthScore_Brokers",
|
||||
"Messages", "Topics", "PartitionMinISR_E",
|
||||
"HealthCheckTotal", "Brokers", "Replicas",
|
||||
"HealthCheckTotal_Groups", "GroupRebalances", "MessageIn",
|
||||
"HealthScore", "HealthCheckPassed_Topics", "HealthCheckTotal_Brokers",
|
||||
"PartitionMinISR_S", "BytesIn", "BytesOut_min_5",
|
||||
"GroupActives", "MessagesIn", "GroupReBalances",
|
||||
"HealthCheckPassed_Brokers", "HealthScore_Groups", "TotalResponseQueueSize",
|
||||
"Zookeepers", "LeaderMessages", "HealthScore_Cluster",
|
||||
"HealthCheckPassed_Cluster", "HealthCheckTotal_Cluster");
|
||||
Long endTime = System.currentTimeMillis();
|
||||
Long startTime = endTime - 4 * 60 * 60 * 1000;
|
||||
|
||||
Map<String/*metric*/, MetricPointVO> metricPointVOS = clusterMetricESDAO.getClusterMetricsPoint(
|
||||
clusterId, metrics, "avg", startTime, endTime);
|
||||
|
||||
assert null != metricPointVOS;
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试
|
||||
* 获取集群 clusterId 最新的统计指标
|
||||
*/
|
||||
@Test
|
||||
public void getClusterLatestMetricsTest(){
|
||||
Long clusterId = 1L;
|
||||
List<String> metrics = Collections.emptyList();
|
||||
|
||||
ClusterMetricPO clusterLatestMetrics = clusterMetricESDAO.getClusterLatestMetrics(clusterId, metrics);
|
||||
|
||||
assert null != clusterLatestMetrics;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pagingClusterWithLatestMetricsTest(){
|
||||
List<Long> clusterIds = new ArrayList<>();
|
||||
|
||||
@@ -2,11 +2,14 @@ package com.xiaojukeji.know.streaming.km.persistence.es;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.KnowStreamApplicationTest;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ReplicationMetricESDAO;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
|
||||
@@ -15,7 +18,7 @@ public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
|
||||
@Test
|
||||
public void getReplicationLatestMetricsTest(){
|
||||
Long clusterPhyId = 2l;
|
||||
Long clusterPhyId = 2L;
|
||||
Integer brokerId = 1;
|
||||
String topic = "know-streaming-test-251";
|
||||
Integer partitionId = 1;
|
||||
@@ -24,4 +27,22 @@ public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
|
||||
assert null != replicationMetricPO;
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试
|
||||
* 获取集群 clusterPhyId 中每个 metric 的指定 partitionId 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值
|
||||
*/
|
||||
@Test
|
||||
public void getReplicationMetricsPointTest(){
|
||||
Long clusterPhyId = 2L;
|
||||
Integer brokerId = 1;
|
||||
String topic = "know-streaming-test-251";
|
||||
Integer partitionId = 1;
|
||||
Long endTime = System.currentTimeMillis();
|
||||
Long startTime = endTime - 4 * 60 * 60 * 1000;
|
||||
Map<String, MetricPointVO> metricPointVOMap = replicationMetricESDAO.getReplicationMetricsPoint(
|
||||
clusterPhyId, topic, brokerId, partitionId, Collections.emptyList(), "avg", startTime, endTime);
|
||||
|
||||
assert null != metricPointVOMap;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ import org.springframework.util.CollectionUtils;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
|
||||
@@ -38,14 +37,20 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
@Test
|
||||
public void getTopicsAggsMetricsValueTest(){
|
||||
Long clusterId = 2L;
|
||||
String topic = "know-streaming-test-251";
|
||||
String topic1 = "topic_test01";
|
||||
List<String> metrics = Arrays.asList("BytesIn", "BytesIn_min_5");
|
||||
List<String> topicList = Arrays.asList("know-streaming-test-251", "topic_test01");
|
||||
List<String> metrics = Arrays.asList(
|
||||
"Messages", "BytesIn_min_15", "BytesRejected",
|
||||
"PartitionURP", "HealthCheckTotal", "ReplicationCount",
|
||||
"CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5",
|
||||
"HealthScore", "LogSize", "BytesOut",
|
||||
"FailedProduceRequests", "BytesOut_min_15", "BytesIn",
|
||||
"BytesOut_min_5", "MessagesIn", "TotalProduceRequests",
|
||||
"HealthCheckPassed");
|
||||
Long endTime = System.currentTimeMillis();
|
||||
Long startTime = endTime - 4 * 60 * 60 * 1000;
|
||||
|
||||
Table<String/*topics*/, String/*metric*/, MetricPointVO> ret = topicMetricESDAO.getTopicsAggsMetricsValue(
|
||||
clusterId, Arrays.asList(topic, topic1), metrics, "max", startTime, endTime);
|
||||
clusterId, topicList, metrics, "max", startTime, endTime);
|
||||
assert null != ret;
|
||||
}
|
||||
|
||||
@@ -90,7 +95,14 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
String topic = "know-streaming-test-251";
|
||||
String topic1 = "know-streaming-123";
|
||||
String topic2 = "1209test";
|
||||
List<String> metrics = Arrays.asList("BytesIn", "BytesIn_min_5");
|
||||
List<String> metrics = Arrays.asList(
|
||||
"Messages", "BytesIn_min_15", "BytesRejected",
|
||||
"PartitionURP", "HealthCheckTotal", "ReplicationCount",
|
||||
"CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5",
|
||||
"HealthScore", "LogSize", "BytesOut",
|
||||
"FailedProduceRequests", "BytesOut_min_15", "BytesIn",
|
||||
"BytesOut_min_5", "MessagesIn", "TotalProduceRequests",
|
||||
"HealthCheckPassed");
|
||||
|
||||
|
||||
List<TopicMetricPO> topicMetricPO = topicMetricESDAO.listTopicLatestMetric(clusterId, Arrays.asList(topic,topic1,topic2), metrics);
|
||||
@@ -101,7 +113,14 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
|
||||
@Test
|
||||
public void listBrokerMetricsByTopicsTest(){
|
||||
Long clusterId = 2L;
|
||||
List<String> metrics = Arrays.asList("BytesIn", "BytesIn_min_5");
|
||||
List<String> metrics = Arrays.asList(
|
||||
"Messages", "BytesIn_min_15", "BytesRejected",
|
||||
"PartitionURP", "HealthCheckTotal", "ReplicationCount",
|
||||
"CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5",
|
||||
"HealthScore", "LogSize", "BytesOut",
|
||||
"FailedProduceRequests", "BytesOut_min_15", "BytesIn",
|
||||
"BytesOut_min_5", "MessagesIn", "TotalProduceRequests",
|
||||
"HealthCheckPassed");
|
||||
List<String> topics = Arrays.asList("QAtest_1_13", "__consumer_offsets");
|
||||
Long endTime = System.currentTimeMillis();
|
||||
Long startTime = endTime - 4 * 60 * 60 * 1000;
|
||||
|
||||
Reference in New Issue
Block a user