mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Optimize] 集群Broker列表中,补充Jmx是否成功连接的信息
1、当前页面无数据时,一部分的原因是JMX连接失败导致; 2、Broker列表中增加是否连接成功的信息,便于问题的排查;
This commit is contained in:
@@ -24,6 +24,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
|
|||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@@ -51,6 +52,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private KafkaControllerService kafkaControllerService;
|
private KafkaControllerService kafkaControllerService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private KafkaJMXClient kafkaJMXClient;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto) {
|
public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto) {
|
||||||
// 获取集群Broker列表
|
// 获取集群Broker列表
|
||||||
@@ -75,6 +79,10 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
//获取controller信息
|
//获取controller信息
|
||||||
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
|
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
|
||||||
|
|
||||||
|
//获取jmx状态信息
|
||||||
|
Map<Integer, Boolean> jmxConnectedMap = new HashMap<>();
|
||||||
|
brokerList.forEach(elem -> jmxConnectedMap.put(elem.getBrokerId(), kafkaJMXClient.getClientWithCheck(clusterPhyId, elem.getBrokerId()) != null));
|
||||||
|
|
||||||
// 格式转换
|
// 格式转换
|
||||||
return PaginationResult.buildSuc(
|
return PaginationResult.buildSuc(
|
||||||
this.convert2ClusterBrokersOverviewVOList(
|
this.convert2ClusterBrokersOverviewVOList(
|
||||||
@@ -83,7 +91,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
metricsResult.getData(),
|
metricsResult.getData(),
|
||||||
groupTopic,
|
groupTopic,
|
||||||
transactionTopic,
|
transactionTopic,
|
||||||
kafkaController
|
kafkaController,
|
||||||
|
jmxConnectedMap
|
||||||
),
|
),
|
||||||
paginationResult
|
paginationResult
|
||||||
);
|
);
|
||||||
@@ -165,7 +174,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
List<BrokerMetrics> metricsList,
|
List<BrokerMetrics> metricsList,
|
||||||
Topic groupTopic,
|
Topic groupTopic,
|
||||||
Topic transactionTopic,
|
Topic transactionTopic,
|
||||||
KafkaController kafkaController) {
|
KafkaController kafkaController,
|
||||||
|
Map<Integer, Boolean> jmxConnectedMap) {
|
||||||
Map<Integer, BrokerMetrics> metricsMap = metricsList == null ? new HashMap<>() : metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity()));
|
Map<Integer, BrokerMetrics> metricsMap = metricsList == null ? new HashMap<>() : metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity()));
|
||||||
|
|
||||||
Map<Integer, Broker> brokerMap = brokerList == null ? new HashMap<>() : brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
|
Map<Integer, Broker> brokerMap = brokerList == null ? new HashMap<>() : brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
|
||||||
@@ -174,13 +184,14 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
for (Integer brokerId : pagedBrokerIdList) {
|
for (Integer brokerId : pagedBrokerIdList) {
|
||||||
Broker broker = brokerMap.get(brokerId);
|
Broker broker = brokerMap.get(brokerId);
|
||||||
BrokerMetrics brokerMetrics = metricsMap.get(brokerId);
|
BrokerMetrics brokerMetrics = metricsMap.get(brokerId);
|
||||||
|
Boolean jmxConnected = jmxConnectedMap.get(brokerId);
|
||||||
|
|
||||||
voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController));
|
voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController, jmxConnected));
|
||||||
}
|
}
|
||||||
return voList;
|
return voList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController) {
|
private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController, Boolean jmxConnected) {
|
||||||
ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO();
|
ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO();
|
||||||
clusterBrokersOverviewVO.setBrokerId(brokerId);
|
clusterBrokersOverviewVO.setBrokerId(brokerId);
|
||||||
if (broker != null) {
|
if (broker != null) {
|
||||||
@@ -203,6 +214,7 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics);
|
clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics);
|
||||||
|
clusterBrokersOverviewVO.setJmxConnected(jmxConnected);
|
||||||
return clusterBrokersOverviewVO;
|
return clusterBrokersOverviewVO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,9 @@ public class ClusterBrokersOverviewVO extends BrokerMetadataVO {
|
|||||||
@ApiModelProperty(value = "jmx端口")
|
@ApiModelProperty(value = "jmx端口")
|
||||||
private Integer jmxPort;
|
private Integer jmxPort;
|
||||||
|
|
||||||
|
@ApiModelProperty(value = "jmx连接状态 true:连接成功 false:连接失败")
|
||||||
|
private Boolean jmxConnected;
|
||||||
|
|
||||||
@ApiModelProperty(value = "是否存活 true:存活 false:不存活")
|
@ApiModelProperty(value = "是否存活 true:存活 false:不存活")
|
||||||
private Boolean alive;
|
private Boolean alive;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user