diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md index a75f71fd..efc01399 100644 --- a/docs/install_guide/版本升级手册.md +++ b/docs/install_guide/版本升级手册.md @@ -7,7 +7,22 @@ ### 6.2.0、升级至 `master` 版本 -暂无 +```sql +DROP TABLE IF EXISTS `ks_km_zookeeper`; +CREATE TABLE `ks_km_zookeeper` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID', + `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名', + `port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口', + `role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer', + `version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活,11存活但是4字命令使用不了', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表'; +``` ### 6.2.1、升级至 `v3.0.0` 版本 diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/ZookeeperInfo.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/ZookeeperInfo.java new file mode 100644 index 00000000..e943952e --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/ZookeeperInfo.java @@ -0,0 +1,42 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.BaseEntity; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import lombok.Data; + +@Data +public class ZookeeperInfo extends BaseEntity { + /** + * 集群Id + */ + private Long clusterPhyId; + + /** + * 主机 + */ + private String host; + + /** + * 端口 + */ + private Integer port; + + /** + * 角色 + */ + private String role; + + /** + * 版本 + */ + private String version; + + /** + * ZK状态 + */ + private Integer status; + + public boolean alive() { + return !(Constant.DOWN.equals(status)); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/zookeeper/ZookeeperInfoPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/zookeeper/ZookeeperInfoPO.java new file mode 100644 index 00000000..69968ef6 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/zookeeper/ZookeeperInfoPO.java @@ -0,0 +1,40 @@ +package com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import lombok.Data; + +@Data +@TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "zookeeper") +public class ZookeeperInfoPO extends BasePO { + /** + * 集群Id + */ + private Long clusterPhyId; + + /** + * 主机 + */ + private String host; + + /** + * 端口 + */ + private Integer port; + + /** + * 角色 + */ + private String role; + + /** + * 版本 + */ + private String version; + + /** + * ZK状态 + */ + private Integer status; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/zookeeper/ZKRoleEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/zookeeper/ZKRoleEnum.java new file mode 100644 index 00000000..fd379dc8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/zookeeper/ZKRoleEnum.java @@ -0,0 +1,22 @@ +package com.xiaojukeji.know.streaming.km.common.enums.zookeeper; + +import lombok.Getter; + +@Getter +public enum ZKRoleEnum { + LEADER("leader"), + + FOLLOWER("follower"), + + OBSERVER("observer"), + + UNKNOWN("unknown"), + + ; + + private final String role; + + ZKRoleEnum(String role) { + this.role = role; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java new file mode 100644 index 00000000..758247aa --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; + +import java.util.List; + +public interface ZookeeperService { + /** + * 从ZK集群中获取ZK信息 + */ + Result> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig); + + void batchReplaceDataInDB(Long clusterPhyId, List infoList); + + List listFromDBByCluster(Long clusterPhyId); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java new file mode 100644 index 00000000..2f1e318c --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java @@ -0,0 +1,147 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.ZookeeperUtils; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +public class ZookeeperServiceImpl implements ZookeeperService { + private static final ILog LOGGER = LogFactory.getLog(ZookeeperServiceImpl.class); + + @Autowired + private ZookeeperDAO zookeeperDAO; + + @Override + public Result> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig) { + List> addressList = null; + try { + addressList = ZookeeperUtils.connectStringParser(zookeeperAddress); + } catch (Exception e) { + LOGGER.error( + "class=ZookeeperServiceImpl||method=listFromZookeeperCluster||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!", + clusterPhyId, zookeeperAddress, e + ); + + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage()); + } + + List aliveZKList = new ArrayList<>(); + for (Tuple hostPort: addressList) { + aliveZKList.add(this.getFromZookeeperCluster( + clusterPhyId, + hostPort.getV1(), + hostPort.getV2(), + zkConfig + )); + } + return Result.buildSuc(aliveZKList); + } + + @Override + public void batchReplaceDataInDB(Long clusterPhyId, List infoList) { + // DB 中的信息 + List dbInfoList = this.listRawFromDBByCluster(clusterPhyId); + Map dbMap = new HashMap<>(); + dbInfoList.stream().forEach(elem -> dbMap.put(elem.getHost() + elem.getPort(), elem)); + + // 新获取到的信息 + List newInfoList = ConvertUtil.list2List(infoList, ZookeeperInfoPO.class); + for (ZookeeperInfoPO newInfo: newInfoList) { + try { + ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort()); + if (oldInfo == null) { + zookeeperDAO.insert(newInfo); + } else if (!Constant.DOWN.equals(newInfo.getStatus())) { + // 存活时,直接使用获取到的数据 + newInfo.setId(oldInfo.getId()); + zookeeperDAO.updateById(newInfo); + } else { + // 如果挂了,则版本和角色信息,使用先前的信息。 + // 挂掉之后,如果角色是leader,则需要调整一下 + newInfo.setId(oldInfo.getId()); + newInfo.setRole(ZKRoleEnum.LEADER.getRole().equals(oldInfo.getRole())? ZKRoleEnum.FOLLOWER.getRole(): oldInfo.getRole()); + newInfo.setVersion(oldInfo.getVersion()); + zookeeperDAO.updateById(newInfo); + } + } catch (Exception e) { + LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterPhyId, newInfo, e); + } + } + + // 删除剩余的ZK节点 + dbMap.entrySet().forEach(entry -> { + try { + zookeeperDAO.deleteById(entry.getValue().getId()); + } catch (Exception e) { + LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterPhyId, entry.getValue(), e); + } + }); + } + + @Override + public List listFromDBByCluster(Long clusterPhyId) { + return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class); + } + + + /**************************************************** private method ****************************************************/ + + private List listRawFromDBByCluster(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(ZookeeperInfoPO::getClusterPhyId, clusterPhyId); + + return zookeeperDAO.selectList(lambdaQueryWrapper); + } + + private ZookeeperInfo getFromZookeeperCluster(Long clusterPhyId, String host, Integer port, ZKConfig zkConfig) { + ZookeeperInfo zookeeperInfo = new ZookeeperInfo(); + zookeeperInfo.setClusterPhyId(clusterPhyId); + zookeeperInfo.setHost(host); + zookeeperInfo.setPort(port); + zookeeperInfo.setRole(""); + zookeeperInfo.setVersion(""); + zookeeperInfo.setStatus(Constant.DOWN); + + Result serverCmdDataResult = FourLetterWordUtil.executeFourLetterCmd( + clusterPhyId, + host, + port, + zkConfig != null ? zkConfig.getOpenSecure(): false, + zkConfig != null ? zkConfig.getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS, + new ServerCmdDataParser() + ); + if (serverCmdDataResult.hasData()) { + zookeeperInfo.setRole(serverCmdDataResult.getData().getZkServerState()); + zookeeperInfo.setVersion(serverCmdDataResult.getData().getZkVersion()); + zookeeperInfo.setStatus(Constant.ALIVE); + } else if (serverCmdDataResult.getCode().equals(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN.getCode())) { + zookeeperInfo.setStatus(Constant.ZK_ALIVE_BUT_4_LETTER_FORBIDDEN); + } else { + return zookeeperInfo; + } + + return zookeeperInfo; + } +} diff --git a/km-dist/init/sql/ddl-ks-km.sql b/km-dist/init/sql/ddl-ks-km.sql index d9e4e16c..89f04034 100644 --- a/km-dist/init/sql/ddl-ks-km.sql +++ b/km-dist/init/sql/ddl-ks-km.sql @@ -355,3 +355,19 @@ CREATE TABLE `ks_km_app_node` ( PRIMARY KEY (`id`), KEY `idx_app_host` (`app_name`,`host_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='km集群部署的node信息'; + + +DROP TABLE IF EXISTS `ks_km_zookeeper`; +CREATE TABLE `ks_km_zookeeper` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID', + `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名', + `port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口', + `role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer', + `version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活,11存活但是4字命令使用不了', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表'; \ No newline at end of file diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/zookeeper/ZookeeperDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/zookeeper/ZookeeperDAO.java new file mode 100644 index 00000000..73a177ae --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/zookeeper/ZookeeperDAO.java @@ -0,0 +1,9 @@ +package com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO; +import org.springframework.stereotype.Repository; + +@Repository +public interface ZookeeperDAO extends BaseMapper { +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncZookeeperTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncZookeeperTask.java new file mode 100644 index 00000000..5af37be2 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncZookeeperTask.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.task.metadata; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.List; + + +@Task(name = "SyncZookeeperTask", + description = "ZK信息同步到DB", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class SyncZookeeperTask extends AbstractAsyncMetadataDispatchTask { + private static final ILog log = LogFactory.getLog(SyncZookeeperTask.class); + + @Autowired + private ZookeeperService zookeeperService; + + @Override + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + Result> infoResult = zookeeperService.listFromZookeeper( + clusterPhy.getId(), + clusterPhy.getZookeeper(), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class) + ); + + if (infoResult.failed()) { + return new TaskResult(TaskResult.FAIL_CODE, infoResult.getMessage()); + } + + zookeeperService.batchReplaceDataInDB(clusterPhy.getId(), infoResult.getData()); + + return TaskResult.SUCCESS; + } +}