From 37585f760d8caf8c93721de27230c98c3377039b Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 11 Jan 2022 16:57:24 +0800 Subject: [PATCH 1/3] optimize reeassign task-name --- .../web/converters/ReassignModelConverter.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java index 747fbb8b..06de3ad9 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java @@ -95,12 +95,21 @@ public class ReassignModelConverter { vo.setBeginTime(0L); vo.setEndTime(0L); + StringBuilder clusterAndTopicName = new StringBuilder(); + Integer completedTopicNum = 0; Set statusSet = new HashSet<>(); for (ReassignTaskDO elem: doList) { vo.setGmtCreate(elem.getGmtCreate().getTime()); vo.setOperator(elem.getOperator()); vo.setDescription(elem.getDescription()); + + if (clusterAndTopicName.length() == 0) { + clusterAndTopicName.append("-").append(elem.getClusterId()).append("-").append(elem.getTopicName()); + } else { + clusterAndTopicName.append("等"); + } + if (TaskStatusReassignEnum.isFinished(elem.getStatus())) { completedTopicNum += 1; statusSet.add(elem.getStatus()); @@ -114,6 +123,9 @@ public class ReassignModelConverter { vo.setBeginTime(elem.getBeginTime().getTime()); } + // 任务名称上,增加展示集群ID和Topic名称,多个时,仅展示第一个. PR from Hongten + vo.setTaskName(String.format("%s 数据迁移任务%s", DateUtils.getFormattedDate(taskId), clusterAndTopicName.toString())); + // 任务整体状态 if (statusSet.contains(TaskStatusReassignEnum.RUNNING.getCode())) { vo.setStatus(TaskStatusReassignEnum.RUNNING.getCode()); From 2818584db6e46bfe751d8294061563517a32379a Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 11 Jan 2022 17:19:14 +0800 Subject: [PATCH 2/3] ignore read kafka-controller data when znode not exist --- .../zookeeper/ControllerStateListener.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java index 3f43f57b..c417df66 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java @@ -19,13 +19,13 @@ import org.springframework.dao.DuplicateKeyException; * @date 20/5/14 */ public class ControllerStateListener implements StateChangeListener { - private final static Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class); - private Long clusterId; + private final Long clusterId; - private ZkConfigImpl zkConfig; + private final ZkConfigImpl zkConfig; - private ControllerDao controllerDao; + private final ControllerDao controllerDao; public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) { this.clusterId = clusterId; @@ -35,8 +35,11 @@ public class ControllerStateListener implements StateChangeListener { @Override public void init() { + if (!checkNodeExist()) { + LOGGER.warn("kafka-controller data not exist, clusterId:{}.", clusterId); + return; + } processControllerChange(); - return; } @Override @@ -49,12 +52,21 @@ public class ControllerStateListener implements StateChangeListener { break; } } catch (Exception e) { - LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", - clusterId, state, path, e); + LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", clusterId, state, path, e); } } - private void processControllerChange(){ + private boolean checkNodeExist() { + try { + return zkConfig.checkPathExists(ZkPathUtil.CONTROLLER_ROOT_NODE); + } catch (Exception e) { + LOGGER.error("init kafka-controller data failed, clusterId:{}.", clusterId, e); + } + + return false; + } + + private void processControllerChange() { LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId); ControllerData controllerData = null; try { From c036483680e7c60b26def1eba34b9c867de0fde7 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 11 Jan 2022 17:48:24 +0800 Subject: [PATCH 3/3] add region created event and handle it to cal region capacity immediately --- .../common/events/RegionCreatedEvent.java | 20 ++++++++++ .../service/impl/RegionServiceImpl.java | 4 ++ .../listener/biz/RegionCreatedListener.java | 38 +++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/RegionCreatedEvent.java create mode 100644 kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/biz/RegionCreatedListener.java diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/RegionCreatedEvent.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/RegionCreatedEvent.java new file mode 100644 index 00000000..b8d72de9 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/RegionCreatedEvent.java @@ -0,0 +1,20 @@ +package com.xiaojukeji.kafka.manager.common.events; + +import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO; +import lombok.Getter; +import org.springframework.context.ApplicationEvent; + +/** + * Region创建事件 + * @author zengqiao + * @date 22/01/1 + */ +@Getter +public class RegionCreatedEvent extends ApplicationEvent { + private final RegionDO regionDO; + + public RegionCreatedEvent(Object source, RegionDO regionDO) { + super(source); + this.regionDO = regionDO; + } +} diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java index 8f957b02..3b2df843 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java @@ -2,6 +2,8 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO; +import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent; +import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.dao.RegionDao; @@ -59,6 +61,8 @@ public class RegionServiceImpl implements RegionService { return ResultStatus.BROKER_NOT_EXIST; } if (regionDao.insert(regionDO) > 0) { + // 发布region创建事件 + SpringTool.publish(new RegionCreatedEvent(this, regionDO)); return ResultStatus.SUCCESS; } } catch (DuplicateKeyException e) { diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/biz/RegionCreatedListener.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/biz/RegionCreatedListener.java new file mode 100644 index 00000000..5daa0e9e --- /dev/null +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/biz/RegionCreatedListener.java @@ -0,0 +1,38 @@ +package com.xiaojukeji.kafka.manager.task.listener.biz; + +import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent; +import com.xiaojukeji.kafka.manager.task.dispatch.biz.CalRegionCapacity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +/** + * Region创建监听器, + * TODO 后续需要将其移动到core模块 + * @author zengqiao + * @date 22/01/11 + */ +@Component +public class RegionCreatedListener implements ApplicationListener { + private static final Logger logger = LoggerFactory.getLogger(RegionCreatedListener.class); + + @Autowired + private CalRegionCapacity calRegionCapacity; + + @Async + @Override + public void onApplicationEvent(RegionCreatedEvent event) { + try { + logger.info("cal region capacity started when region created, regionDO:{}.", event.getRegionDO()); + + calRegionCapacity.processTask(event.getRegionDO()); + + logger.info("cal region capacity finished when region created, regionDO:{}.", event.getRegionDO()); + } catch (Exception e) { + logger.error("cal region capacity failed when region created, regionDO:{}.", event.getRegionDO(), e); + } + } +}