mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
集群Broker列表,增加Controller角色信息
This commit is contained in:
@@ -14,6 +14,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO;
|
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO;
|
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO;
|
import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||||
@@ -71,6 +72,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
Topic groupTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME);
|
Topic groupTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME);
|
||||||
Topic transactionTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME);
|
Topic transactionTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME);
|
||||||
|
|
||||||
|
//获取controller信息
|
||||||
|
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
|
||||||
|
|
||||||
// 格式转换
|
// 格式转换
|
||||||
return PaginationResult.buildSuc(
|
return PaginationResult.buildSuc(
|
||||||
this.convert2ClusterBrokersOverviewVOList(
|
this.convert2ClusterBrokersOverviewVOList(
|
||||||
@@ -78,7 +82,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
brokerList,
|
brokerList,
|
||||||
metricsResult.getData(),
|
metricsResult.getData(),
|
||||||
groupTopic,
|
groupTopic,
|
||||||
transactionTopic
|
transactionTopic,
|
||||||
|
kafkaController
|
||||||
),
|
),
|
||||||
paginationResult
|
paginationResult
|
||||||
);
|
);
|
||||||
@@ -159,7 +164,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
List<Broker> brokerList,
|
List<Broker> brokerList,
|
||||||
List<BrokerMetrics> metricsList,
|
List<BrokerMetrics> metricsList,
|
||||||
Topic groupTopic,
|
Topic groupTopic,
|
||||||
Topic transactionTopic) {
|
Topic transactionTopic,
|
||||||
|
KafkaController kafkaController) {
|
||||||
Map<Integer, BrokerMetrics> metricsMap = metricsList == null? new HashMap<>(): metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity()));
|
Map<Integer, BrokerMetrics> metricsMap = metricsList == null? new HashMap<>(): metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity()));
|
||||||
|
|
||||||
Map<Integer, Broker> brokerMap = brokerList == null? new HashMap<>(): brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
|
Map<Integer, Broker> brokerMap = brokerList == null? new HashMap<>(): brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
|
||||||
@@ -169,12 +175,12 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
Broker broker = brokerMap.get(brokerId);
|
Broker broker = brokerMap.get(brokerId);
|
||||||
BrokerMetrics brokerMetrics = metricsMap.get(brokerId);
|
BrokerMetrics brokerMetrics = metricsMap.get(brokerId);
|
||||||
|
|
||||||
voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic));
|
voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController));
|
||||||
}
|
}
|
||||||
return voList;
|
return voList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic) {
|
private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController) {
|
||||||
ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO();
|
ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO();
|
||||||
clusterBrokersOverviewVO.setBrokerId(brokerId);
|
clusterBrokersOverviewVO.setBrokerId(brokerId);
|
||||||
if (broker != null) {
|
if (broker != null) {
|
||||||
@@ -192,6 +198,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
|||||||
if (transactionTopic != null && transactionTopic.getBrokerIdSet().contains(brokerId)) {
|
if (transactionTopic != null && transactionTopic.getBrokerIdSet().contains(brokerId)) {
|
||||||
clusterBrokersOverviewVO.getKafkaRoleList().add(transactionTopic.getTopicName());
|
clusterBrokersOverviewVO.getKafkaRoleList().add(transactionTopic.getTopicName());
|
||||||
}
|
}
|
||||||
|
if (kafkaController != null && kafkaController.getBrokerId().equals(brokerId)) {
|
||||||
|
clusterBrokersOverviewVO.getKafkaRoleList().add(KafkaConstant.CONTROLLER_ROLE);
|
||||||
|
}
|
||||||
|
|
||||||
clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics);
|
clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics);
|
||||||
return clusterBrokersOverviewVO;
|
return clusterBrokersOverviewVO;
|
||||||
|
|||||||
@@ -41,6 +41,8 @@ public class KafkaConstant {
|
|||||||
|
|
||||||
public static final Long POLL_ONCE_TIMEOUT_UNIT_MS = 2000L;
|
public static final Long POLL_ONCE_TIMEOUT_UNIT_MS = 2000L;
|
||||||
|
|
||||||
|
public static final String CONTROLLER_ROLE = "controller";
|
||||||
|
|
||||||
public static final Map<String, ConfigDef.ConfigKey> KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>();
|
public static final Map<String, ConfigDef.ConfigKey> KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
|||||||
Reference in New Issue
Block a user