kafka-manager 2.0

zengqiao
2020-09-28 15:46:34 +08:00
parent 28d985aaf1
commit c6e4b60424
1253 changed files with 82183 additions and 37179 deletions

@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-task</artifactId>
<version>2.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<artifactId>kafka-manager</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-kcm</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-monitor</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-bpm</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.9.RELEASE</version>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,20 @@
package com.xiaojukeji.kafka.manager.task.common;
import org.springframework.context.ApplicationEvent;
/**
* @author zengqiao
* @date 20/9/24
*/
public class BaseTaskEvent extends ApplicationEvent {
private long startTime;
public BaseTaskEvent(Object source, long startTime) {
super(source);
this.startTime = startTime;
}
public long getStartTime() {
return startTime;
}
}

@@ -0,0 +1,24 @@
package com.xiaojukeji.kafka.manager.task.common;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics;
import java.util.List;
/**
* @author zengqiao
* @date 20/9/24
*/
public class TopicThrottledMetricsCollectedEvent extends BaseTaskEvent {
private List<TopicThrottledMetrics> metricsList;
public TopicThrottledMetricsCollectedEvent(Object source,
long startTime,
List<TopicThrottledMetrics> metricsList) {
super(source, startTime);
this.metricsList = metricsList;
}
public List<TopicThrottledMetrics> getMetricsList() {
return metricsList;
}
}
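
The two event classes above are plain Spring ApplicationEvents published by the collection tasks added later in this commit. The listeners that consume them are not part of this hunk; as a hypothetical sketch (class name and handling logic are illustrative only), a consumer could look like this:

package com.xiaojukeji.kafka.manager.task.common;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class TopicThrottledMetricsListenerSketch {
    @EventListener
    public void handle(TopicThrottledMetricsCollectedEvent event) {
        // event.getStartTime() is the collection timestamp set by the publisher,
        // event.getMetricsList() carries the collected throttled-topic metrics.
    }
}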

@@ -0,0 +1,184 @@
package com.xiaojukeji.kafka.manager.task.component;
import com.google.common.collect.Lists;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.NetUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.HeartbeatDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.HeartbeatDO;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
* @author limeng
* @date 20/8/10
*/
public abstract class AbstractScheduledTask<E extends Comparable> implements SchedulingConfigurer {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private HeartbeatDao heartbeatDao;
private ExecutorService executorService;
private volatile String scheduledCron = "0 0/1 * * * *";
private volatile String scheduledName;
public String getCron() {
return this.scheduledCron;
}
public boolean modifyCron(String name, String cron) {
return checkAndModifyCron(name, cron, false);
}
public String getScheduledName() {
return scheduledName;
}
@PostConstruct
void init() {
CustomScheduled customSchedule = this.getClass().getAnnotation(CustomScheduled.class);
if (ValidateUtils.isNull(customSchedule)) {
LOGGER.error("extends AbstractScheduledTask must use CustomScheduled annotation.");
System.exit(0);
}
this.scheduledName = customSchedule.name();
checkAndModifyCron(customSchedule.name(), customSchedule.cron(), true);
this.executorService = new ThreadPoolExecutor(
customSchedule.threadNum(),
customSchedule.threadNum(),
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("CustomScheduled-" + customSchedule.name())
);
LOGGER.info("init custom scheduled finished, scheduledName:{} scheduledCron:{}.", scheduledName, scheduledCron);
}
private boolean checkAndModifyCron(String scheduledName, String scheduledCron, boolean exitIfIllegal) {
if (scheduledCron.matches(ScheduledTaskConstant.CRON_REG_EX)) {
this.scheduledCron = scheduledCron;
LOGGER.info("modify scheduledCron success, scheduledName:{} scheduledCron:{}."
, scheduledName, scheduledCron);
return true;
}
LOGGER.error("modify scheduledCron failed, format invalid, scheduledName:{} scheduledCron:{}."
, scheduledName, scheduledCron);
if (exitIfIllegal) {
System.exit(0);
}
return false;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(20));
taskRegistrar.addTriggerTask(new Runnable() {
@Override
public void run() {
// task logic
scheduleFunction();
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
// task trigger: returns the next execution time; the cron can be modified at runtime to change the schedule
CronTrigger trigger = new CronTrigger(scheduledCron);
Date nextExec = trigger.nextExecutionTime(triggerContext);
return nextExec;
}
});
}
public void scheduleAllTaskFunction() {
List<E> taskList = this.listAllTasks();
for (E elem: taskList) {
executorService.submit(new BaseBizTask(elem, this));
}
}
public void scheduleFunction() {
LOGGER.info("customScheduled task start, scheduledName:{} scheduledCron:{}.", scheduledName, scheduledCron);
List<E> tasks = this.listAllTasks();
if (CollectionUtils.isEmpty(tasks)) {
LOGGER.info("customScheduled task finished, empty task, scheduledName:{}.", scheduledName);
return;
}
List<E> selectTasks = this.select(tasks);
if (ValidateUtils.isEmptyList(selectTasks)) {
LOGGER.info("customScheduled task finished, empty selected task, scheduledName:{}.", scheduledName);
return;
}
LOGGER.info("customScheduled task running, selected tasks, IP:{} selectedTasks:{}.",
NetUtils.localIp(), JsonUtils.toJSONString(selectTasks)
);
for (E elem : selectTasks) {
executorService.submit(new BaseBizTask(elem, this));
}
LOGGER.info("customScheduled task finished, scheduledName:{}.", scheduledName);
}
private List<E> select(List<E> allTaskList) {
long now = System.currentTimeMillis();
if(ValidateUtils.isEmptyList(allTaskList)){
return Lists.newArrayList();
}
Collections.sort(allTaskList);
List<HeartbeatDO> hostList = heartbeatDao.selectActiveHosts(
new Date(now - ScheduledTaskConstant.HEARTBEAT_TIME)
);
if (ValidateUtils.isEmptyList(hostList)) {
return Lists.newArrayList();
}
int idx = 0;
while (idx < hostList.size()) {
if (hostList.get(idx).getIp().equals(NetUtils.localIp())) {
break;
}
idx++;
}
if (idx == hostList.size()) {
// the current host is not in the active (registered) host list
LOGGER.error("customScheduled task running, current host not registered, scheduledName:{}.", scheduledName);
return Lists.newArrayList();
}
int count = allTaskList.size() / hostList.size();
if (allTaskList.size() % hostList.size() != 0) {
count += 1;
}
if (idx * count >= allTaskList.size()) {
return Lists.newArrayList();
}
return allTaskList.subList(idx * count, Math.min(idx * count + count, allTaskList.size()));
}
protected abstract List<E> listAllTasks();
protected abstract void processTask(E task);
}
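
For clarity, the select() method above shards tasks across the active hosts reported by the heartbeat table: tasks are sorted, each host takes a contiguous block of ceil(tasks/hosts) entries, and the block is chosen by the host's index in the heartbeat list. A self-contained sketch of the same arithmetic (not part of the commit, illustrative values only):

import java.util.*;

public class SelectShardingSketch {
    // same partitioning rule as AbstractScheduledTask#select
    static <E extends Comparable<E>> List<E> select(List<E> tasks, int hostCount, int hostIdx) {
        if (tasks.isEmpty() || hostIdx < 0 || hostIdx >= hostCount) {
            return Collections.emptyList();
        }
        Collections.sort(tasks);
        int count = tasks.size() / hostCount;
        if (tasks.size() % hostCount != 0) {
            count += 1;                       // ceil(tasks / hosts)
        }
        int from = hostIdx * count;
        if (from >= tasks.size()) {
            return Collections.emptyList();   // trailing hosts may get nothing
        }
        return tasks.subList(from, Math.min(from + count, tasks.size()));
    }

    public static void main(String[] args) {
        for (int idx = 0; idx < 3; idx++) {
            List<Integer> tasks = new ArrayList<>(Arrays.asList(5, 1, 4, 2, 3));
            System.out.println("host " + idx + " -> " + select(tasks, 3, idx));
        }
        // prints: host 0 -> [1, 2], host 1 -> [3, 4], host 2 -> [5]
    }
}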

@@ -0,0 +1,35 @@
package com.xiaojukeji.kafka.manager.task.component;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengqiao
* @date 20/8/10
*/
public class BaseBizTask<E extends Comparable> implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
private E task;
private AbstractScheduledTask scheduledTask;
public BaseBizTask(E task, AbstractScheduledTask scheduledTask) {
this.task = task;
this.scheduledTask = scheduledTask;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
LOGGER.info("scheduled task scheduleName:{} start", scheduledTask.getScheduledName());
try {
scheduledTask.processTask(task);
} catch (Throwable t) {
LOGGER.error("scheduled task scheduleName:{} execute failed, task:{}", scheduledTask.getScheduledName(), task, t);
}
LOGGER.info("scheduled task scheduleName:{} finished, cost-time:{}ms.", scheduledTask.getScheduledName(), System.currentTimeMillis() - startTime);
}
}

@@ -0,0 +1,21 @@
package com.xiaojukeji.kafka.manager.task.component;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
* @author limeng
* @date 20/8/10
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface CustomScheduled {
String name();
String cron();
int threadNum() default 1;
}

@@ -0,0 +1,30 @@
package com.xiaojukeji.kafka.manager.task.component;
/**
* @author zengqiao
* @date 20/8/11
*/
public class EmptyEntry implements Comparable<EmptyEntry> {
private Long id;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
@Override
public String toString() {
return "EmptyEntry{" +
"id=" + id +
'}';
}
@Override
public int compareTo(EmptyEntry emptyEntry) {
return this.id.compareTo(emptyEntry.id);
}
}

@@ -0,0 +1,38 @@
package com.xiaojukeji.kafka.manager.task.component;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.utils.NetUtils;
import com.xiaojukeji.kafka.manager.dao.HeartbeatDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.HeartbeatDO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @author limeng
* @date 20/8/10
*/
@Component
public class Heartbeat {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private HeartbeatDao heartbeatDao;
@Scheduled(cron = ScheduledTaskConstant.HEARTBEAT_CRON)
public void ipFlush() {
try {
// sleep a random 0-1000 ms to add jitter; the task-select step could later also use a timestamp to further randomize task assignment
Thread.sleep(Math.round(Math.random() * 1000));
HeartbeatDO heartbeatDO = new HeartbeatDO();
heartbeatDO.setIp(NetUtils.localIp());
heartbeatDO.setHostname(NetUtils.localHostname());
heartbeatDao.replace(heartbeatDO);
} catch (Exception e) {
LOGGER.error("flush heartbeat failed.", e);
}
}
}

@@ -0,0 +1,26 @@
package com.xiaojukeji.kafka.manager.task.component;
/**
* @author zengqiao
* @date 20/8/12
*/
public class ScheduledTaskConstant {
public static final Long HEARTBEAT_TIME = 30 * 1000L;
public static final String HEARTBEAT_CRON = "0/10 * * * * ?";
public static final String CRON_REG_EX = "(((^([0-9]|[0-5][0-9])(\\,|\\-|\\/){1}([0-9]|[0-5][0-9]) )|^([0-9]|[0-5][0-9]) |^(\\* ))" +
"((([0-9]|[0-5][0-9])(\\,|\\-|\\/){1}([0-9]|[0-5][0-9]) )|([0-9]|[0-5][0-9]) |(\\* ))((([0-9]|" +
"[01][0-9]|2[0-3])(\\,|\\-|\\/){1}([0-9]|[01][0-9]|2[0-3]) )|([0-9]|[01][0-9]|2[0-3]) |(\\* ))" +
"((([0-9]|[0-2][0-9]|3[01])(\\,|\\-|\\/){1}([0-9]|[0-2][0-9]|3[01]) )|(([0-9]|[0-2][0-9]|3[01]) )|" +
"(\\? )|(\\* )|(([1-9]|[0-2][0-9]|3[01])L )|([1-7]W )|(LW )|([1-7]\\#[1-4] ))((([1-9]|0[1-9]|1[0-2])(\\,|\\-|\\/)" +
"{1}([1-9]|0[1-9]|1[0-2]) )|([1-9]|0[1-9]|1[0-2]) |(\\* ))(([1-7](\\,|\\-|\\/){1}[1-7])|([1-7])|(\\?)|(\\*)|" +
"(([1-7]L)|([1-7]\\#[1-4]))))|(((^([0-9]|[0-5][0-9])(\\,|\\-|\\/){1}([0-9]|[0-5][0-9]) )|^([0-9]|[0-5][0-9]) " +
"|^(\\* ))((([0-9]|[0-5][0-9])(\\,|\\-|\\/){1}([0-9]|[0-5][0-9]) )|([0-9]|[0-5][0-9]) |(\\* ))(" +
"(([0-9]|[01][0-9]|2[0-3])(\\,|\\-|\\/){1}([0-9]|[01][0-9]|2[0-3]) )|([0-9]|[01][0-9]|2[0-3]) |(\\* ))" +
"((([0-9]|[0-2][0-9]|3[01])(\\,|\\-|\\/){1}([0-9]|[0-2][0-9]|3[01]) )|(([0-9]|[0-2][0-9]|3[01]) )|" +
"(\\? )|(\\* )|(([1-9]|[0-2][0-9]|3[01])L )|([1-7]W )|(LW )|([1-7]\\#[1-4] ))((([1-9]|0[1-9]|1[0-2])(\\,|\\-|\\/)" +
"{1}([1-9]|0[1-9]|1[0-2]) )|([1-9]|0[1-9]|1[0-2]) |(\\* ))(([1-7](\\,|\\-|\\/){1}[1-7] )|([1-7] )|(\\? )|(\\* )|" +
"(([1-7]L )|([1-7]\\#[1-4]) ))((19[789][0-9]|20[0-9][0-9])\\-(19[789][0-9]|20[0-9][0-9])))";
}
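
The CRON_REG_EX constant is consumed by AbstractScheduledTask#checkAndModifyCron via String#matches: a candidate cron expression is accepted only if it matches. A minimal sketch of that check (not part of the commit):

import com.xiaojukeji.kafka.manager.task.component.ScheduledTaskConstant;

public class CronCheckSketch {
    public static void main(String[] args) {
        // the first two expressions appear in tasks added by this commit
        String[] candidates = {"0 0/1 * * * *", "0 0 1 * * *", "not-a-cron"};
        for (String cron : candidates) {
            System.out.println(cron + " -> " + cron.matches(ScheduledTaskConstant.CRON_REG_EX));
        }
    }
}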

@@ -0,0 +1,71 @@
package com.xiaojukeji.kafka.manager.task.config;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
/**
* @author zengqiao
* @date 20/9/18
*/
public class RegionCapacityConfig {
private Long clusterId;
private Integer duration;
private Long latestTimeUnitMs;
private Long maxCapacityUnitB;
public Long getClusterId() {
if (this.clusterId == null) {
return Constant.INVALID_CODE.longValue();
}
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public Integer getDuration() {
if (this.duration == null) {
return 10;
}
return duration;
}
public void setDuration(Integer duration) {
this.duration = duration;
}
public Long getLatestTimeUnitMs() {
if (this.latestTimeUnitMs == null) {
return 7 * 24 * 60 * 60 * 1000L;
}
return latestTimeUnitMs;
}
public void setLatestTimeUnitMs(Long latestTimeUnitMs) {
this.latestTimeUnitMs = latestTimeUnitMs;
}
public Long getMaxCapacityUnitB() {
if (this.maxCapacityUnitB == null) {
return 120 * 1024 * 1024L;
}
return maxCapacityUnitB;
}
public void setMaxCapacityUnitB(Long maxCapacityUnitB) {
this.maxCapacityUnitB = maxCapacityUnitB;
}
@Override
public String toString() {
return "RegionCapacityConfig{" +
"clusterId=" + clusterId +
", duration=" + duration +
", latestTimeUnitMs=" + latestTimeUnitMs +
", maxCapacityUnitB=" + maxCapacityUnitB +
'}';
}
}

@@ -0,0 +1,58 @@
package com.xiaojukeji.kafka.manager.task.config;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
/**
* Kafka bill configuration
* @author zengqiao
* @date 20/6/9
*/
public class TopicBillConfig {
/**
* Quota base: average of the daily peak values over maxAvgDay days of the current month
*/
private Integer maxAvgDay;
/**
* Quota adjustment ratio
*/
private Double quotaRatio;
/**
* Unit price (per MB)
*/
private Double priseUnitMB;
public Integer getMaxAvgDay() {
return maxAvgDay;
}
public void setMaxAvgDay(Integer maxAvgDay) {
this.maxAvgDay = maxAvgDay;
}
public Double getQuotaRatio() {
return quotaRatio;
}
public void setQuotaRatio(Double quotaRatio) {
this.quotaRatio = quotaRatio;
}
public Double getPriseUnitMB() {
return priseUnitMB;
}
public void setPriseUnitMB(Double priseUnitMB) {
this.priseUnitMB = priseUnitMB;
}
public boolean paramLegal() {
if (ValidateUtils.isNullOrLessThanZero(maxAvgDay)
|| ValidateUtils.isNullOrLessThanZero(quotaRatio)
|| ValidateUtils.isNullOrLessThanZero(priseUnitMB)) {
return false;
}
return true;
}
}

@@ -0,0 +1,132 @@
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.task.config.TopicBillConfig;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.TopicStatisticsDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaBillDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* Calculate Kafka topic bills
* @author zengqiao
* @date 20/5/11
*/
@CustomScheduled(name = "calKafkaBill", cron = "0 0 1 * * *", threadNum = 1)
public class CalKafkaTopicBill extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private AppService appService;
@Autowired
private ConfigService configService;
@Autowired
private ClusterService clusterService;
@Autowired
private KafkaBillService kafkaBillService;
@Autowired
private TopicManagerService topicManagerService;
@Autowired
private TopicStatisticsDao topicStatisticsDao;
private static final String KAFKA_TOPIC_BILL_CONFIG_KEY = "KAFKA_TOPIC_BILL_CONFIG";
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
TopicBillConfig kafkaBillConfig =
configService.getByKey(KAFKA_TOPIC_BILL_CONFIG_KEY, TopicBillConfig.class);
if (ValidateUtils.isNull(kafkaBillConfig) || !kafkaBillConfig.paramLegal()) {
return ;
}
Date now = new Date();
Date startTime = DateUtils.getMonthStartTime(now);
Date endTime = DateUtils.getMonthEndTime(now);
String gmtDay = DateUtils.getFormattedDate(now).substring(0, 7);
List<AppDO> appDOList = appService.listAll();
Map<String, String> appMap = new HashMap<>();
for (AppDO appDO: appDOList) {
List<String> principalList = ListUtils.string2StrList(appDO.getPrincipals());
appMap.put(
appDO.getAppId(),
ValidateUtils.isEmptyList(principalList)? Constant.UNKNOWN_USER: principalList.get(0)
);
}
calAndUpdateBill(clusterDO.getId(), startTime, endTime, gmtDay, kafkaBillConfig, appMap);
}
private void calAndUpdateBill(Long clusterId,
Date startTime,
Date endTime,
String gmtDay,
TopicBillConfig kafkaBillConfig,
Map<String, String> appMap) {
List<TopicDO> topicDOList = topicManagerService.getByClusterId(clusterId);
if (ValidateUtils.isEmptyList(topicDOList)) {
topicDOList = new ArrayList<>();
}
Map<String, String> topicNamePrincipalList = new HashMap<>();
for (TopicDO topicDO: topicDOList) {
topicNamePrincipalList.put(
topicDO.getTopicName(),
appMap.getOrDefault(topicDO.getAppId(), Constant.UNKNOWN_USER)
);
}
// calculate and upsert the bill for every topic of the cluster
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
try {
Double maxAvgBytesIn = topicStatisticsDao.getTopicMaxAvgBytesIn(
clusterId,
topicName,
startTime,
endTime,
kafkaBillConfig.getMaxAvgDay()
);
if (ValidateUtils.isNull(maxAvgBytesIn)) {
continue;
}
KafkaBillDO kafkaBillDO = new KafkaBillDO();
kafkaBillDO.setClusterId(clusterId);
kafkaBillDO.setTopicName(topicName);
kafkaBillDO.setPrincipal(topicNamePrincipalList.getOrDefault(topicName, Constant.UNKNOWN_USER));
Double quotaUnitMB = maxAvgBytesIn * kafkaBillConfig.getQuotaRatio() / 1024.0 / 1024.0;
kafkaBillDO.setQuota(quotaUnitMB);
kafkaBillDO.setCost(quotaUnitMB * kafkaBillConfig.getPriseUnitMB());
kafkaBillDO.setGmtDay(gmtDay);
kafkaBillService.replace(kafkaBillDO);
} catch (Exception e) {
LOGGER.error("cal and update bill failed, clusterId:{}, startTime:{}, endTime:{}, gmtDay:{}.",
clusterId, startTime, endTime, gmtDay, e);
}
}
}
}
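
The billing arithmetic in calAndUpdateBill is: quota (MB/s) = maxAvgBytesIn * quotaRatio / 1024 / 1024, and cost = quota * priseUnitMB. A worked example with illustrative numbers (the config values below are assumptions, not taken from the commit):

public class BillingExample {
    public static void main(String[] args) {
        double maxAvgBytesIn = 10 * 1024 * 1024;   // 10 MB/s peak-average bytes-in from DB
        double quotaRatio    = 1.5;                // assumed KAFKA_TOPIC_BILL_CONFIG value
        double priceUnitMB   = 0.02;               // assumed price per MB/s of quota
        double quotaUnitMB = maxAvgBytesIn * quotaRatio / 1024.0 / 1024.0;  // 15.0
        double cost        = quotaUnitMB * priceUnitMB;                     // 0.3
        System.out.println("quota=" + quotaUnitMB + " MB/s, cost=" + cost);
    }
}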

@@ -0,0 +1,84 @@
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.BrokerService;
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
import com.xiaojukeji.kafka.manager.service.service.RegionService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.config.RegionCapacityConfig;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* Calculate region capacity
* @author zengqiao
* @date 20/6/30
*/
@CustomScheduled(name = "calRegionCapacity", cron = "0 0 0/12 * * ?", threadNum = 1)
public class CalRegionCapacity extends AbstractScheduledTask<RegionDO> {
@Autowired
private RegionService regionService;
@Autowired
private BrokerService brokerService;
@Autowired
private ConfigService configService;
private static final String REGION_CAPACITY_CONFIG_KEY = "REGION_CAPACITY_CONFIG";
@Override
protected List<RegionDO> listAllTasks() {
return regionService.listAll();
}
@Override
public void processTask(RegionDO regionDO) {
List<RegionCapacityConfig> configList
= configService.getArrayByKey(REGION_CAPACITY_CONFIG_KEY, RegionCapacityConfig.class);
if (ValidateUtils.isNull(configList)) {
configList = new ArrayList<>();
}
Map<Long, RegionCapacityConfig> configMap = new HashMap<>();
for (RegionCapacityConfig elem: configList) {
configMap.put(elem.getClusterId(), elem);
}
calAndUpdateRegionCapacity(
regionDO,
configMap.getOrDefault(regionDO.getClusterId(), new RegionCapacityConfig())
);
}
private void calAndUpdateRegionCapacity(RegionDO regionDO, RegionCapacityConfig capacityConfig) {
List<Integer> brokerIdList = ListUtils.string2IntList(regionDO.getBrokerList());
long notAliveBrokerNum = PhysicalClusterMetadataManager.getNotAliveBrokerNum(regionDO.getClusterId(), brokerIdList);
// default capacity is brokerNum * maxCapacityUnitB
regionDO.setCapacity(capacityConfig.getMaxCapacityUnitB() * brokerIdList.size());
if (notAliveBrokerNum > 1) {
// more than one broker is down, so the capacity cannot be calculated
regionDO.setCapacity(-1L);
}
Double sumMaxAvgBytesIn = 0.0;
for (Integer brokerId: brokerIdList) {
sumMaxAvgBytesIn += brokerService.calBrokerMaxAvgBytesIn(
regionDO.getClusterId(),
brokerId,
capacityConfig.getDuration(),
new Date(System.currentTimeMillis() - capacityConfig.getLatestTimeUnitMs()),
new Date()
);
}
regionDO.setRealUsed(sumMaxAvgBytesIn.longValue());
regionDO.setEstimateUsed(sumMaxAvgBytesIn.longValue());
regionService.updateCapacityById(regionDO);
}
}

@@ -0,0 +1,96 @@
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO;
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.service.service.TopicService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* Periodically calculate topic statistics:
* 1. today's maxAvgBytesIn
* 2. today's sum(offset)
* @author zengqiao
* @date 20/3/29
*/
@CustomScheduled(name = "calTopicStatistics", cron = "0 0 0/4 * * ?", threadNum = 5)
public class CalTopicStatistics extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ClusterService clusterService;
@Autowired
private TopicService topicService;
@Autowired
private TopicManagerService topicManagerService;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
// get today's start time and end time
Date startTime = new Date(DateUtils.getDayStarTime(0));
Date endTime = new Date(DateUtils.getDayStarTime(1) - 1);
String gmtDay = DateUtils.getFormattedDate(System.currentTimeMillis());
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
try {
calTopicStatistics(clusterDO, topicName, startTime, endTime, gmtDay);
} catch (Exception e) {
LOGGER.error("cal topic metrics failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName);
}
}
}
private void calTopicStatistics(ClusterDO clusterDO,
String topicName,
Date startTime,
Date endTime,
String gmtDay) {
TopicStatisticsDO dataDO = topicManagerService.getByTopicAndDay(clusterDO.getId(), topicName, gmtDay);
if (dataDO == null) {
dataDO = new TopicStatisticsDO();
}
// fetch end offsets of all partitions
Map<TopicPartition, Long> offsetMap =
topicService.getPartitionOffset(clusterDO, topicName, OffsetPosEnum.END);
Long offsetSum = null;
if (!ValidateUtils.isEmptyMap(offsetMap)) {
offsetSum = 0L;
for (Long offset: offsetMap.values()) {
offsetSum += offset;
}
}
// fetch maxAvgBytesIn
Double maxAvgBytesIn = topicService.getMaxAvgBytesInFromDB(clusterDO.getId(), topicName, startTime, endTime);
dataDO.setClusterId(clusterDO.getId());
dataDO.setTopicName(topicName);
dataDO.setGmtDay(gmtDay);
dataDO.setMaxAvgBytesIn(ValidateUtils.isNull(maxAvgBytesIn)? dataDO.getMaxAvgBytesIn(): maxAvgBytesIn);
dataDO.setOffsetSum(offsetSum == null? dataDO.getOffsetSum(): offsetSum);
topicManagerService.replaceTopicStatistics(dataDO);
}
}

@@ -0,0 +1,115 @@
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
import com.xiaojukeji.kafka.manager.common.bizenum.DBStatusEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.BrokerService;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* Refresh the broker table:
* 1. calculate each broker's peak average traffic based on DB data
* 2. write the result back to the DB
*
* @author zengqiao
* @date 20/6/2
*/
@CustomScheduled(name = "flushBrokerTable", cron = "0 0 0/1 * * ?", threadNum = 1)
public class FlushBrokerTable extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private BrokerService brokerService;
@Autowired
private ClusterService clusterService;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
Integer duration = 10;
Date endTime = new Date();
Date startTime = new Date(endTime.getTime() - 24 * 60 * 60 * 1000);
Map<Long, Map<Integer, BrokerDO>> allBrokerMap = getBrokerMap();
try {
execute(clusterDO.getId(),
duration,
startTime,
endTime,
allBrokerMap.getOrDefault(clusterDO.getId(), new HashMap<>(0))
);
} catch (Exception e) {
LOGGER.error("flush broker table failed, clusterId:{}.", clusterDO.getId(), e);
}
}
private void execute(Long clusterId,
Integer duration,
Date startTime,
Date endTime,
Map<Integer, BrokerDO> brokerMap) {
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
if (ValidateUtils.isNull(brokerMetadata)) {
continue;
}
// fetch broker metrics
Double maxAvgBytesIn =
brokerService.calBrokerMaxAvgBytesIn(clusterId, brokerId, duration, startTime, endTime);
BrokerDO brokerDO = brokerMap.get(brokerId);
if (ValidateUtils.isNull(brokerDO)) {
brokerDO = new BrokerDO();
}
brokerDO.setClusterId(brokerMetadata.getClusterId());
brokerDO.setBrokerId(brokerMetadata.getBrokerId());
brokerDO.setHost(brokerMetadata.getHost());
brokerDO.setPort(brokerMetadata.getPort());
brokerDO.setTimestamp(brokerMetadata.getTimestamp());
brokerDO.setStatus(DBStatusEnum.ALIVE.getStatus());
brokerDO.setMaxAvgBytesIn(maxAvgBytesIn);
brokerService.replace(brokerDO);
}
for (Map.Entry<Integer, BrokerDO> entry: brokerMap.entrySet()) {
BrokerDO brokerDO = entry.getValue();
if (brokerDO.getStatus().equals(DBStatusEnum.ALIVE.getStatus())) {
continue;
}
brokerService.replace(brokerDO);
}
}
private Map<Long, Map<Integer, BrokerDO>> getBrokerMap() {
List<BrokerDO> brokerDOList = brokerService.listAll();
if (ValidateUtils.isNull(brokerDOList)) {
brokerDOList = new ArrayList<>();
}
Map<Long, Map<Integer, BrokerDO>> brokerMap = new HashMap<>();
for (BrokerDO brokerDO: brokerDOList) {
// mark as DEAD first; brokers that are still alive will be reset to ALIVE
brokerDO.setStatus(DBStatusEnum.DEAD.getStatus());
Map<Integer, BrokerDO> subBrokerMap = brokerMap.getOrDefault(brokerDO.getClusterId(), new HashMap<>());
subBrokerMap.put(brokerDO.getBrokerId(), brokerDO);
brokerMap.put(brokerDO.getClusterId(), subBrokerMap);
}
return brokerMap;
}
}

@@ -0,0 +1,128 @@
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO;
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.TopicExpiredDao;
import com.xiaojukeji.kafka.manager.dao.TopicStatisticsDao;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Periodically update expired topics
* @author zengqiao
* @date 20/4/1
*/
@CustomScheduled(name = "flushExpiredTopic", cron = "0 0 0/5 * * ?", threadNum = 1)
public class FlushExpiredTopic extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicExpiredDao topicExpiredDao;
@Autowired
private TopicStatisticsDao topicStatisticsDao;
@Autowired
private ClusterService clusterService;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
Date startTime = new Date(DateUtils.getDayStarTime(-1 * Constant.DEFAULT_MAX_CAL_TOPIC_EXPIRED_DAY));
Date endTime = new Date(DateUtils.getDayStarTime(1) - 1000);
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
try {
checkAndModifyIfTopicExpired(clusterDO, topicName, startTime, endTime);
} catch (Exception e) {
LOGGER.error("check topic expired failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
}
}
}
private void checkAndModifyIfTopicExpired(ClusterDO clusterDO, String topicName, Date startTime, Date endTime) {
TopicExpiredDO expiredDO = topicExpiredDao.getByTopic(clusterDO.getId(), topicName);
if (ValidateUtils.isNull(expiredDO)) {
expiredDO = new TopicExpiredDO();
expiredDO.setClusterId(clusterDO.getId());
expiredDO.setTopicName(topicName);
expiredDO.setGmtRetain(startTime);
expiredDO.setStatus(0);
}
if (checkAndModifyIfExistMetrics(clusterDO, topicName)) {
// topic is not expired
expiredDO.setExpiredDay(0);
topicExpiredDao.replace(expiredDO);
return;
}
List<TopicStatisticsDO> doList = topicStatisticsDao.getTopicStatistic(clusterDO.getId(), topicName, startTime, endTime);
if (ValidateUtils.isEmptyList(doList)) {
// topic is not expired
expiredDO.setExpiredDay(0);
topicExpiredDao.replace(expiredDO);
return;
}
Map<String, Long> dayOffsetSumMap = new HashMap<>();
for (TopicStatisticsDO elem: doList) {
dayOffsetSumMap.put(elem.getGmtDay(), elem.getOffsetSum());
}
Long todayTimestamp = endTime.getTime();
Long todayOffsetSum = dayOffsetSumMap.get(DateUtils.getFormattedDate(todayTimestamp));
if (ValidateUtils.isNull(todayOffsetSum) || todayOffsetSum.equals(Constant.INVALID_CODE.longValue())) {
// whether the topic is expired today is still unknown
expiredDO.setExpiredDay(0);
topicExpiredDao.replace(expiredDO);
return;
}
int expiredDay = 0;
for (int i = -1 * Constant.DEFAULT_MAX_CAL_TOPIC_EXPIRED_DAY; i <= 0; ++i) {
String gmtDay = DateUtils.getFormattedDate(DateUtils.getDayStarTime(i));
Long dayOffsetSum = dayOffsetSumMap.get(gmtDay);
if (todayOffsetSum.equals(dayOffsetSum)) {
expiredDay = (i * -1);
break;
}
}
expiredDO.setExpiredDay(expiredDay);
topicExpiredDao.replace(expiredDO);
}
private boolean checkAndModifyIfExistMetrics(ClusterDO clusterDO, String topicName) {
TopicMetrics metrics = KafkaMetricsCache.getTopicMetricsFromCache(clusterDO.getId(), topicName);
if (ValidateUtils.isNull(metrics)) {
return false;
}
Double bytesIn = metrics.getBytesInPerSecOneMinuteRate(null);
if (ValidateUtils.isNull(bytesIn) || bytesIn < 1.0) {
return false;
}
// bytes-in greater than 1.0 means the topic still receives data, so it is not expired
return true;
}
}
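
The expiry rule in checkAndModifyIfTopicExpired compares today's offset sum against the recorded daily offset sums: the topic is considered idle since the oldest day whose sum already equals today's, and that distance in days becomes expiredDay. A standalone sketch of the rule (not part of the commit, sample values only):

import java.util.*;

public class ExpiredDaySketch {
    // offsetSumByDaysAgo maps "days before today" (0 = today) to the recorded offset sum
    static int expiredDay(Map<Integer, Long> offsetSumByDaysAgo, int maxDays) {
        Long today = offsetSumByDaysAgo.get(0);
        if (today == null) {
            return 0; // unknown yet, same fallback as the task
        }
        for (int daysAgo = maxDays; daysAgo >= 0; daysAgo--) {
            if (today.equals(offsetSumByDaysAgo.get(daysAgo))) {
                return daysAgo; // no new messages for this many days
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        Map<Integer, Long> sums = new HashMap<>();
        sums.put(0, 1000L); // today
        sums.put(1, 1000L);
        sums.put(2, 1000L);
        sums.put(3, 800L);  // last day on which new data arrived
        System.out.println(expiredDay(sums, 7)); // prints 2
    }
}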

@@ -0,0 +1,62 @@
package com.xiaojukeji.kafka.manager.task.dispatch.biz;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterTaskDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.kcm.ClusterTaskService;
import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskStateEnum;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.List;
/**
* @author zengqiao
* @date 20/9/7
*/
@CustomScheduled(name = "syncClusterTaskState", cron = "0 0/1 * * * ?", threadNum = 1)
public class SyncClusterTaskState extends AbstractScheduledTask<EmptyEntry> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ClusterTaskService clusterTaskService;
@Override
public List<EmptyEntry> listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
emptyEntry.setId(System.currentTimeMillis() / 1000);
return Arrays.asList(emptyEntry);
}
@Override
public void processTask(EmptyEntry entryEntry) {
List<ClusterTaskDO> doList = clusterTaskService.listAll();
if (ValidateUtils.isEmptyList(doList)) {
return;
}
for (ClusterTaskDO clusterTaskDO: doList) {
if (ClusterTaskStateEnum.FINISHED.getCode().equals(clusterTaskDO.getTaskStatus())) {
continue;
}
try {
syncAndUpdateClusterTaskState(clusterTaskDO);
} catch (Exception e) {
LOGGER.error("sync cluster task state failed, clusterTask:{}.", clusterTaskDO, e);
}
}
}
private void syncAndUpdateClusterTaskState(ClusterTaskDO clusterTaskDO) {
ClusterTaskStateEnum stateEnum = clusterTaskService.getTaskState(clusterTaskDO.getAgentTaskId());
if (ValidateUtils.isNull(stateEnum)) {
return;
}
clusterTaskService.updateTaskState(clusterTaskDO.getId(), stateEnum.getCode());
}
}

@@ -0,0 +1,172 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupDTO;
import com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.events.ConsumerMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.ThreadPool;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
import com.xiaojukeji.kafka.manager.service.service.TopicService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
* @author zengqiao
* @date 20/9/14
*/
@CustomScheduled(name = "newCollectAndPublishCGData", cron = "30 0/1 * * * *", threadNum = 10)
public class CollectAndPublishCGData extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicService topicService;
@Autowired
private ClusterService clusterService;
@Autowired
private ConsumerService consumerService;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
LOGGER.info("collect consumer-group metrics start, clusterId:{}.", clusterDO.getId());
long startTime = System.currentTimeMillis();
try {
collectData(clusterDO, startTime);
} catch (Throwable t) {
LOGGER.error("collect consumer-group metrics failed, clusterId:{}.", clusterDO.getId(), t);
}
LOGGER.info("collect consumer-group metrics finish, clusterId:{} costTime:{}."
, clusterDO.getId(), System.currentTimeMillis() - startTime);
}
private void collectData(ClusterDO clusterDO, long startTimeUnitMs) {
List<String> topicNameList = PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId());
if (ValidateUtils.isEmptyList(topicNameList)) {
// retry once
topicNameList = PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId());
}
if (ValidateUtils.isEmptyList(topicNameList)) {
return;
}
FutureTask<List<ConsumerMetrics>>[] taskList = new FutureTask[topicNameList.size()];
for (int i = 0; i < topicNameList.size(); ++i) {
final String topicName = topicNameList.get(i);
taskList[i] = new FutureTask<List<ConsumerMetrics>>(new Callable<List<ConsumerMetrics>>() {
@Override
public List<ConsumerMetrics> call() throws Exception {
return getTopicConsumerMetrics(clusterDO, topicName, startTimeUnitMs);
}
});
ThreadPool.submitCollectMetricsTask(taskList[i]);
}
List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
for (int i = 0; i < taskList.length; ++i) {
try {
List<ConsumerMetrics> metricsList = taskList[i].get();
if (ValidateUtils.isEmptyList(metricsList)) {
continue;
}
consumerMetricsList.addAll(metricsList);
} catch (Exception e) {
LOGGER.error("collect consumer-group metrics failed, clusterId:{} topicName:{}.",
clusterDO.getId(), topicNameList.get(i), e
);
}
}
SpringTool.publish(new ConsumerMetricsCollectedEvent(this, consumerMetricsList));
}
private List<ConsumerMetrics> getTopicConsumerMetrics(ClusterDO clusterDO,
String topicName,
long startTimeUnitMs) {
List<ConsumerGroupDTO> consumerGroupDTOList = consumerService.getConsumerGroupList(clusterDO.getId(), topicName);
if (ValidateUtils.isEmptyList(consumerGroupDTOList)) {
// retry once
consumerGroupDTOList = consumerService.getConsumerGroupList(clusterDO.getId(), topicName);
}
if (ValidateUtils.isEmptyList(consumerGroupDTOList)) {
return new ArrayList<>();
}
List<ConsumerMetrics> metricsList = new ArrayList<>();
Map<TopicPartition, Long> offsetMap = topicService.getPartitionOffset(clusterDO, topicName, OffsetPosEnum.END);
if (ValidateUtils.isEmptyMap(offsetMap)) {
// retry once
offsetMap = topicService.getPartitionOffset(clusterDO, topicName, OffsetPosEnum.END);
}
if (ValidateUtils.isEmptyMap(offsetMap)) {
LOGGER.error("collect consumer-group metrics failed, partition offset is empty, clusterId:{} topicName:{}.",
clusterDO.getId(), topicName
);
return new ArrayList<>();
}
Map<Integer, Long> partitionOffsetMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry: offsetMap.entrySet()) {
partitionOffsetMap.put(entry.getKey().partition(), entry.getValue());
}
for (ConsumerGroupDTO consumerGroupDTO: consumerGroupDTOList) {
try {
ConsumerMetrics consumerMetrics =
getTopicConsumerMetrics(clusterDO, topicName, consumerGroupDTO, partitionOffsetMap, startTimeUnitMs);
if (ValidateUtils.isNull(consumerMetrics)) {
continue;
}
metricsList.add(consumerMetrics);
} catch (Exception e) {
LOGGER.error("collect consumer-group metrics failed, clusterId:{} topicName:{} consumerGroup:{}.",
clusterDO.getId(), topicName, consumerGroupDTO.getConsumerGroup(), e
);
}
}
return metricsList;
}
private ConsumerMetrics getTopicConsumerMetrics(ClusterDO clusterDO,
String topicName,
ConsumerGroupDTO consumerGroupDTO,
Map<Integer, Long> partitionOffsetMap,
long startTimeUnitMs) {
Map<Integer, Long> consumerOffsetMap =
consumerService.getConsumerOffset(clusterDO, topicName, consumerGroupDTO);
if (ValidateUtils.isEmptyMap(consumerOffsetMap)) {
return null;
}
ConsumerMetrics metrics = new ConsumerMetrics();
metrics.setClusterId(clusterDO.getId());
metrics.setTopicName(topicName);
metrics.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
metrics.setLocation(consumerGroupDTO.getOffsetStoreLocation().location);
metrics.setPartitionOffsetMap(partitionOffsetMap);
metrics.setConsumeOffsetMap(consumerOffsetMap);
metrics.setTimestampUnitMs(startTimeUnitMs);
return metrics;
}
}
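
ConsumerMetrics carries both the partition end offsets and the consumer offsets collected above; a typical downstream use is computing per-partition lag. An illustrative helper (not part of the commit):

import java.util.HashMap;
import java.util.Map;

public class ConsumerLagSketch {
    // lag per partition = end offset - committed consumer offset, floored at 0
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> endOffsets, Map<Integer, Long> consumerOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long consumed = consumerOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - consumed));
        }
        return lag;
    }
}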

@@ -0,0 +1,50 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.ThrottleService;
import com.xiaojukeji.kafka.manager.task.common.TopicThrottledMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* @author zengqiao
* @date 2019-05-10
*/
@CustomScheduled(name = "collectAndPublishTopicThrottledMetrics", cron = "11 0/1 * * * ?", threadNum = 5)
public class CollectAndPublishTopicThrottledMetrics extends AbstractScheduledTask<ClusterDO> {
@Autowired
private ClusterService clusterService;
@Autowired
private ThrottleService throttleService;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
long startTime = System.currentTimeMillis();
List<TopicThrottledMetrics> metricsList = throttleService.getThrottledTopicsFromJmx(
clusterDO.getId(),
new HashSet<>(PhysicalClusterMetadataManager.getBrokerIdList(clusterDO.getId())),
Arrays.asList(KafkaClientEnum.values())
);
if (ValidateUtils.isNull(metricsList)) {
return;
}
SpringTool.publish(new TopicThrottledMetricsCollectedEvent(this, startTime, metricsList));
}
}

@@ -0,0 +1,117 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.dao.*;
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* Periodically delete old metrics data
* @author zengqiao
* @date 20/1/8
*/
@CustomScheduled(name = "deleteMetrics", cron = "0 0/1 * * * ?", threadNum = 1)
public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicMetricsDao topicMetricsDao;
@Autowired
private TopicAppMetricsDao topicAppMetricsDao;
@Autowired
private TopicRequestMetricsDao topicRequestMetricsDao;
@Autowired
private BrokerMetricsDao brokerMetricsDao;
@Autowired
private ClusterMetricsDao clusterMetricsDao;
@Autowired
private ConfigUtils configUtils;
@Override
public List<EmptyEntry> listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
emptyEntry.setId(System.currentTimeMillis() / 1000);
return Arrays.asList(emptyEntry);
}
@Override
public void processTask(EmptyEntry entryEntry) {
if (!"dev".equals(configUtils.getKafkaManagerEnv())) {
// only the dev environment runs the deletion; other environments skip it
return;
}
long startTime = System.currentTimeMillis();
LOGGER.info("start delete metrics");
try {
deleteTopicMetrics();
} catch (Exception e) {
LOGGER.error("delete topic metrics failed.", e);
}
try {
deleteTopicAppMetrics();
} catch (Exception e) {
LOGGER.error("delete topic app metrics failed.", e);
}
try {
deleteTopicRequestMetrics();
} catch (Exception e) {
LOGGER.error("delete topic request metrics failed.", e);
}
try {
deleteBrokerMetrics();
} catch (Exception e) {
LOGGER.error("delete broker metrics failed.", e);
}
try {
deleteClusterMetrics();
} catch (Exception e) {
LOGGER.error("delete cluster metrics failed.", e);
}
LOGGER.info("finish delete metrics, costTime:{}ms.", System.currentTimeMillis() - startTime);
}
private void deleteTopicMetrics() {
Date endTime = new Date(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000);
topicMetricsDao.deleteBeforeTime(endTime);
}
private void deleteTopicAppMetrics() {
Date endTime = new Date(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000);
topicAppMetricsDao.deleteBeforeTime(endTime);
}
private void deleteTopicRequestMetrics() {
Date endTime = new Date(System.currentTimeMillis() - 3 * 24 * 60 * 60 * 1000);
topicRequestMetricsDao.deleteBeforeTime(endTime);
}
private void deleteBrokerMetrics() {
Date endTime = new Date(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000);
brokerMetricsDao.deleteBeforeTime(endTime);
}
private void deleteClusterMetrics() {
Date endTime = new Date(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000);
clusterMetricsDao.deleteBeforeTime(endTime);
}
}

@@ -0,0 +1,132 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.monitor.common.entry.sink.MonitorTopicSinkTag;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.monitor.component.AbstractMonitorService;
import com.xiaojukeji.kafka.manager.monitor.common.entry.MetricSinkPoint;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.LogicalClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author zengqiao
* @date 20/8/10
*/
@CustomScheduled(name = "sinkCommunityTopicMetrics2Monitor", cron = "1 0/1 * * * ?", threadNum = 5)
public class SinkCommunityTopicMetrics2Monitor extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private AbstractMonitorService abstractMonitor;
@Autowired
private ClusterService clusterService;
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
long startTime = System.currentTimeMillis();
try {
sink2Monitor(clusterDO.getId(), startTime / 1000);
} catch (Throwable t) {
LOGGER.error("sink topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
}
}
private void sink2Monitor(Long clusterId, Long now) throws Exception {
// report topic metrics to the monitor system
List<MetricSinkPoint> metricSinkPoints = new ArrayList<>();
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
LogicalClusterDO logicalClusterDO =
logicalClusterMetadataManager.getTopicLogicalCluster(clusterId, topicName);
if (ValidateUtils.isNull(logicalClusterDO)) {
continue;
}
TopicMetrics metrics = KafkaMetricsCache.getTopicMetricsFromCache(clusterId, topicName);
if (ValidateUtils.isNull(metrics)) {
continue;
}
metricSinkPoints.addAll(recordTopics(now, logicalClusterDO.getName(), metrics));
if (metricSinkPoints.size() > MonitorSinkConstant.MONITOR_SYSTEM_SINK_THRESHOLD) {
abstractMonitor.sinkMetrics(metricSinkPoints);
metricSinkPoints.clear();
}
}
if (metricSinkPoints.isEmpty()) {
return;
}
abstractMonitor.sinkMetrics(metricSinkPoints);
}
private static List<MetricSinkPoint> recordTopics(long timestamp,
String logicalClusterName,
TopicMetrics metrics) {
if (ValidateUtils.isNull(logicalClusterName) || ValidateUtils.isNull(metrics)) {
return new ArrayList<>();
}
return Arrays.asList(
new MetricSinkPoint(
MonitorMetricNameEnum.TOPIC_MSG_IN.getMetricName(),
metrics.getMessagesInPerSecOneMinuteRate(0.0),
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
timestamp,
new MonitorTopicSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName()
)
),
new MetricSinkPoint(
MonitorMetricNameEnum.TOPIC_BYTES_IN.getMetricName(),
metrics.getBytesInPerSecOneMinuteRate(0.0),
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
timestamp,
new MonitorTopicSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName()
)
),
new MetricSinkPoint(
MonitorMetricNameEnum.TOPIC_BYTES_REJECTED.getMetricName(),
metrics.getBytesRejectedPerSecOneMinuteRate(0.0),
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
timestamp,
new MonitorTopicSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName()
)
)
);
}
}

@@ -0,0 +1,134 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao;
import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Store broker metrics into the DB: broker traffic and cluster traffic
* @author zengqiao
* @date 20/5/7
*/
@CustomScheduled(name = "storeBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2)
public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@Autowired
private ClusterService clusterService;
@Autowired
private BrokerMetricsDao brokerMetricsDao;
@Autowired
private ClusterMetricsDao clusterMetricsDao;
@Autowired
private AbstractHealthScoreStrategy healthScoreStrategy;
private static final Integer INSERT_BATCH_SIZE = 100;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
long startTime = System.currentTimeMillis();
List<ClusterMetrics> clusterMetricsList = new ArrayList<>();
try {
List<BrokerMetrics> brokerMetricsList = getAndBatchAddMetrics(startTime, clusterDO.getId());
clusterMetricsList.add(supplyAndConvert2ClusterMetrics(
clusterDO.getId(),
MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList))
);
} catch (Throwable t) {
LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t);
}
long endTime = System.currentTimeMillis();
LOGGER.info("collect finish, clusterId:{} costTime:{}", clusterDO.getId(), endTime - startTime);
List<ClusterMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList(
startTime,
clusterMetricsList
);
clusterMetricsDao.batchAdd(doList);
}
private List<BrokerMetrics> getAndBatchAddMetrics(Long startTime, Long clusterId) {
List<BrokerMetrics> metricsList = new ArrayList<>();
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
BrokerMetrics metrics = jmxService.getBrokerMetrics(
clusterId,
brokerId,
KafkaMetricsCollections.BROKER_TO_DB_METRICS
);
if (ValidateUtils.isNull(metrics)) {
continue;
}
metrics.getMetricsMap().put(
JmxConstant.HEALTH_SCORE,
healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics)
);
metricsList.add(metrics);
}
if (ValidateUtils.isEmptyList(metricsList)) {
return new ArrayList<>();
}
List<BrokerMetricsDO> doList =
MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList);
int i = 0;
do {
brokerMetricsDao.batchAdd(doList.subList(i, Math.min(i + INSERT_BATCH_SIZE, doList.size())));
i += INSERT_BATCH_SIZE;
} while (i < doList.size());
return metricsList;
}
private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) {
ClusterMetrics metrics = new ClusterMetrics(clusterId);
Map<String, Object> metricsMap = metrics.getMetricsMap();
metricsMap.putAll(baseMetrics.getMetricsMap());
metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size());
metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size());
Integer partitionNum = 0;
for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (ValidateUtils.isNull(topicMetaData)) {
continue;
}
partitionNum += topicMetaData.getPartitionNum();
}
metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum);
return metrics;
}
}

View File

@@ -0,0 +1,52 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.events.TopicMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* Collect and store community (open-source Kafka) topic metrics
* @author zengqiao
* @date 20/7/21
*/
@CustomScheduled(name = "storeCommunityTopicMetrics", cron = "31 0/1 * * * ?", threadNum = 5)
public class StoreCommunityTopicMetrics extends AbstractScheduledTask<ClusterDO> {
@Autowired
private JmxService jmxService;
@Autowired
private ClusterService clusterService;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
List<TopicMetrics> metricsList = getTopicMetrics(clusterDO.getId());
SpringTool.publish(new TopicMetricsCollectedEvent(this, clusterDO.getId(), metricsList));
}
private List<TopicMetrics> getTopicMetrics(Long clusterId) {
List<TopicMetrics> metricsList =
jmxService.getTopicMetrics(clusterId, KafkaMetricsCollections.TOPIC_METRICS_TO_DB, true);
if (ValidateUtils.isEmptyList(metricsList)) {
KafkaMetricsCache.putTopicMetricsToCache(clusterId, new ArrayList<>());
return new ArrayList<>();
}
KafkaMetricsCache.putTopicMetricsToCache(clusterId, metricsList);
return metricsList;
}
}

View File

@@ -0,0 +1,70 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* @author zengqiao
* @date 20/7/21
*/
@CustomScheduled(name = "storeDiDiAppTopicMetrics", cron = "41 0/1 * * * ?", threadNum = 5)
public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@Autowired
private ClusterService clusterService;
@Autowired
private TopicAppMetricsDao topicAppMetricsDao;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
long startTime = System.currentTimeMillis();
try {
getAndBatchAddTopicAppMetrics(startTime, clusterDO.getId());
} catch (Throwable t) {
LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
}
}
private void getAndBatchAddTopicAppMetrics(Long startTime, Long clusterId) {
List<TopicMetrics> metricsList =
jmxService.getTopicAppMetrics(clusterId, KafkaMetricsCollections.APP_TOPIC_METRICS_TO_DB);
if (ValidateUtils.isEmptyList(metricsList)) {
return;
}
List<TopicMetricsDO> doList =
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
int i = 0;
do {
topicAppMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}
}

View File

@@ -0,0 +1,74 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* @author zengqiao
* @date 20/7/21
*/
@CustomScheduled(name = "storeDiDiTopicRequestTimeMetrics", cron = "51 0/1 * * * ?", threadNum = 5)
public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@Autowired
private ClusterService clusterService;
@Autowired
private TopicRequestMetricsDao topicRequestMetricsDao;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
long startTime = System.currentTimeMillis();
try {
LOGGER.info("save topic metrics, clusterId:{}, start.", clusterDO.getId());
getAndBatchAddTopicRequestTimeMetrics(startTime, clusterDO.getId());
LOGGER.info("save topic metrics, clusterId:{}, end costTime:{}.", clusterDO.getId(), System.currentTimeMillis() - startTime);
} catch (Throwable t) {
LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
}
}
private void getAndBatchAddTopicRequestTimeMetrics(Long startTime, Long clusterId) {
LOGGER.info("save topic metrics, clusterId:{}, collect start.", clusterId);
List<TopicMetrics> metricsList =
jmxService.getTopicMetrics(clusterId, KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB, false);
LOGGER.info("save topic metrics, clusterId:{}, collect end costTime:{} .", clusterId, System.currentTimeMillis() - startTime);
if (ValidateUtils.isEmptyList(metricsList)) {
return;
}
List<TopicMetricsDO> doList =
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
int i = 0;
do {
topicRequestMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}
}

View File

@@ -0,0 +1,192 @@
package com.xiaojukeji.kafka.manager.task.dispatch.op;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.bpm.OrderService;
import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum;
import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig;
import com.xiaojukeji.kafka.manager.bpm.common.entry.apply.OrderExtensionApplyTopicDTO;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.*;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
* @author zengqiao
* @date 20/7/28
*/
@Component
@CustomScheduled(name = "autoHandleTopicOrder", cron = "0 0/1 * * * ?", threadNum = 1)
public class AutoHandleTopicOrder extends AbstractScheduledTask<EmptyEntry> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ConfigService configService;
@Autowired
private OrderService orderService;
@Autowired
private RegionService regionService;
@Autowired
private AdminService adminService;
@Autowired
private ClusterService clusterService;
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
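// This task is not sharded by cluster: listAllTasks returns a single placeholder entry so processTask runs once per scheduling round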
@Override
public List<EmptyEntry> listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
emptyEntry.setId(System.currentTimeMillis() / 1000);
return Arrays.asList(emptyEntry);
}
@Override
public void processTask(EmptyEntry entryEntry) {
List<OrderDO> doList = orderService.getWaitDealOrder();
if (ValidateUtils.isEmptyList(doList)) {
return;
}
for (OrderDO orderDO: doList) {
if (!OrderTypeEnum.APPLY_TOPIC.getCode().equals(orderDO.getType())) {
continue;
}
try {
if (!handleApplyTopicOrder(orderDO)) {
continue;
}
return;
} catch (Exception e) {
LOGGER.error("handle apply topic order failed, orderDO:{}.", orderDO, e);
}
}
}
private boolean handleApplyTopicOrder(OrderDO orderDO) {
OrderExtensionApplyTopicDTO dto = JSON.parseObject(orderDO.getExtensions(), OrderExtensionApplyTopicDTO.class);
Long physicalClusterId =
logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId(), dto.isPhysicalClusterId());
CreateTopicElemConfig createConfig =
configService.getCreateTopicConfig(physicalClusterId, SystemCodeConstant.KAFKA_MANAGER);
if (ValidateUtils.isNull(createConfig)) {
return false;
}
if (dto.getPeakBytesIn() > createConfig.getAutoExecMaxPeakBytesInUnitB()) {
return false;
}
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
if (ValidateUtils.isNull(clusterDO)) {
return false;
}
if (ValidateUtils.isNull(dto.isPhysicalClusterId()) || !dto.isPhysicalClusterId()) {
return handleApplyTopicOrderByLogicalClusterId(clusterDO, orderDO, dto, createConfig);
}
// The order references a physical cluster ID directly
return handleApplyTopicOrderByPhysicalClusterId(clusterDO, orderDO, dto, createConfig);
}
/**
* Handle a topic-apply order submitted against a logical cluster
*/
private boolean handleApplyTopicOrderByLogicalClusterId(ClusterDO clusterDO,
OrderDO orderDO,
OrderExtensionApplyTopicDTO orderExtensionApplyTopicDTO,
CreateTopicElemConfig createConfig) {
LogicalClusterDO logicalClusterDO =
logicalClusterMetadataManager.getLogicalCluster(orderExtensionApplyTopicDTO.getClusterId());
if (ValidateUtils.isNull(logicalClusterDO)) {
return false;
}
return createTopic(
clusterDO,
orderExtensionApplyTopicDTO,
orderDO,
regionService.getIdleRegionBrokerList(clusterDO.getId(), ListUtils.string2LongList(logicalClusterDO.getRegionList())),
createConfig
);
}
/**
* Handle a topic-apply order submitted against a physical cluster
*/
private boolean handleApplyTopicOrderByPhysicalClusterId(ClusterDO clusterDO,
OrderDO orderDO,
OrderExtensionApplyTopicDTO orderExtensionApplyTopicDTO,
CreateTopicElemConfig createConfig) {
return createTopic(
clusterDO,
orderExtensionApplyTopicDTO,
orderDO,
regionService.getIdleRegionBrokerList(clusterDO.getId(), createConfig.getRegionIdList()),
createConfig
);
}
private boolean createTopic(ClusterDO clusterDO,
OrderExtensionApplyTopicDTO orderExtensionApplyTopicDTO,
OrderDO orderDO,
List<Integer> brokerIdList,
CreateTopicElemConfig createConfig) {
if (ValidateUtils.isEmptyList(brokerIdList)) {
return false;
}
TopicDO topicDO = new TopicDO();
topicDO.setAppId(orderExtensionApplyTopicDTO.getAppId());
topicDO.setClusterId(clusterDO.getId());
topicDO.setTopicName(orderExtensionApplyTopicDTO.getTopicName());
topicDO.setDescription(orderDO.getDescription());
topicDO.setPeakBytesIn(orderExtensionApplyTopicDTO.getPeakBytesIn());
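// Size the partition count from the requested peak bytes-in (presumably a bytes-per-second figure), at roughly 3 MB/s per partition, with a minimum of 1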
Long partitionNum = Math.max(1, orderExtensionApplyTopicDTO.getPeakBytesIn() / (3 * 1024 * 1024));
Properties properties = TopicCreationConstant.createNewProperties(
createConfig.getRetentionTimeUnitHour() * 60 * 60 * 1000L
);
ResultStatus rs = adminService.createTopic(
clusterDO,
topicDO,
partitionNum.intValue(),
createConfig.getReplicaNum(),
null,
brokerIdList,
properties,
orderDO.getApplicant(),
Constant.AUTO_HANDLE_USER_NAME
);
if (!ResultStatus.SUCCESS.equals(rs)) {
return false;
}
orderDO.setApprover(Constant.AUTO_HANDLE_USER_NAME);
orderDO.setOpinion(Constant.AUTO_HANDLE_CHINESE_NAME);
orderDO.setStatus(OrderStatusEnum.PASSED.getCode());
orderService.updateOrderById(orderDO);
return true;
}
}

View File

@@ -0,0 +1,117 @@
package com.xiaojukeji.kafka.manager.task.dispatch.op;
import com.xiaojukeji.kafka.manager.bpm.OrderService;
import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum;
import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.bpm.common.handle.OrderHandleBaseDTO;
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ConfigDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO;
import com.xiaojukeji.kafka.manager.bpm.order.impl.ApplyAppOrder;
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* Automated order approval
* @author zhongyuankai
* @date 2020/6/12
*/
@Component
@CustomScheduled(name = "automatedHandleOrder", cron = "0 0/1 * * * ?", threadNum = 1)
public class AutomatedHandleOrder extends AbstractScheduledTask<EmptyEntry> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private OrderService orderService;
@Autowired
private ConfigService configService;
@Override
public List<EmptyEntry> listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
emptyEntry.setId(System.currentTimeMillis() / 1000);
return Arrays.asList(emptyEntry);
}
@Override
public void processTask(EmptyEntry entryEntry) {
List<OrderDO> waitDealOrderList = orderService.getWaitDealOrder();
if (ValidateUtils.isEmptyList(waitDealOrderList)) {
return;
}
List<OrderDO> passedOrderList = orderService.getPassedOrder(new Date(DateUtils.getDayStarTime(0)));
Map<Integer, List<OrderDO>> waitDealMap = getOrderTypeMap(waitDealOrderList);
Map<Integer, List<OrderDO>> passedMap = getOrderTypeMap(passedOrderList);
handleAppApplyOrder(
waitDealMap.getOrDefault(OrderTypeEnum.APPLY_APP.getCode(), new ArrayList<>()),
passedMap.getOrDefault(OrderTypeEnum.APPLY_APP.getCode(), new ArrayList<>())
);
}
private void handleAppApplyOrder(List<OrderDO> waitDealOrderList, List<OrderDO> passedOrderList) {
LOGGER.info("start handle app apply order.");
if (ValidateUtils.isEmptyList(waitDealOrderList)) {
return;
}
Integer maxNum = Constant.HANDLE_APP_APPLY_MAX_NUM_DEFAULT;
ConfigDO configDO = configService.getByKey(Constant.HANDLE_APP_APPLY_MAX_NUM);
if (!ValidateUtils.isNull(configDO)) {
try {
maxNum = Integer.parseInt(configDO.getConfigValue());
} catch (Exception e) {
LOGGER.error("", e);
}
}
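// Auto-approve at most maxNum app-apply orders; orders already approved since the start of today count against the limit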
int handleNum = Math.min(maxNum - passedOrderList.size(), waitDealOrderList.size());
if (handleNum <= 0) {
return;
}
OrderHandleBaseDTO baseDTO = new OrderHandleBaseDTO();
baseDTO.setStatus(OrderStatusEnum.PASSED.getCode());
baseDTO.setOpinion("通过");
baseDTO.setDetail("{}");
ApplyAppOrder applyAppOrder = (ApplyAppOrder) SpringTool.getBean(OrderTypeEnum.APPLY_APP.getOrderName());
for (int i = 0; i < handleNum; i++) {
OrderDO orderDO = waitDealOrderList.get(i);
try {
ResultStatus resultStatus =
applyAppOrder.handleOrderDetail(orderDO, baseDTO, Constant.AUTO_HANDLE_USER_NAME);
if (ResultStatus.SUCCESS.equals(resultStatus)) {
applyAppOrder.updateOrder(orderDO, baseDTO, Constant.AUTO_HANDLE_USER_NAME);
}
} catch (Exception e) {
LOGGER.error("", e);
}
}
}
private Map<Integer, List<OrderDO>> getOrderTypeMap(List<OrderDO> orderList) {
if (ValidateUtils.isEmptyList(orderList)) {
return new HashMap<>();
}
Map<Integer, List<OrderDO>> orderMap = new HashMap<>();
for (OrderDO orderDO : orderList) {
List<OrderDO> list = orderMap.getOrDefault(orderDO.getType(), new ArrayList<>());
list.add(orderDO);
orderMap.put(orderDO.getType(), list);
}
return orderMap;
}
}

View File

@@ -0,0 +1,252 @@
package com.xiaojukeji.kafka.manager.task.dispatch.op;
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusReassignEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
import com.xiaojukeji.kafka.manager.dao.ReassignTaskDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ReassignTaskDO;
import com.xiaojukeji.kafka.manager.service.service.AdminService;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.ReassignService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import kafka.admin.ReassignPartitionsCommand;
import kafka.common.TopicAndPartition;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* Partition reassignment
* @author zengqiao
* @date 19/12/29
*/
@Component
@CustomScheduled(name = "flushReassignment", cron = "0 0/1 * * * ?", threadNum = 1)
public class FlushReassignment extends AbstractScheduledTask<EmptyEntry> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ClusterService clusterService;
@Autowired
private ReassignService reassignService;
@Autowired
private ReassignTaskDao reassignTaskDao;
@Autowired
private AdminService adminService;
@Override
public List<EmptyEntry> listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
emptyEntry.setId(System.currentTimeMillis() / 1000);
return Arrays.asList(emptyEntry);
}
@Override
public void processTask(EmptyEntry entryEntry) {
// Query reassignment tasks created within the last 7 days
List<ReassignTaskDO> doList =
reassignTaskDao.listAfterTime(new Date(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000));
if (ValidateUtils.isEmptyList(doList)) {
return;
}
Map<Long, Long> runningClusterIdTaskIdMap = new HashMap<>();
Map<Long, Long> runnableClusterIdTaskIdMap = new HashMap<>();
Map<Long, List<ReassignTaskDO>> doMap = new HashMap<>();
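// Group tasks by taskId and index each cluster's runnable/running task, so that at most one reassignment runs per cluster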
for (ReassignTaskDO elem: doList) {
List<ReassignTaskDO> subDOList = doMap.getOrDefault(elem.getTaskId(), new ArrayList<>());
subDOList.add(elem);
doMap.put(elem.getTaskId(), subDOList);
// Collect the running and runnable tasks
if (TaskStatusReassignEnum.RUNNABLE.getCode().equals(elem.getStatus())
&& elem.getBeginTime().getTime() <= System.currentTimeMillis()) {
LOGGER.debug("reassignment tasks, runnable task:{}.", elem);
runnableClusterIdTaskIdMap.put(elem.getClusterId(), elem.getTaskId());
}
if (TaskStatusReassignEnum.RUNNING.getCode().equals(elem.getStatus())) {
LOGGER.debug("reassignment tasks, running task:{}.", elem);
runningClusterIdTaskIdMap.put(elem.getClusterId(), elem.getTaskId());
}
}
// Handle the tasks that are currently running
for (Map.Entry<Long, Long> entry: runningClusterIdTaskIdMap.entrySet()) {
modifyRunning(doMap.get(entry.getValue()));
}
// Try to start the runnable tasks
for (Map.Entry<Long, Long> entry: runnableClusterIdTaskIdMap.entrySet()) {
if (runningClusterIdTaskIdMap.containsKey(entry.getKey())) {
// Skip this cluster if it already has a reassignment task in progress
continue;
}
if (startRunnable(doMap.get(entry.getValue()))) {
// Return once a task has been started; each scheduling round starts at most one reassignment task
return;
}
}
}
private void modifyRunning(List<ReassignTaskDO> subDOList) {
if (ValidateUtils.isEmptyList(subDOList)) {
return;
}
for (ReassignTaskDO elem: subDOList) {
if (!TaskStatusReassignEnum.RUNNING.getCode().equals(elem.getStatus())) {
continue;
}
ZkUtils zkUtils = null;
try {
ClusterDO clusterDO = clusterService.getById(elem.getClusterId());
if (ValidateUtils.isNull(clusterDO)) {
continue;
}
zkUtils = ZkUtils.apply(clusterDO.getZookeeper(),
Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS,
Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS,
JaasUtils.isZkSecurityEnabled());
Map<TopicAndPartition, TaskStatusReassignEnum> statusMap =
reassignService.verifyAssignment(zkUtils, elem.getReassignmentJson());
if (ValidateUtils.isNull(statusMap)) {
return;
}
Set<TaskStatusReassignEnum> statusSet = new HashSet<>();
for (Map.Entry<TopicAndPartition, TaskStatusReassignEnum> entry: statusMap.entrySet()) {
statusSet.add(entry.getValue());
}
if (statusSet.contains(TaskStatusReassignEnum.RUNNING)) {
// The reassignment has not finished yet: re-apply the throttle and stop here
ReassignPartitionsCommand.executeAssignment(
zkUtils,
elem.getReassignmentJson(),
elem.getRealThrottle()
);
return;
}
// The reassignment has completed
ReassignPartitionsCommand.verifyAssignment(zkUtils, elem.getReassignmentJson());
Thread.sleep(10000);
// Restore the topic's original retention time
changeTopicRetentionTime(clusterDO, elem.getTopicName(), elem.getOriginalRetentionTime());
if (statusSet.contains(TaskStatusReassignEnum.FAILED)) {
elem.setStatus(TaskStatusReassignEnum.FAILED.getCode());
} else {
elem.setStatus(TaskStatusReassignEnum.SUCCEED.getCode());
}
if (reassignTaskDao.updateById(elem) < 1) {
LOGGER.error("modify mysql failed, task:{}.", elem);
return;
}
} catch (Exception e) {
LOGGER.error("modify running failed, task:{}.", elem, e);
} finally {
if (zkUtils != null) {
zkUtils.close();
}
zkUtils = null;
}
}
}
private boolean startRunnable(List<ReassignTaskDO> subDOList) {
if (ValidateUtils.isEmptyList(subDOList)) {
return false;
}
ZkUtils zkUtils = null;
for (ReassignTaskDO elem : subDOList) {
if (!TaskStatusReassignEnum.RUNNABLE.getCode().equals(elem.getStatus())) {
continue;
}
if (elem.getBeginTime().getTime() > System.currentTimeMillis()) {
continue;
}
ClusterDO clusterDO = clusterService.getById(elem.getClusterId());
if (ValidateUtils.isNull(clusterDO)) {
continue;
}
try {
zkUtils = ZkUtils.apply(clusterDO.getZookeeper(),
Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS,
Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS,
JaasUtils.isZkSecurityEnabled());
if (zkUtils.pathExists(ZkPathUtil.REASSIGN_PARTITIONS_ROOT_NODE)) {
// A reassignment znode already exists (created by an unknown party); treat it as if it had been triggered by this run
return true;
}
// Shorten the topic's retention time for the duration of the reassignment
changeTopicRetentionTime(clusterDO, elem.getTopicName(), elem.getReassignRetentionTime());
// Start the topic reassignment
ReassignPartitionsCommand.executeAssignment(
zkUtils,
elem.getReassignmentJson(),
elem.getRealThrottle()
);
} catch (Throwable t) {
LOGGER.error("execute assignment failed, task:{}.", elem, t);
return false;
} finally {
if (zkUtils != null) {
zkUtils.close();
}
zkUtils = null;
}
try {
// Update the task status
elem.setStatus(TaskStatusReassignEnum.RUNNING.getCode());
if (reassignTaskDao.updateById(elem) < 1) {
LOGGER.error("modify mysql failed, task:{}.", elem);
}
} catch (Throwable t) {
LOGGER.error("execute assignment failed, task:{}.", elem, t);
}
return true;
}
return false;
}
private void changeTopicRetentionTime(ClusterDO clusterDO, String topicName, Long retentionTime) throws Exception {
Properties properties = adminService.getTopicConfig(clusterDO, topicName);
if (ValidateUtils.isNull(properties)) {
throw new RuntimeException("get topic config failed");
}
properties.setProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME, String.valueOf(retentionTime));
ResultStatus rs = adminService.modifyTopicConfig(clusterDO, topicName, properties, Constant.AUTO_HANDLE_USER_NAME);
if (ResultStatus.SUCCESS.equals(rs)) {
return;
}
throw new RuntimeException("modify topic config failed");
}
}

View File

@@ -0,0 +1,82 @@
package com.xiaojukeji.kafka.manager.task.listener;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.TopicNameConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaTopicMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.events.TopicMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Report collected metrics to Kafka
* @author zengqiao
* @date 20/8/31
*/
@Component("sinkCommunityTopicMetrics2Kafka")
public class SinkCommunityTopicMetrics2Kafka implements ApplicationListener<TopicMetricsCollectedEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ConfigService configService;
@Override
public void onApplicationEvent(TopicMetricsCollectedEvent event) {
List<TopicMetrics> metricsList = event.getMetricsList();
if (ValidateUtils.isEmptyList(metricsList)) {
LOGGER.warn("produce topic metrics failed, data is empty.");
return;
}
TopicNameConfig config = configService.getByKey(
ConfigConstant.PRODUCE_TOPIC_METRICS_CONFIG_KEY,
TopicNameConfig.class
);
if (ValidateUtils.isNull(config) || !config.legal()) {
LOGGER.warn("produce consumer metrics failed, config illegal, config:{}.", config);
return;
}
Long now = System.currentTimeMillis();
for (TopicMetrics metrics: metricsList) {
try {
convertAndProduceMetrics(metrics, config, now);
} catch (Exception e) {
LOGGER.error("convert and produce failed, metrics:{}.", metrics);
}
}
}
private void convertAndProduceMetrics(TopicMetrics metrics,
TopicNameConfig config,
Long now) {
KafkaTopicMetrics kafkaTopicMetrics = new KafkaTopicMetrics();
kafkaTopicMetrics.setClusterId(metrics.getClusterId());
kafkaTopicMetrics.setTopic(metrics.getTopicName());
TopicMetadata topicMetadata =
PhysicalClusterMetadataManager.getTopicMetadata(metrics.getClusterId(), metrics.getTopicName());
if (!ValidateUtils.isNull(topicMetadata)) {
kafkaTopicMetrics.setPartitionNum(topicMetadata.getPartitionNum());
}
kafkaTopicMetrics.setMessagesInPerSec(metrics.getMessagesInPerSecOneMinuteRate(null));
kafkaTopicMetrics.setBytesInPerSec(metrics.getBytesInPerSecOneMinuteRate(null));
kafkaTopicMetrics.setTimestamp(now);
KafkaClientPool.produceData2Kafka(
config.getClusterId(),
config.getTopicName(),
JsonUtils.toJSONString(kafkaTopicMetrics)
);
}
}

View File

@@ -0,0 +1,94 @@
package com.xiaojukeji.kafka.manager.task.listener;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.TopicNameConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaConsumerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaConsumerMetricsElem;
import com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics;
import com.xiaojukeji.kafka.manager.common.events.ConsumerMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author zengqiao
* @date 20/8/31
*/
@Component("produceConsumerMetrics")
public class SinkConsumerMetrics2Kafka implements ApplicationListener<ConsumerMetricsCollectedEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ConfigService configService;
@Override
public void onApplicationEvent(ConsumerMetricsCollectedEvent event) {
List<ConsumerMetrics> metricsList = event.getMetricsList();
if (ValidateUtils.isEmptyList(metricsList)) {
LOGGER.warn("produce consumer metrics failed, data is empty.");
return;
}
TopicNameConfig config = configService.getByKey(
ConfigConstant.PRODUCE_CONSUMER_METRICS_CONFIG_KEY,
TopicNameConfig.class
);
if (ValidateUtils.isNull(config) || !config.legal()) {
LOGGER.warn("produce consumer metrics failed, config illegal, config:{}.", config);
return;
}
Long now = System.currentTimeMillis();
for (ConsumerMetrics consumerMetrics: metricsList) {
try {
convertAndProduceMetrics(consumerMetrics, config, now);
} catch (Exception e) {
LOGGER.error("convert and produce failed, metrics:{}.", consumerMetrics);
}
}
}
private void convertAndProduceMetrics(ConsumerMetrics consumerMetrics,
TopicNameConfig config,
Long now) {
KafkaConsumerMetrics kafkaConsumerMetrics = new KafkaConsumerMetrics();
kafkaConsumerMetrics.setClusterId(consumerMetrics.getClusterId());
kafkaConsumerMetrics.setTopicName(consumerMetrics.getTopicName());
kafkaConsumerMetrics.setConsumerGroup(consumerMetrics.getConsumerGroup());
kafkaConsumerMetrics.setLocation(consumerMetrics.getLocation().toLowerCase());
kafkaConsumerMetrics.setPartitionNum(consumerMetrics.getPartitionOffsetMap().size());
kafkaConsumerMetrics.setCreateTime(now);
List<KafkaConsumerMetricsElem> elemList = new ArrayList<>();
for (Map.Entry<Integer, Long> entry: consumerMetrics.getPartitionOffsetMap().entrySet()) {
KafkaConsumerMetricsElem elem = new KafkaConsumerMetricsElem();
if (ValidateUtils.isNull(entry.getKey()) || ValidateUtils.isNull(entry.getValue())) {
LOGGER.error("sink consumer metrics 2 kafka failed, exist null data, consumer-metrics:{}."
, consumerMetrics);
return;
}
elem.setPartitionId(entry.getKey());
elem.setPartitionOffset(entry.getValue());
elem.setConsumeOffset(consumerMetrics.getConsumeOffsetMap().get(entry.getKey()));
elemList.add(elem);
}
kafkaConsumerMetrics.setConsumeDetailList(elemList);
KafkaClientPool.produceData2Kafka(
config.getClusterId(),
config.getTopicName(),
JsonUtils.toJSONString(kafkaConsumerMetrics)
);
}
}

View File

@@ -0,0 +1,171 @@
package com.xiaojukeji.kafka.manager.task.listener;
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.LogicalClusterDO;
import com.xiaojukeji.kafka.manager.common.events.ConsumerMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.monitor.common.entry.MetricSinkPoint;
import com.xiaojukeji.kafka.manager.monitor.common.entry.sink.MonitorConsumePartitionSinkTag;
import com.xiaojukeji.kafka.manager.monitor.common.entry.sink.MonitorConsumerSinkTag;
import com.xiaojukeji.kafka.manager.monitor.component.AbstractMonitorService;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* @author zengqiao
* @date 20/9/2
*/
@Component("sinkConsumerMetrics2Monitor")
public class SinkConsumerMetrics2Monitor implements ApplicationListener<ConsumerMetricsCollectedEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@Autowired
private AbstractMonitorService abstractMonitor;
@Override
public void onApplicationEvent(ConsumerMetricsCollectedEvent event) {
LOGGER.warn("sink consumer metrics start.");
List<ConsumerMetrics> metricsList = event.getMetricsList();
if (ValidateUtils.isEmptyList(metricsList)) {
LOGGER.warn("sink consumer metrics failed, data is empty.");
return;
}
long startTime = System.currentTimeMillis();
sinkConsumerGroup(metricsList);
LOGGER.info("sink consumer metrics to monitor-system finish, clusterId:{} costTime:{}"
, metricsList.get(0).getClusterId(), System.currentTimeMillis() - startTime);
}
private void sinkConsumerGroup(List<ConsumerMetrics> metricsList) {
List<MetricSinkPoint> metricSinkPoints = new ArrayList<>();
for (ConsumerMetrics elem: metricsList) {
LogicalClusterDO logicalClusterDO =
logicalClusterMetadataManager.getTopicLogicalCluster(elem.getClusterId(), elem.getTopicName());
if (ValidateUtils.isNull(logicalClusterDO)) {
continue;
}
metricSinkPoints.addAll(recordConsumer(elem.getTimestampUnitMs() / 1000, logicalClusterDO.getName(), elem));
if (metricSinkPoints.size() > MonitorSinkConstant.MONITOR_SYSTEM_SINK_THRESHOLD) {
abstractMonitor.sinkMetrics(metricSinkPoints);
metricSinkPoints.clear();
}
}
if (metricSinkPoints.isEmpty()) {
return;
}
abstractMonitor.sinkMetrics(metricSinkPoints);
}
private static List<MetricSinkPoint> recordConsumer(long timestamp,
String logicalClusterName,
ConsumerMetrics metrics) {
if (ValidateUtils.isNull(logicalClusterName) || ValidateUtils.isNull(metrics)) {
return new ArrayList<>();
}
Long maxLag = 0L;
List<MetricSinkPoint> pointList = new ArrayList<>();
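// Emit one lag point per partition and track the maximum lag across all partitions of this consumer group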
for (Integer partitionId: metrics.getPartitionOffsetMap().keySet()) {
Long partitionOffset = metrics.getPartitionOffsetMap().get(partitionId);
Long consumerOffset = metrics.getConsumeOffsetMap().get(partitionId);
if (ValidateUtils.isNull(partitionOffset) || ValidateUtils.isNull(consumerOffset)) {
continue;
}
Long lag = Math.max(partitionOffset - consumerOffset, 0L);
pointList.add(new MetricSinkPoint(
MonitorMetricNameEnum.CONSUMER_PARTITION_LAG.getMetricName(),
lag,
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
timestamp,
new MonitorConsumePartitionSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName(),
partitionId,
metrics.getConsumerGroup()
)
));
maxLag = Math.max(maxLag, lag);
}
pointList.add(new MetricSinkPoint(
MonitorMetricNameEnum.CONSUMER_MAX_LAG.getMetricName(),
maxLag,
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
timestamp,
new MonitorConsumerSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName(),
metrics.getConsumerGroup()
)
));
Long maxDelayTime = calMaxDelayTime(
metrics.getClusterId(),
metrics.getTopicName(),
metrics.getConsumerGroup(),
maxLag
);
if (ValidateUtils.isNull(maxDelayTime)) {
LOGGER.error("cal maxDelayTime failed, clusterId:{} topicName:{} consumerGroup:{} maxLag:{}."
, metrics.getClusterId(), metrics.getTopicName(), metrics.getConsumerGroup(), maxLag);
return pointList;
}
pointList.add(new MetricSinkPoint(
MonitorMetricNameEnum.CONSUMER_MAX_DELAY_TIME.getMetricName(),
maxDelayTime,
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
timestamp,
new MonitorConsumerSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName(),
metrics.getConsumerGroup()
)
));
return pointList;
}
private static Long calMaxDelayTime(Long clusterId, String topicName, String consumerGroup, Long maxLag) {
try {
TopicMetrics metrics = KafkaMetricsCache.getTopicMetricsFromCache(clusterId, topicName);
if (ValidateUtils.isNull(metrics)) {
return null;
}
Double messageIn = metrics.getMessagesInPerSecOneMinuteRate(-1.0);
if (messageIn.equals(-1.0) || messageIn.equals(0.0)) {
return null;
}
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (ValidateUtils.isNull(topicMetadata)) {
return null;
}
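// Estimated maximum catch-up delay: worst-partition lag divided by the approximate per-partition produce rate (messagesIn / partitionNum)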
return Math.round(maxLag / messageIn * topicMetadata.getPartitionNum());
} catch (Exception e) {
LOGGER.error("cal maxDelayTime failed, clusterId:{} topicName:{} consumerGroup:{} maxLag:{}."
, clusterId, topicName, consumerGroup, maxLag);
}
return null;
}
}

View File

@@ -0,0 +1,110 @@
package com.xiaojukeji.kafka.manager.task.listener;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.LogicalClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.monitor.common.entry.MetricSinkPoint;
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
import com.xiaojukeji.kafka.manager.monitor.common.entry.sink.MonitorTopicThrottledSinkTag;
import com.xiaojukeji.kafka.manager.monitor.component.AbstractMonitorService;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.task.common.TopicThrottledMetricsCollectedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @author zengqiao
* @date 20/9/24
*/
@Component("sinkTopicThrottledMetrics2Monitor")
public class SinkTopicThrottledMetrics2Monitor implements ApplicationListener<TopicThrottledMetricsCollectedEvent> {
@Autowired
private AbstractMonitorService abstractMonitor;
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@Override
public void onApplicationEvent(TopicThrottledMetricsCollectedEvent event) {
List<TopicThrottledMetrics> metrics = event.getMetricsList();
if (ValidateUtils.isEmptyList(metrics)) {
return;
}
Long clusterId = metrics.get(0).getClusterId();
sink2MonitorSystem(clusterId, event.getStartTime(), metrics);
}
private void sink2MonitorSystem(Long clusterId,
Long startTime,
List<TopicThrottledMetrics> metricsList) {
if (ValidateUtils.isEmptyList(metricsList)) {
return;
}
List<MetricSinkPoint> metricSinkPoints = new ArrayList<>();
for (TopicThrottledMetrics elem: metricsList) {
LogicalClusterDO logicalClusterDO =
logicalClusterMetadataManager.getTopicLogicalCluster(clusterId, elem.getTopicName());
if (ValidateUtils.isNull(logicalClusterDO)) {
continue;
}
MetricSinkPoint point = recordTopicThrottled(startTime, logicalClusterDO.getName(), elem);
if (ValidateUtils.isNull(point)) {
continue;
}
metricSinkPoints.add(point);
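// Flush to the monitor system in batches once the buffered points exceed the sink threshold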
if (metricSinkPoints.size() > MonitorSinkConstant.MONITOR_SYSTEM_SINK_THRESHOLD) {
abstractMonitor.sinkMetrics(metricSinkPoints);
metricSinkPoints.clear();
}
}
if (metricSinkPoints.isEmpty()) {
return;
}
abstractMonitor.sinkMetrics(metricSinkPoints);
}
private static MetricSinkPoint recordTopicThrottled(long startTime,
String logicalClusterName,
TopicThrottledMetrics metrics) {
if (metrics.getClientType().equals(KafkaClientEnum.FETCH_CLIENT)) {
return new MetricSinkPoint(
MonitorMetricNameEnum.TOPIC_APP_FETCH_THROTTLE.getMetricName(),
MonitorSinkConstant.MONITOR_SYSTEM_METRIC_VALUE_EFFECTIVE,
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
startTime / 1000,
new MonitorTopicThrottledSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName(),
metrics.getAppId()
)
);
}
if (metrics.getClientType().equals(KafkaClientEnum.PRODUCE_CLIENT)) {
return new MetricSinkPoint(
MonitorMetricNameEnum.TOPIC_APP_PRODUCE_THROTTLE.getMetricName(),
MonitorSinkConstant.MONITOR_SYSTEM_METRIC_VALUE_EFFECTIVE,
MonitorSinkConstant.MONITOR_SYSTEM_SINK_STEP,
startTime / 1000,
new MonitorTopicThrottledSinkTag(
MonitorSinkConstant.MONITOR_SYSTEM_TAG_DEFAULT_HOST,
logicalClusterName,
metrics.getTopicName(),
metrics.getAppId()
)
);
}
return null;
}
}

View File

@@ -0,0 +1,55 @@
package com.xiaojukeji.kafka.manager.task.listener;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
import com.xiaojukeji.kafka.manager.common.events.TopicMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Store collected metrics in the DB
* @author zengqiao
* @date 20/9/1
*/
@Component("storeCommunityTopicMetrics2DB")
public class StoreCommunityTopicMetrics2DB implements ApplicationListener<TopicMetricsCollectedEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicMetricsDao topicMetricsDao;
@Override
public void onApplicationEvent(TopicMetricsCollectedEvent event) {
List<TopicMetrics> metricsList = event.getMetricsList();
if (ValidateUtils.isEmptyList(metricsList)) {
LOGGER.warn("store topic metrics failed, data is empty.");
return;
}
try {
store2DB(System.currentTimeMillis(), metricsList);
} catch (Throwable t) {
LOGGER.error("save topic metrics failed, clusterId:{}.", event.getClusterId(), t);
}
}
private void store2DB(Long startTime, List<TopicMetrics> metricsList) throws Exception {
List<TopicMetricsDO> doList =
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
int i = 0;
do {
topicMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}
}

View File

@@ -0,0 +1,86 @@
package com.xiaojukeji.kafka.manager.task.listener;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicThrottledMetricsDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.ThrottleService;
import com.xiaojukeji.kafka.manager.task.common.TopicThrottledMetricsCollectedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* @author zengqiao
* @date 20/9/24
*/
@Component("storeTopicThrottledMetrics2DB")
public class StoreTopicThrottledMetrics2DB implements ApplicationListener<TopicThrottledMetricsCollectedEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ThrottleService throttleService;
@Override
public void onApplicationEvent(TopicThrottledMetricsCollectedEvent event) {
List<TopicThrottledMetrics> metrics = event.getMetricsList();
if (ValidateUtils.isEmptyList(metrics)) {
return;
}
Long clusterId = metrics.get(0).getClusterId();
store2DB(clusterId, convert2TopicThrottledMetricsDO(metrics));
}
private List<TopicThrottledMetricsDO> convert2TopicThrottledMetricsDO(List<TopicThrottledMetrics> metricsList) {
Map<String, TopicThrottledMetricsDO> doMap = new HashMap<>();
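// Merge produce and fetch throttle flags into a single row per (clusterId, appId, topicName); the map key is the plain concatenation of the three fields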
for (TopicThrottledMetrics metrics: metricsList) {
String key = new StringBuilder()
.append(metrics.getClusterId())
.append(metrics.getAppId())
.append(metrics.getTopicName())
.toString();
TopicThrottledMetricsDO metricsDO = doMap.get(key);
if (ValidateUtils.isNull(metricsDO)) {
metricsDO = new TopicThrottledMetricsDO();
metricsDO.setAppId(metrics.getAppId());
metricsDO.setClusterId(metrics.getClusterId());
metricsDO.setTopicName(metrics.getTopicName());
metricsDO.setFetchThrottled(0);
metricsDO.setProduceThrottled(0);
}
if (KafkaClientEnum.PRODUCE_CLIENT.equals(metrics.getClientType())) {
metricsDO.setProduceThrottled(1);
} else {
metricsDO.setFetchThrottled(1);
}
doMap.put(key, metricsDO);
}
return new ArrayList<>(doMap.values());
}
private void store2DB(Long clusterId,
List<TopicThrottledMetricsDO> doList) {
if (ValidateUtils.isEmptyList(doList)) {
return;
}
int i = 0;
do {
try {
throttleService.insertBatch(doList.subList(
i,
Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()))
);
} catch (Exception e) {
LOGGER.error("store topic throttled metrics failed, clusterId:{}.", clusterId);
}
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}
}

View File

@@ -0,0 +1,58 @@
package com.xiaojukeji.kafka.manager.task.schedule;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* @author zengqiao
* @date 20/7/2
*/
@Component
public class FlushTopicMetrics {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@Autowired
private ClusterService clusterService;
@Scheduled(cron="5 0/1 * * * ?")
public void flushTopicMetrics() {
long startTime = System.currentTimeMillis();
LOGGER.info("flush topic-metrics start.");
List<ClusterDO> clusterDOList = clusterService.list();
for (ClusterDO clusterDO : clusterDOList) {
try {
flushTopicMetrics(clusterDO.getId());
} catch (Exception e) {
LOGGER.error("flush topic-metrics failed, clusterId:{}.", clusterDO.getId(), e);
}
}
LOGGER.info("flush topic-metrics finished, costTime:{}.", System.currentTimeMillis() - startTime);
}
private void flushTopicMetrics(Long clusterId) {
List<TopicMetrics> metricsList =
jmxService.getTopicMetrics(clusterId, KafkaMetricsCollections.TOPIC_METRICS_TO_DB, true);
if (ValidateUtils.isEmptyList(metricsList)) {
KafkaMetricsCache.putTopicMetricsToCache(clusterId, new ArrayList<>());
return;
}
KafkaMetricsCache.putTopicMetricsToCache(clusterId, metricsList);
}
}

View File

@@ -0,0 +1,170 @@
package com.xiaojukeji.kafka.manager.task.schedule.metadata;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetadata;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import kafka.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import scala.collection.JavaConversions;
import java.util.*;
/**
* @author zengqiao
* @date 19/12/25
*/
@Component
public class FlushBKConsumerGroupMetadata {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ClusterService clusterService;
@Scheduled(cron="15 0/1 * * * ?")
public void schedule() {
List<ClusterDO> doList = clusterService.list();
for (ClusterDO clusterDO: doList) {
LOGGER.info("flush broker cg start, clusterId:{}.", clusterDO.getId());
long startTime = System.currentTimeMillis();
try {
flush(clusterDO.getId());
} catch (Throwable t) {
LOGGER.error("flush broker cg failed, clusterId:{}.", clusterDO.getId(), t);
}
LOGGER.info("flush broker cg finished, clusterId:{} costTime:{}.",
clusterDO.getId(), System.currentTimeMillis() - startTime);
}
}
private void flush(Long clusterId) {
// Collect the list of consumer groups
Set<String> consumerGroupSet = new HashSet<>();
Map<String, List<String>> consumerGroupAppIdMap = new HashMap<>();
collectAndSaveConsumerGroup(clusterId, consumerGroupSet, consumerGroupAppIdMap);
// Collect the summary information of each consumer group
Map<String, Set<String>> topicNameConsumerGroupMap = new HashMap<>();
Map<String, AdminClient.ConsumerGroupSummary> consumerGroupSummary =
collectConsumerGroupSummary(clusterId, consumerGroupSet, topicNameConsumerGroupMap);
// Collect the consumer groups under each topic
topicNameConsumerGroupMap =
collectTopicAndConsumerGroupMap(clusterId, consumerGroupSet, topicNameConsumerGroupMap);
ConsumerMetadataCache.putConsumerMetadataInBK(clusterId,
new ConsumerMetadata(
consumerGroupSet,
topicNameConsumerGroupMap,
consumerGroupSummary,
consumerGroupAppIdMap
)
);
}
private void collectAndSaveConsumerGroup(Long clusterId,
Set<String> consumerGroupSet,
Map<String, List<String>> consumerGroupAppIdMap) {
try {
AdminClient adminClient = KafkaClientPool.getAdminClient(clusterId);
scala.collection.immutable.Map<org.apache.kafka.common.Node, scala.collection.immutable.List<kafka.coordinator.GroupOverview>> brokerGroupMap = adminClient.listAllGroups();
for (scala.collection.immutable.List<kafka.coordinator.GroupOverview> brokerGroup : JavaConversions.asJavaMap(brokerGroupMap).values()) {
List<kafka.coordinator.GroupOverview> lists = JavaConversions.asJavaList(brokerGroup);
for (kafka.coordinator.GroupOverview groupOverview : lists) {
String consumerGroup = groupOverview.groupId();
List<String> appIdList = new ArrayList<>();
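// Group IDs may carry '#'-separated appId prefixes; the last segment is the real consumer group name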
if (consumerGroup != null && consumerGroup.contains("#")) {
String[] splitArray = consumerGroup.split("#");
consumerGroup = splitArray[splitArray.length - 1];
appIdList = Arrays.asList(splitArray).subList(0, splitArray.length - 1);
}
consumerGroupAppIdMap.put(consumerGroup, appIdList);
consumerGroupSet.add(consumerGroup);
}
}
return;
} catch (Exception e) {
LOGGER.error("collect consumerGroup failed, clusterId:{}.", clusterId, e);
}
}
private Map<String, AdminClient.ConsumerGroupSummary> collectConsumerGroupSummary(Long clusterId,
Set<String> consumerGroupSet,
Map<String, Set<String>> topicNameConsumerGroupMap) {
if (consumerGroupSet == null || consumerGroupSet.isEmpty()) {
return new HashMap<>();
}
AdminClient adminClient = KafkaClientPool.getAdminClient(clusterId);
Map<String, AdminClient.ConsumerGroupSummary> consumerGroupSummaryMap = new HashMap<>();
for (String consumerGroup : consumerGroupSet) {
try {
AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(consumerGroup);
if (consumerGroupSummary == null) {
continue;
}
consumerGroupSummaryMap.put(consumerGroup, consumerGroupSummary);
java.util.Iterator<scala.collection.immutable.List<AdminClient.ConsumerSummary>> it =
JavaConversions.asJavaIterator(consumerGroupSummary.consumers().iterator());
while (it.hasNext()) {
List<AdminClient.ConsumerSummary> consumerSummaryList = JavaConversions.asJavaList(it.next());
for (AdminClient.ConsumerSummary consumerSummary: consumerSummaryList) {
List<TopicPartition> topicPartitionList = JavaConversions.asJavaList(consumerSummary.assignment());
if (topicPartitionList == null) {
continue;
}
for (TopicPartition topicPartition: topicPartitionList) {
Set<String> groupSet = topicNameConsumerGroupMap.getOrDefault(topicPartition.topic(), new HashSet<>());
groupSet.add(consumerGroup);
topicNameConsumerGroupMap.put(topicPartition.topic(), groupSet);
}
}
}
} catch (SchemaException e) {
LOGGER.error("schemaException exception, clusterId:{} consumerGroup:{}.", clusterId, consumerGroup, e);
} catch (Exception e) {
LOGGER.error("collect consumerGroupSummary failed, clusterId:{} consumerGroup:{}.", clusterId, consumerGroup, e);
}
}
return consumerGroupSummaryMap;
}
private Map<String, Set<String>> collectTopicAndConsumerGroupMap(Long clusterId,
Set<String> consumerGroupSet,
Map<String, Set<String>> topicNameConsumerGroupMap) {
if (ValidateUtils.isEmptySet(consumerGroupSet)) {
return new HashMap<>(0);
}
AdminClient adminClient = KafkaClientPool.getAdminClient(clusterId);
for (String consumerGroup: consumerGroupSet) {
try {
Map<TopicPartition, Object> topicPartitionAndOffsetMap = JavaConversions.asJavaMap(adminClient.listGroupOffsets(consumerGroup));
for (Map.Entry<TopicPartition, Object> entry : topicPartitionAndOffsetMap.entrySet()) {
TopicPartition tp = entry.getKey();
Set<String> subConsumerGroupSet = topicNameConsumerGroupMap.getOrDefault(tp.topic(), new HashSet<>());
subConsumerGroupSet.add(consumerGroup);
topicNameConsumerGroupMap.put(tp.topic(), subConsumerGroupSet);
}
} catch (Exception e) {
LOGGER.error("update consumer group failed, clusterId:{} consumerGroup:{}.", clusterId, consumerGroup, e);
}
}
return topicNameConsumerGroupMap;
}
}

View File

@@ -0,0 +1,51 @@
package com.xiaojukeji.kafka.manager.task.schedule.metadata;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @author zengqiao
* @date 20/6/3
*/
@Component
public class FlushClusterMetadata {
@Autowired
private ClusterService clusterService;
@Autowired
private PhysicalClusterMetadataManager physicalClusterMetadataManager;
@Scheduled(cron="0/30 * * * * ?")
public void flush() {
List<ClusterDO> doList = clusterService.list();
Set<Long> newClusterIdSet = new HashSet<>();
Set<Long> oldClusterIdSet = physicalClusterMetadataManager.getClusterIdSet();
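// Diff the configured clusters against the cached metadata: register new clusters and remove the ones that no longer exist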
for (ClusterDO clusterDO: doList) {
newClusterIdSet.add(clusterDO.getId());
if (oldClusterIdSet.contains(clusterDO.getId())) {
continue;
}
// register the newly added cluster
physicalClusterMetadataManager.addNew(clusterDO);
}
for (Long clusterId: oldClusterIdSet) {
if (newClusterIdSet.contains(clusterId)) {
continue;
}
// evict the cluster that no longer exists
physicalClusterMetadataManager.remove(clusterId);
}
}
}

View File

@@ -0,0 +1,63 @@
package com.xiaojukeji.kafka.manager.task.schedule.metadata;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author zengqiao
* @date 20/7/23
*/
@Component
public class FlushTopicRetentionTime {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ClusterService clusterService;
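/**
 * Once a minute, read every topic's retention time from ZooKeeper and refresh
 * the value cached in PhysicalClusterMetadataManager.
 */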
@Scheduled(cron="25 0/1 * * * ?")
public void flush() {
List<ClusterDO> doList = clusterService.list();
for (ClusterDO clusterDO: doList) {
try {
flush(clusterDO);
} catch (Exception e) {
LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e);
}
}
}
private void flush(ClusterDO clusterDO) {
ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId());
if (ValidateUtils.isNull(zkConfig)) {
LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId());
return;
}
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
try {
Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName);
if (retentionTime == null) {
LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.",
clusterDO.getId(), topicName);
continue;
}
PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime);
} catch (Exception e) {
LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.",
clusterDO.getId(), topicName, e);
}
}
}
}

View File

@@ -0,0 +1,121 @@
package com.xiaojukeji.kafka.manager.task.schedule.metadata;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetadata;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
import com.xiaojukeji.kafka.manager.service.cache.ThreadPool;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;
/**
* @author zengqiao
* @date 19/12/25
*/
@Component
public class FlushZKConsumerGroupMetadata {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private ClusterService clusterService;
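/**
 * Once a minute, refresh the metadata of consumer groups that commit offsets to
 * ZooKeeper (old-style consumers) for every cluster.
 */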
@Scheduled(cron="35 0/1 * * * ?")
public void schedule() {
List<ClusterDO> doList = clusterService.list();
for (ClusterDO clusterDO: doList) {
LOGGER.info("flush zookeeper cg start, clusterId:{}.", clusterDO.getId());
long startTime = System.currentTimeMillis();
try {
flush(clusterDO.getId());
} catch (Throwable t) {
LOGGER.error("flush zookeeper cg failed, clusterId:{}.", clusterDO.getId(), t);
}
LOGGER.info("flush zookeeper cg finished, clusterId:{} costTime:{}.",
clusterDO.getId(), System.currentTimeMillis() - startTime);
}
}
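/**
 * Collect the ZooKeeper-based consumer groups of the cluster, rebuild the
 * topic -> consumer group mapping and publish it to the ConsumerMetadataCache.
 */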
private void flush(Long clusterId) {
Set<String> consumerGroupSet = collectConsumerGroup(clusterId);
Map<String, Set<String>> topicNameConsumerGroupMap =
collectTopicAndConsumerGroupMap(clusterId, new ArrayList<>(consumerGroupSet));
ConsumerMetadataCache.putConsumerMetadataInZK(
clusterId,
new ConsumerMetadata(consumerGroupSet, topicNameConsumerGroupMap, new HashMap<>(0), new HashMap<>(0))
);
}
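/**
 * List the consumer groups registered under the consumer root node in ZooKeeper,
 * filtering out transient console consumers.
 */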
private Set<String> collectConsumerGroup(Long clusterId) {
try {
ZkConfigImpl zkConfigImpl = PhysicalClusterMetadataManager.getZKConfig(clusterId);
List<String> consumerGroupList = zkConfigImpl.getChildren(ZkPathUtil.CONSUMER_ROOT_NODE);
if (consumerGroupList == null) {
return new HashSet<>();
}
return consumerGroupList
.stream()
.filter(elem -> !elem.startsWith("console-consumer"))
.collect(Collectors.toSet());
} catch (Exception e) {
LOGGER.error("collect consumerGroup failed, clusterId:{}.", clusterId, e);
}
return new HashSet<>();
}
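/**
 * For each consumer group, read from ZooKeeper the topics it has committed offsets for.
 * The per-group reads are wrapped in FutureTasks and submitted to the shared ThreadPool
 * so they run in parallel.
 */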
private Map<String, Set<String>> collectTopicAndConsumerGroupMap(Long clusterId,
List<String> consumerGroupList) {
ZkConfigImpl zkConfigImpl = PhysicalClusterMetadataManager.getZKConfig(clusterId);
FutureTask<List<String>>[] taskList = new FutureTask[consumerGroupList.size()];
for (int i = 0; i < consumerGroupList.size(); i++) {
final String consumerGroup = consumerGroupList.get(i);
taskList[i] = new FutureTask<List<String>>(new Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
try {
return zkConfigImpl.getChildren(ZkPathUtil.getConsumerGroupOffsetRoot(consumerGroup));
} catch (Exception e) {
LOGGER.error("collect topicName and consumerGroup failed, clusterId:{} consumerGroup:{}.",
clusterId, consumerGroup, e);
}
return new ArrayList<>();
}
});
ThreadPool.submitCollectMetricsTask(taskList[i]);
}
Map<String, Set<String>> topicNameConsumerGroupMap = new HashMap<>();
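// wait for every ZK read to finish and merge its topic list into the mapping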
for (int i = 0; i < taskList.length; ++i) {
try {
List<String> topicNameList = taskList[i].get();
if (ValidateUtils.isEmptyList(topicNameList)) {
continue;
}
for (String topicName: topicNameList) {
Set<String> subConsumerGroupSet =
topicNameConsumerGroupMap.getOrDefault(topicName, new HashSet<>());
subConsumerGroupSet.add(consumerGroupList.get(i));
topicNameConsumerGroupMap.put(topicName, subConsumerGroupSet);
}
} catch (Exception e) {
LOGGER.error("get topic list failed, clusterId:{} consumerGroup:{}.",
clusterId, consumerGroupList.get(i), e);
}
}
return topicNameConsumerGroupMap;
}
}