add region created event and handle it to cal region capacity immediately

This commit is contained in:
zengqiao
2022-01-11 17:48:24 +08:00
parent 2818584db6
commit c036483680
3 changed files with 62 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
package com.xiaojukeji.kafka.manager.common.events;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* Region创建事件
* @author zengqiao
* @date 22/01/1
*/
@Getter
public class RegionCreatedEvent extends ApplicationEvent {
private final RegionDO regionDO;
public RegionCreatedEvent(Object source, RegionDO regionDO) {
super(source);
this.regionDO = regionDO;
}
}

View File

@@ -2,6 +2,8 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.RegionDao;
@@ -59,6 +61,8 @@ public class RegionServiceImpl implements RegionService {
return ResultStatus.BROKER_NOT_EXIST;
}
if (regionDao.insert(regionDO) > 0) {
// 发布region创建事件
SpringTool.publish(new RegionCreatedEvent(this, regionDO));
return ResultStatus.SUCCESS;
}
} catch (DuplicateKeyException e) {

View File

@@ -0,0 +1,38 @@
package com.xiaojukeji.kafka.manager.task.listener.biz;
import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent;
import com.xiaojukeji.kafka.manager.task.dispatch.biz.CalRegionCapacity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* Region创建监听器,
* TODO 后续需要将其移动到core模块
* @author zengqiao
* @date 22/01/11
*/
@Component
public class RegionCreatedListener implements ApplicationListener<RegionCreatedEvent> {
private static final Logger logger = LoggerFactory.getLogger(RegionCreatedListener.class);
@Autowired
private CalRegionCapacity calRegionCapacity;
@Async
@Override
public void onApplicationEvent(RegionCreatedEvent event) {
try {
logger.info("cal region capacity started when region created, regionDO:{}.", event.getRegionDO());
calRegionCapacity.processTask(event.getRegionDO());
logger.info("cal region capacity finished when region created, regionDO:{}.", event.getRegionDO());
} catch (Exception e) {
logger.error("cal region capacity failed when region created, regionDO:{}.", event.getRegionDO(), e);
}
}
}