Merge pull request #425 from didi/dev

1. optimize reassign task-name; 2. ignore reading kafka-controller data when the znode does not exist; 3. add region-created event and handle it to calculate region capacity immediately;
This commit is contained in:
EricZeng
2022-01-12 11:44:22 +08:00
committed by GitHub
5 changed files with 94 additions and 8 deletions

View File

@@ -0,0 +1,20 @@
package com.xiaojukeji.kafka.manager.common.events;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
 * Application event published when a new region is created, so that
 * interested listeners (e.g. capacity calculators) can react immediately.
 *
 * @author zengqiao
 * @date 22/01/1
 */
public class RegionCreatedEvent extends ApplicationEvent {
    /** The region that was just created; carried along to event listeners. */
    private final RegionDO regionDO;

    /**
     * @param source   the component publishing the event
     * @param regionDO the newly created region
     */
    public RegionCreatedEvent(Object source, RegionDO regionDO) {
        super(source);
        this.regionDO = regionDO;
    }

    /**
     * Explicit accessor with the same signature Lombok's {@code @Getter}
     * would generate, so callers are unaffected.
     */
    public RegionDO getRegionDO() {
        return this.regionDO;
    }
}

View File

@@ -2,6 +2,8 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.RegionDao; import com.xiaojukeji.kafka.manager.dao.RegionDao;
@@ -59,6 +61,8 @@ public class RegionServiceImpl implements RegionService {
return ResultStatus.BROKER_NOT_EXIST; return ResultStatus.BROKER_NOT_EXIST;
} }
if (regionDao.insert(regionDO) > 0) { if (regionDao.insert(regionDO) > 0) {
// 发布region创建事件
SpringTool.publish(new RegionCreatedEvent(this, regionDO));
return ResultStatus.SUCCESS; return ResultStatus.SUCCESS;
} }
} catch (DuplicateKeyException e) { } catch (DuplicateKeyException e) {

View File

@@ -19,13 +19,13 @@ import org.springframework.dao.DuplicateKeyException;
* @date 20/5/14 * @date 20/5/14
*/ */
public class ControllerStateListener implements StateChangeListener { public class ControllerStateListener implements StateChangeListener {
private final static Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class); private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class);
private Long clusterId; private final Long clusterId;
private ZkConfigImpl zkConfig; private final ZkConfigImpl zkConfig;
private ControllerDao controllerDao; private final ControllerDao controllerDao;
public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) { public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) {
this.clusterId = clusterId; this.clusterId = clusterId;
@@ -35,8 +35,11 @@ public class ControllerStateListener implements StateChangeListener {
@Override @Override
public void init() { public void init() {
if (!checkNodeExist()) {
LOGGER.warn("kafka-controller data not exist, clusterId:{}.", clusterId);
return;
}
processControllerChange(); processControllerChange();
return;
} }
@Override @Override
@@ -49,12 +52,21 @@ public class ControllerStateListener implements StateChangeListener {
break; break;
} }
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", clusterId, state, path, e);
clusterId, state, path, e);
} }
} }
private void processControllerChange(){ private boolean checkNodeExist() {
try {
return zkConfig.checkPathExists(ZkPathUtil.CONTROLLER_ROOT_NODE);
} catch (Exception e) {
LOGGER.error("init kafka-controller data failed, clusterId:{}.", clusterId, e);
}
return false;
}
private void processControllerChange() {
LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId); LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId);
ControllerData controllerData = null; ControllerData controllerData = null;
try { try {

View File

@@ -0,0 +1,38 @@
package com.xiaojukeji.kafka.manager.task.listener.biz;
import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent;
import com.xiaojukeji.kafka.manager.task.dispatch.biz.CalRegionCapacity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * Listener for {@link RegionCreatedEvent}: recalculates the capacity of the
 * newly created region as soon as the creation event is published.
 * TODO: move this listener into the core module later.
 *
 * @author zengqiao
 * @date 22/01/11
 */
@Component
public class RegionCreatedListener implements ApplicationListener<RegionCreatedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RegionCreatedListener.class);

    @Autowired
    private CalRegionCapacity calRegionCapacity;

    @Async
    @Override
    public void onApplicationEvent(RegionCreatedEvent event) {
        // Runs asynchronously; delegate so the handler body stays in one place.
        this.calCapacityForNewRegion(event);
    }

    /** Computes the new region's capacity, logging (never propagating) failures. */
    private void calCapacityForNewRegion(RegionCreatedEvent event) {
        try {
            LOGGER.info("cal region capacity started when region created, regionDO:{}.", event.getRegionDO());
            calRegionCapacity.processTask(event.getRegionDO());
            LOGGER.info("cal region capacity finished when region created, regionDO:{}.", event.getRegionDO());
        } catch (Exception e) {
            // Best-effort task: a failure here must not break event dispatch.
            LOGGER.error("cal region capacity failed when region created, regionDO:{}.", event.getRegionDO(), e);
        }
    }
}

View File

@@ -95,12 +95,21 @@ public class ReassignModelConverter {
vo.setBeginTime(0L); vo.setBeginTime(0L);
vo.setEndTime(0L); vo.setEndTime(0L);
StringBuilder clusterAndTopicName = new StringBuilder();
Integer completedTopicNum = 0; Integer completedTopicNum = 0;
Set<Integer> statusSet = new HashSet<>(); Set<Integer> statusSet = new HashSet<>();
for (ReassignTaskDO elem: doList) { for (ReassignTaskDO elem: doList) {
vo.setGmtCreate(elem.getGmtCreate().getTime()); vo.setGmtCreate(elem.getGmtCreate().getTime());
vo.setOperator(elem.getOperator()); vo.setOperator(elem.getOperator());
vo.setDescription(elem.getDescription()); vo.setDescription(elem.getDescription());
if (clusterAndTopicName.length() == 0) {
clusterAndTopicName.append("-").append(elem.getClusterId()).append("-").append(elem.getTopicName());
} else {
clusterAndTopicName.append("");
}
if (TaskStatusReassignEnum.isFinished(elem.getStatus())) { if (TaskStatusReassignEnum.isFinished(elem.getStatus())) {
completedTopicNum += 1; completedTopicNum += 1;
statusSet.add(elem.getStatus()); statusSet.add(elem.getStatus());
@@ -114,6 +123,9 @@ public class ReassignModelConverter {
vo.setBeginTime(elem.getBeginTime().getTime()); vo.setBeginTime(elem.getBeginTime().getTime());
} }
// 任务名称上增加展示集群ID和Topic名称多个时仅展示第一个. PR from Hongten
vo.setTaskName(String.format("%s 数据迁移任务%s", DateUtils.getFormattedDate(taskId), clusterAndTopicName.toString()));
// 任务整体状态 // 任务整体状态
if (statusSet.contains(TaskStatusReassignEnum.RUNNING.getCode())) { if (statusSet.contains(TaskStatusReassignEnum.RUNNING.getCode())) {
vo.setStatus(TaskStatusReassignEnum.RUNNING.getCode()); vo.setStatus(TaskStatusReassignEnum.RUNNING.getCode());