From a0312be4fd6ca559bb952ad91b2b9940ac18a2a1 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sat, 3 Sep 2022 08:26:50 +0800 Subject: [PATCH] =?UTF-8?q?Jmx=E8=BF=9E=E6=8E=A5=E7=9A=84=E4=B8=BB?= =?UTF-8?q?=E6=9C=BAIP=E6=94=AF=E6=8C=81=E5=8F=AF=E9=80=89=E6=8B=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/install_guide/版本升级手册.md | 26 ++++++++++++++- .../km/common/bean/entity/broker/Broker.java | 33 +++++++++++++++++++ .../common/bean/entity/config/JmxConfig.java | 3 ++ .../km/common/bean/po/broker/BrokerPO.java | 5 +++ .../broker/impl/BrokerServiceImpl.java | 8 +++-- km-dist/init/sql/ddl-ks-km.sql | 1 + .../km/persistence/kafka/KafkaJMXClient.java | 6 ++-- .../persistence/mysql/broker/BrokerDAO.java | 1 - .../main/resources/mybatis/BrokerMapper.xml | 7 +--- 9 files changed, 76 insertions(+), 14 deletions(-) diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md index 621d90bc..e952ef95 100644 --- a/docs/install_guide/版本升级手册.md +++ b/docs/install_guide/版本升级手册.md @@ -1,6 +1,30 @@ ## 6.2、版本升级手册 -**`2.x`版本 升级至 `3.0.0`版本** +注意:如果想升级至具体版本,需要将你当前版本至你期望使用版本的变更统统执行一遍,然后才能正常使用。 + +`master` 版本,需要 + +### 6.2.0、升级至 `master` 版本 + + +**SQL变更** + +1、在`ks_km_broker`表增加了一个监听信息字段。 +2、为`logi_security_oplog`表operation_methods字段设置默认值''。 +因此需要执行下面的sql对数据库表进行更新。 + +```sql +ALTER TABLE `ks_km_broker` +ADD COLUMN `endpoint_map` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '监听信息' AFTER `update_time`; + +ALTER TABLE `logi_security_oplog` +ALTER COLUMN `operation_methods` set default ''; + +``` + +--- + +### 6.2.1、`2.x`版本 升级至 `3.0.0`版本 **升级步骤:** diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java index 916820d8..d7e3b792 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java @@ -1,5 +1,10 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.broker; + +import com.alibaba.fastjson.TypeReference; +import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData; +import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; import lombok.AllArgsConstructor; import lombok.Data; @@ -7,6 +12,7 @@ import lombok.NoArgsConstructor; import org.apache.kafka.common.Node; import java.io.Serializable; +import java.util.Map; /** * @author didi @@ -55,6 +61,11 @@ public class Broker implements Serializable { */ private Integer status; + /** + * 监听信息 + */ + private Map endpointMap; + public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) { Broker metadata = new Broker(); metadata.setClusterPhyId(clusterPhyId); @@ -78,9 +89,31 @@ public class Broker implements Serializable { metadata.setStartTimestamp(brokerMetadata.getTimestamp()); metadata.setRack(brokerMetadata.getRack()); metadata.setStatus(1); + metadata.setEndpointMap(brokerMetadata.getEndpointMap()); return metadata; } + public static Broker buildFrom(BrokerPO brokerPO) { + Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class); + String endpointMapStr = brokerPO.getEndpointMap(); + if (broker == null || endpointMapStr == null || endpointMapStr.equals("")) { + return broker; + } + + // 填充endpoint信息 + Map endpointMap = ConvertUtil.str2ObjByJson(endpointMapStr, new TypeReference>(){}); + broker.setEndpointMap(endpointMap); + return broker; + } + + public String getJmxHost(String endPoint) { + if (endPoint == null || endpointMap == null) { + return host; + } + IpPortData ip = endpointMap.get(endPoint); + return ip == null ? ip.getIp() : host; + } + public boolean alive() { return status != null && status > 0; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java index 5c78183c..87607c1f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java @@ -27,6 +27,9 @@ public class JmxConfig implements Serializable { @ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19") private String token; + + @ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL") + private String useWhichEndpoint; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java index 2f50480d..16f98d88 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java @@ -42,4 +42,9 @@ public class BrokerPO extends BasePO { * Broker状态 */ private Integer status; + + /** + * 监听信息 + */ + private String endpointMap; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index e9f8c933..dc702388 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -130,6 +130,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok // 如果当前Broker还存活,则更新DB信息 BrokerPO newBrokerPO = ConvertUtil.obj2Obj(presentAliveBroker, BrokerPO.class); + if (presentAliveBroker.getEndpointMap() != null) { + newBrokerPO.setEndpointMap(ConvertUtil.obj2Json(presentAliveBroker.getEndpointMap())); + } newBrokerPO.setId(inDBBrokerPO.getId()); newBrokerPO.setStatus(Constant.ALIVE); newBrokerPO.setCreateTime(inDBBrokerPO.getCreateTime()); @@ -203,7 +206,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok lambdaQueryWrapper.eq(BrokerPO::getClusterPhyId, clusterPhyId); lambdaQueryWrapper.eq(BrokerPO::getBrokerId, brokerId); - return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class); + return Broker.buildFrom(brokerDAO.selectOne(lambdaQueryWrapper)); } @Override @@ -272,9 +275,8 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok /**************************************************** private method ****************************************************/ private List listAllBrokersAndUpdateCache(Long clusterPhyId) { - List allBrokerList = ConvertUtil.list2List(this.getAllBrokerPOsFromDB(clusterPhyId), Broker.class); + List allBrokerList = getAllBrokerPOsFromDB(clusterPhyId).stream().map(elem -> Broker.buildFrom(elem)).collect(Collectors.toList()); brokersCache.put(clusterPhyId, allBrokerList); - return allBrokerList; } diff --git a/km-dist/init/sql/ddl-ks-km.sql b/km-dist/init/sql/ddl-ks-km.sql index 96de4ed1..50696917 100644 --- a/km-dist/init/sql/ddl-ks-km.sql +++ b/km-dist/init/sql/ddl-ks-km.sql @@ -13,6 +13,7 @@ CREATE TABLE `ks_km_broker` ( `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `endpoint_map` varchar(1024) NOT NULL DEFAULT '' COMMENT '监听信息', PRIMARY KEY (`id`), UNIQUE KEY `uniq_cluster_phy_id_broker_id` (`cluster_phy_id`,`broker_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker信息表'; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java index afa904af..68d1011e 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java @@ -165,8 +165,8 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { clusterPhy.getId(), brokerId, broker.getStartTimestamp(), - broker.getHost(), - broker.getJmxPort() != null? broker.getJmxPort(): jmxConfig.getJmxPort(), + jmxConfig != null ? broker.getJmxHost(jmxConfig.getUseWhichEndpoint()) : broker.getHost(), + broker.getJmxPort() != null ? broker.getJmxPort() : jmxConfig.getJmxPort(), jmxConfig ); @@ -191,6 +191,6 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE); BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper); - return ConvertUtil.obj2Obj(brokerPO, Broker.class); + return Broker.buildFrom(brokerPO); } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java index c05a66ad..5169bbad 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java @@ -6,5 +6,4 @@ import org.springframework.stereotype.Repository; @Repository public interface BrokerDAO extends BaseMapper { - int replace(BrokerPO brokerPO); } diff --git a/km-persistence/src/main/resources/mybatis/BrokerMapper.xml b/km-persistence/src/main/resources/mybatis/BrokerMapper.xml index 360fe9c8..3d9f5d8f 100644 --- a/km-persistence/src/main/resources/mybatis/BrokerMapper.xml +++ b/km-persistence/src/main/resources/mybatis/BrokerMapper.xml @@ -14,12 +14,7 @@ + - - REPLACE ks_km_broker - (cluster_phy_id, broker_id, host, port, jmx_port, start_timestamp, status, update_time) - VALUES - (#{clusterPhyId}, #{brokerId}, #{host}, #{port}, #{jmxPort}, #{startTimestamp}, #{status}, #{updateTime}) -