mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
优化Broker列表JMX端口的返回值
This commit is contained in:
@@ -6,6 +6,8 @@ import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterBrokersManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterBrokersOverviewDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
@@ -16,6 +18,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBroker
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
@@ -24,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -83,9 +88,13 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
||||
Map<Integer, Boolean> jmxConnectedMap = new HashMap<>();
|
||||
brokerList.forEach(elem -> jmxConnectedMap.put(elem.getBrokerId(), kafkaJMXClient.getClientWithCheck(clusterPhyId, elem.getBrokerId()) != null));
|
||||
|
||||
|
||||
ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
|
||||
|
||||
// 格式转换
|
||||
return PaginationResult.buildSuc(
|
||||
this.convert2ClusterBrokersOverviewVOList(
|
||||
clusterPhy,
|
||||
paginationResult.getData().getBizData(),
|
||||
brokerList,
|
||||
metricsResult.getData(),
|
||||
@@ -169,7 +178,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
||||
);
|
||||
}
|
||||
|
||||
private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(List<Integer> pagedBrokerIdList,
|
||||
private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(ClusterPhy clusterPhy,
|
||||
List<Integer> pagedBrokerIdList,
|
||||
List<Broker> brokerList,
|
||||
List<BrokerMetrics> metricsList,
|
||||
Topic groupTopic,
|
||||
@@ -185,9 +195,15 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
||||
Broker broker = brokerMap.get(brokerId);
|
||||
BrokerMetrics brokerMetrics = metricsMap.get(brokerId);
|
||||
Boolean jmxConnected = jmxConnectedMap.get(brokerId);
|
||||
|
||||
voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController, jmxConnected));
|
||||
}
|
||||
|
||||
//补充非zk模式的JMXPort信息
|
||||
if (!clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) {
|
||||
JmxConfig jmxConfig = ConvertUtil.str2ObjByJson(clusterPhy.getJmxProperties(), JmxConfig.class);
|
||||
voList.forEach(elem -> elem.setJmxPort(jmxConfig.getJmxPort() == null ? -1 : jmxConfig.getJmxPort()));
|
||||
}
|
||||
|
||||
return voList;
|
||||
}
|
||||
|
||||
|
||||
@@ -66,13 +66,13 @@ public class Broker implements Serializable {
|
||||
*/
|
||||
private Map<String, IpPortData> endpointMap;
|
||||
|
||||
public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp, JmxConfig jmxConfig) {
|
||||
public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) {
|
||||
Broker metadata = new Broker();
|
||||
metadata.setClusterPhyId(clusterPhyId);
|
||||
metadata.setBrokerId(node.id());
|
||||
metadata.setHost(node.host());
|
||||
metadata.setPort(node.port());
|
||||
metadata.setJmxPort(jmxConfig != null ? jmxConfig.getJmxPort() : -1);
|
||||
metadata.setJmxPort(-1);
|
||||
metadata.setStartTimestamp(startTimestamp);
|
||||
metadata.setRack(node.rack());
|
||||
metadata.setStatus(1);
|
||||
|
||||
@@ -168,7 +168,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
allBrokerList = this.listAllBrokersAndUpdateCache(clusterPhyId);
|
||||
}
|
||||
|
||||
return allBrokerList.stream().filter( elem -> elem.alive()).collect(Collectors.toList());
|
||||
return allBrokerList.stream().filter(elem -> elem.alive()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -234,11 +234,10 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
|
||||
@Override
|
||||
public String getBrokerVersionFromKafka(Long clusterId, Integer brokerId) {
|
||||
JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClient(clusterId, brokerId);
|
||||
if (ValidateUtils.isNull(jmxConnectorWrap) || !jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) {
|
||||
JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterId, brokerId);
|
||||
if (jmxConnectorWrap == null) {
|
||||
return "";
|
||||
}
|
||||
|
||||
try {
|
||||
return (String) jmxConnectorWrap.getAttribute(new ObjectName(JMX_SERVER_APP_INFO + ",id=" + brokerId), VERSION);
|
||||
} catch (Exception e) {
|
||||
@@ -331,7 +330,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
|
||||
return Result.buildSuc(brokerList);
|
||||
} catch (Exception e) {
|
||||
log.error("class=BrokerServiceImpl||method=getBrokersFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
|
||||
log.error("method=getBrokersFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
|
||||
}
|
||||
@@ -353,7 +352,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
|
||||
return Result.buildSuc(newBrokerList);
|
||||
} catch (Exception e) {
|
||||
log.error("class=BrokerServiceImpl||method=getBrokersFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
|
||||
log.error("method=getBrokersFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
|
||||
}
|
||||
@@ -361,13 +360,13 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
|
||||
private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) {
|
||||
try {
|
||||
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig);
|
||||
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), jmxConfig.getJmxPort(), jmxConfig);
|
||||
|
||||
return Broker.buildFrom(clusterPhyId, newNode, startTime, jmxConfig);
|
||||
return Broker.buildFrom(clusterPhyId, newNode, startTime);
|
||||
} catch (Exception e) {
|
||||
log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e);
|
||||
log.error("method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e);
|
||||
}
|
||||
|
||||
return Broker.buildFrom(clusterPhyId, newNode, null, jmxConfig);
|
||||
return Broker.buildFrom(clusterPhyId, newNode, null);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user