Jmx连接的主机IP支持可选择

This commit is contained in:
zengqiao
2022-09-03 08:26:50 +08:00
parent 7da712fcff
commit a0312be4fd
9 changed files with 76 additions and 14 deletions

View File

@@ -1,6 +1,30 @@
## 6.2、版本升级手册
**`2.x`版本 升级至 `3.0.0`版本**
注意:如果想升级至具体版本,需要将你当前版本至你期望使用版本的变更统统执行一遍,然后才能正常使用。
`master` 版本,需要
### 6.2.0、升级至 `master` 版本
**SQL变更**
1、在`ks_km_broker`表增加了一个监听信息字段。
2、为`logi_security_oplog`表operation_methods字段设置默认值''。
因此需要执行下面的sql对数据库表进行更新。
```sql
ALTER TABLE `ks_km_broker`
ADD COLUMN `endpoint_map` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '监听信息' AFTER `update_time`;
ALTER TABLE `logi_security_oplog`
ALTER COLUMN `operation_methods` set default '';
```
---
### 6.2.1、`2.x`版本 升级至 `3.0.0`版本
**升级步骤:**

View File

@@ -1,5 +1,10 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.broker;
import com.alibaba.fastjson.TypeReference;
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -7,6 +12,7 @@ import lombok.NoArgsConstructor;
import org.apache.kafka.common.Node;
import java.io.Serializable;
import java.util.Map;
/**
* @author didi
@@ -55,6 +61,11 @@ public class Broker implements Serializable {
*/
private Integer status;
/**
* 监听信息
*/
private Map<String, IpPortData> endpointMap;
public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) {
Broker metadata = new Broker();
metadata.setClusterPhyId(clusterPhyId);
@@ -78,9 +89,31 @@ public class Broker implements Serializable {
metadata.setStartTimestamp(brokerMetadata.getTimestamp());
metadata.setRack(brokerMetadata.getRack());
metadata.setStatus(1);
metadata.setEndpointMap(brokerMetadata.getEndpointMap());
return metadata;
}
public static Broker buildFrom(BrokerPO brokerPO) {
Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class);
String endpointMapStr = brokerPO.getEndpointMap();
if (broker == null || endpointMapStr == null || endpointMapStr.equals("")) {
return broker;
}
// 填充endpoint信息
Map<String, IpPortData> endpointMap = ConvertUtil.str2ObjByJson(endpointMapStr, new TypeReference<Map<String, IpPortData>>(){});
broker.setEndpointMap(endpointMap);
return broker;
}
public String getJmxHost(String endPoint) {
if (endPoint == null || endpointMap == null) {
return host;
}
IpPortData ip = endpointMap.get(endPoint);
return ip == null ? ip.getIp() : host;
}
public boolean alive() {
return status != null && status > 0;
}

View File

@@ -27,6 +27,9 @@ public class JmxConfig implements Serializable {
@ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19")
private String token;
@ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL")
private String useWhichEndpoint;
}

View File

@@ -42,4 +42,9 @@ public class BrokerPO extends BasePO {
* Broker状态
*/
private Integer status;
/**
* 监听信息
*/
private String endpointMap;
}

View File

@@ -130,6 +130,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
// 如果当前Broker还存活则更新DB信息
BrokerPO newBrokerPO = ConvertUtil.obj2Obj(presentAliveBroker, BrokerPO.class);
if (presentAliveBroker.getEndpointMap() != null) {
newBrokerPO.setEndpointMap(ConvertUtil.obj2Json(presentAliveBroker.getEndpointMap()));
}
newBrokerPO.setId(inDBBrokerPO.getId());
newBrokerPO.setStatus(Constant.ALIVE);
newBrokerPO.setCreateTime(inDBBrokerPO.getCreateTime());
@@ -203,7 +206,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
lambdaQueryWrapper.eq(BrokerPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(BrokerPO::getBrokerId, brokerId);
return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
return Broker.buildFrom(brokerDAO.selectOne(lambdaQueryWrapper));
}
@Override
@@ -272,9 +275,8 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
/**************************************************** private method ****************************************************/
private List<Broker> listAllBrokersAndUpdateCache(Long clusterPhyId) {
List<Broker> allBrokerList = ConvertUtil.list2List(this.getAllBrokerPOsFromDB(clusterPhyId), Broker.class);
List<Broker> allBrokerList = getAllBrokerPOsFromDB(clusterPhyId).stream().map(elem -> Broker.buildFrom(elem)).collect(Collectors.toList());
brokersCache.put(clusterPhyId, allBrokerList);
return allBrokerList;
}

View File

@@ -13,6 +13,7 @@ CREATE TABLE `ks_km_broker` (
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活0未存活',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`endpoint_map` varchar(1024) NOT NULL DEFAULT '' COMMENT '监听信息',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_phy_id_broker_id` (`cluster_phy_id`,`broker_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker信息表';

View File

@@ -165,7 +165,7 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
clusterPhy.getId(),
brokerId,
broker.getStartTimestamp(),
broker.getHost(),
jmxConfig != null ? broker.getJmxHost(jmxConfig.getUseWhichEndpoint()) : broker.getHost(),
broker.getJmxPort() != null ? broker.getJmxPort() : jmxConfig.getJmxPort(),
jmxConfig
);
@@ -191,6 +191,6 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE);
BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper);
return ConvertUtil.obj2Obj(brokerPO, Broker.class);
return Broker.buildFrom(brokerPO);
}
}

View File

@@ -6,5 +6,4 @@ import org.springframework.stereotype.Repository;
@Repository
public interface BrokerDAO extends BaseMapper<BrokerPO> {
int replace(BrokerPO brokerPO);
}

View File

@@ -14,12 +14,7 @@
<result column="jmx_port" property="jmxPort" />
<result column="start_timestamp" property="startTimestamp" />
<result column="status" property="status" />
<result column="endpoint_map" property="endpointMap"/>
</resultMap>
<insert id="replace" parameterType="com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO">
REPLACE ks_km_broker
(cluster_phy_id, broker_id, host, port, jmx_port, start_timestamp, status, update_time)
VALUES
(#{clusterPhyId}, #{brokerId}, #{host}, #{port}, #{jmxPort}, #{startTimestamp}, #{status}, #{updateTime})
</insert>
</mapper>