mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-05 21:12:13 +08:00
开放接口&近期BUG修复
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.ConsumeHealthEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.openapi.common.dto.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/5/22
|
||||
*/
|
||||
public interface ThirdPartService {
|
||||
Result<ConsumeHealthEnum> checkConsumeHealth(Long clusterId,
|
||||
String topicName,
|
||||
String consumerGroup,
|
||||
Long maxDelayTime);
|
||||
|
||||
List<Result> resetOffsets(ClusterDO clusterDO, OffsetResetDTO dto);}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum;
|
||||
|
||||
/**
|
||||
* @author zhongyuankai
|
||||
* @date 2020/08/31
|
||||
*/
|
||||
public class ThirdPartUtils {
|
||||
|
||||
public static String getOrderLimitKey(OrderTypeEnum orderTypeEnum, String systemCode) {
|
||||
return orderTypeEnum.getOrderName() + "_" + systemCode;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.constant;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/10/26
|
||||
*/
|
||||
public class ThirdPartConstant {
|
||||
public final static List<Long> QUOTA_MODIFY_WHITE_CLUSTER_LIST = Arrays.asList(70L, 46L);
|
||||
|
||||
public final static Integer DATA_DREAM_MAX_APP_NUM = 20;
|
||||
|
||||
public final static Integer DATA_DREAM_MAX_AUTHORITY_NUM = 500;
|
||||
|
||||
public final static String SELF_SYSTEM_CODE = "kafkamanager";
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.dto;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/6/2
|
||||
*/
|
||||
@ApiModel(description = "消费健康")
|
||||
public class ConsumeHealthDTO {
|
||||
@ApiModelProperty(value = "集群ID")
|
||||
private Long clusterId;
|
||||
|
||||
@ApiModelProperty(value = "Topic名称")
|
||||
private List<String> topicNameList;
|
||||
|
||||
@ApiModelProperty(value = "消费组")
|
||||
private String consumerGroup;
|
||||
|
||||
@ApiModelProperty(value = "允许最大延迟(ms)")
|
||||
private Long maxDelayTime;
|
||||
|
||||
public Long getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public void setClusterId(Long clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
}
|
||||
|
||||
public List<String> getTopicNameList() {
|
||||
return topicNameList;
|
||||
}
|
||||
|
||||
public void setTopicNameList(List<String> topicNameList) {
|
||||
this.topicNameList = topicNameList;
|
||||
}
|
||||
|
||||
public String getConsumerGroup() {
|
||||
return consumerGroup;
|
||||
}
|
||||
|
||||
public void setConsumerGroup(String consumerGroup) {
|
||||
this.consumerGroup = consumerGroup;
|
||||
}
|
||||
|
||||
public Long getMaxDelayTime() {
|
||||
return maxDelayTime;
|
||||
}
|
||||
|
||||
public void setMaxDelayTime(Long maxDelayTime) {
|
||||
this.maxDelayTime = maxDelayTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ConsumeHealthDTO{" +
|
||||
"clusterId=" + clusterId +
|
||||
", topicNameList=" + topicNameList +
|
||||
", consumerGroup='" + consumerGroup + '\'' +
|
||||
", maxDelayTime=" + maxDelayTime +
|
||||
'}';
|
||||
}
|
||||
|
||||
public boolean paramLegal() {
|
||||
if (ValidateUtils.isNull(clusterId)
|
||||
|| ValidateUtils.isEmptyList(topicNameList)
|
||||
|| ValidateUtils.isBlank(consumerGroup)
|
||||
|| ValidateUtils.isNullOrLessThanZero(maxDelayTime)) {
|
||||
return false;
|
||||
}
|
||||
for (String topicName: topicNameList) {
|
||||
if (ValidateUtils.isExistBlank(topicName)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,208 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetResetTypeEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
@ApiModel(description = "重置消费偏移")
|
||||
public class OffsetResetDTO {
|
||||
@ApiModelProperty(value = "集群ID")
|
||||
private Long clusterId;
|
||||
|
||||
@ApiModelProperty(value = "Topic名称")
|
||||
private String topicName;
|
||||
|
||||
@ApiModelProperty(value = "消费组")
|
||||
private String consumerGroup;
|
||||
|
||||
@ApiModelProperty(value = "消费组位置")
|
||||
private String location;
|
||||
|
||||
@ApiModelProperty(value = "重置的方式[0:依据时间进行重置, 1:指定分区offset进行重置]")
|
||||
private Integer offsetResetType;
|
||||
|
||||
@ApiModelProperty(value = "依据时间进行重置时, 传的参数, 13位时间戳")
|
||||
private Long timestamp;
|
||||
|
||||
@ApiModelProperty(value = "指定分区进行重置时, 传的参数")
|
||||
private List<PartitionOffsetDTO> partitionOffsetDTOList;
|
||||
|
||||
@ApiModelProperty(value = "如果消费组不存在则创建")
|
||||
private Boolean createIfAbsent = Boolean.FALSE;
|
||||
|
||||
@ApiModelProperty(value = "使用的AppID")
|
||||
private String appId;
|
||||
|
||||
@ApiModelProperty(value = "App密码")
|
||||
private String password;
|
||||
|
||||
@ApiModelProperty(value = "操作人")
|
||||
private String operator;
|
||||
|
||||
@ApiModelProperty(value = "系统code")
|
||||
private String systemCode;
|
||||
|
||||
/**
|
||||
* 默认使用assign的方式进行重置,
|
||||
* 但是使用assign方式对于多个Topic的消费使用同一个消费组的场景, 需要停掉所有的client才可以重置成功, 否则重置失败
|
||||
*
|
||||
* 使用subscribe重置offset, 针对上面的场景可以重置成功, 但是涉及到poll函数调用, 所以默认是关闭的
|
||||
*/
|
||||
private Boolean subscribeReset = Boolean.FALSE; // 订阅重置, 默认是assign方式重置
|
||||
|
||||
public Long getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public void setClusterId(Long clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
}
|
||||
|
||||
public String getTopicName() {
|
||||
return topicName;
|
||||
}
|
||||
|
||||
public void setTopicName(String topicName) {
|
||||
this.topicName = topicName;
|
||||
}
|
||||
|
||||
public String getConsumerGroup() {
|
||||
return consumerGroup;
|
||||
}
|
||||
|
||||
public void setConsumerGroup(String consumerGroup) {
|
||||
this.consumerGroup = consumerGroup;
|
||||
}
|
||||
|
||||
public String getLocation() {
|
||||
return location;
|
||||
}
|
||||
|
||||
public void setLocation(String location) {
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
public Integer getOffsetResetType() {
|
||||
return offsetResetType;
|
||||
}
|
||||
|
||||
public void setOffsetResetType(Integer offsetResetType) {
|
||||
this.offsetResetType = offsetResetType;
|
||||
}
|
||||
|
||||
public Long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void setTimestamp(Long timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public List<PartitionOffsetDTO> getPartitionOffsetDTOList() {
|
||||
return partitionOffsetDTOList;
|
||||
}
|
||||
|
||||
public void setPartitionOffsetDTOList(List<PartitionOffsetDTO> partitionOffsetDTOList) {
|
||||
this.partitionOffsetDTOList = partitionOffsetDTOList;
|
||||
}
|
||||
|
||||
public Boolean getCreateIfAbsent() {
|
||||
return createIfAbsent;
|
||||
}
|
||||
|
||||
public void setCreateIfAbsent(Boolean createIfAbsent) {
|
||||
this.createIfAbsent = createIfAbsent;
|
||||
}
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public String getOperator() {
|
||||
return operator;
|
||||
}
|
||||
|
||||
public void setOperator(String operator) {
|
||||
this.operator = operator;
|
||||
}
|
||||
|
||||
public String getSystemCode() {
|
||||
return systemCode;
|
||||
}
|
||||
|
||||
public void setSystemCode(String systemCode) {
|
||||
this.systemCode = systemCode;
|
||||
}
|
||||
|
||||
public Boolean getSubscribeReset() {
|
||||
return subscribeReset;
|
||||
}
|
||||
|
||||
public void setSubscribeReset(Boolean subscribeReset) {
|
||||
this.subscribeReset = subscribeReset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OffsetResetModel{" +
|
||||
"clusterId=" + clusterId +
|
||||
", topicName='" + topicName + '\'' +
|
||||
", consumerGroup='" + consumerGroup + '\'' +
|
||||
", location='" + location + '\'' +
|
||||
", offsetResetType=" + offsetResetType +
|
||||
", timestamp=" + timestamp +
|
||||
", partitionOffsetDTOList=" + partitionOffsetDTOList +
|
||||
", createIfAbsent=" + createIfAbsent +
|
||||
", appId='" + appId + '\'' +
|
||||
", password='" + password + '\'' +
|
||||
", operator='" + operator + '\'' +
|
||||
", systemCode='" + systemCode + '\'' +
|
||||
", subscribeReset=" + subscribeReset +
|
||||
'}';
|
||||
}
|
||||
|
||||
public boolean legal() {
|
||||
if (clusterId == null
|
||||
|| StringUtils.isEmpty(topicName)
|
||||
|| StringUtils.isEmpty(consumerGroup)
|
||||
|| StringUtils.isEmpty(location)
|
||||
|| offsetResetType == null
|
||||
|| StringUtils.isEmpty(operator)) {
|
||||
return false;
|
||||
}
|
||||
appId = (appId == null? "": appId);
|
||||
password = (password == null? "": password);
|
||||
if (createIfAbsent == null) {
|
||||
createIfAbsent = false;
|
||||
}
|
||||
if (subscribeReset == null) {
|
||||
subscribeReset = false;
|
||||
}
|
||||
|
||||
// 只能依据时间或者offset中的一个进行重置
|
||||
if (OffsetResetTypeEnum.RESET_BY_TIME.getCode().equals(offsetResetType)) {
|
||||
return timestamp != null;
|
||||
} else if (OffsetResetTypeEnum.RESET_BY_OFFSET.getCode().equals(offsetResetType)) {
|
||||
return partitionOffsetDTOList != null;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.vo;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/9/14
|
||||
*/
|
||||
public class BrokerRegionVO {
|
||||
private Long clusterId;
|
||||
|
||||
private Integer brokerId;
|
||||
|
||||
private String hostname;
|
||||
|
||||
private String regionName;
|
||||
|
||||
public Long getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public void setClusterId(Long clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
}
|
||||
|
||||
public Integer getBrokerId() {
|
||||
return brokerId;
|
||||
}
|
||||
|
||||
public void setBrokerId(Integer brokerId) {
|
||||
this.brokerId = brokerId;
|
||||
}
|
||||
|
||||
public String getHostname() {
|
||||
return hostname;
|
||||
}
|
||||
|
||||
public void setHostname(String hostname) {
|
||||
this.hostname = hostname;
|
||||
}
|
||||
|
||||
public String getRegionName() {
|
||||
return regionName;
|
||||
}
|
||||
|
||||
public void setRegionName(String regionName) {
|
||||
this.regionName = regionName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BrokerRegionVO{" +
|
||||
"clusterId=" + clusterId +
|
||||
", brokerId=" + brokerId +
|
||||
", hostname='" + hostname + '\'' +
|
||||
", regionName='" + regionName + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.vo;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/10/26
|
||||
*/
|
||||
public class ConsumeHealthVO {
|
||||
private Integer healthCode;
|
||||
|
||||
public ConsumeHealthVO(Integer healthCode) {
|
||||
this.healthCode = healthCode;
|
||||
}
|
||||
|
||||
public Integer getHealthCode() {
|
||||
return healthCode;
|
||||
}
|
||||
|
||||
public void setHealthCode(Integer healthCode) {
|
||||
this.healthCode = healthCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ConsumeHealthVO{" +
|
||||
"healthCode=" + healthCode +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.vo;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/9/9
|
||||
*/
|
||||
@ApiModel(description="第三方-Broker概览")
|
||||
public class ThirdPartBrokerOverviewVO {
|
||||
@ApiModelProperty(value = "集群ID")
|
||||
private Long clusterId;
|
||||
|
||||
@ApiModelProperty(value = "BrokerId")
|
||||
private Integer brokerId;
|
||||
|
||||
@ApiModelProperty(value = "处于同步状态 false:已同步, true:未同步")
|
||||
private Boolean underReplicated;
|
||||
|
||||
public ThirdPartBrokerOverviewVO(Long clusterId, Integer brokerId, Boolean underReplicated) {
|
||||
this.clusterId = clusterId;
|
||||
this.brokerId = brokerId;
|
||||
this.underReplicated = underReplicated;
|
||||
}
|
||||
|
||||
public Long getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public void setClusterId(Long clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
}
|
||||
|
||||
public Integer getBrokerId() {
|
||||
return brokerId;
|
||||
}
|
||||
|
||||
public void setBrokerId(Integer brokerId) {
|
||||
this.brokerId = brokerId;
|
||||
}
|
||||
|
||||
public Boolean getUnderReplicated() {
|
||||
return underReplicated;
|
||||
}
|
||||
|
||||
public void setUnderReplicated(Boolean underReplicated) {
|
||||
this.underReplicated = underReplicated;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ThirdPartBrokerOverviewVO{" +
|
||||
"clusterId=" + clusterId +
|
||||
", brokerId=" + brokerId +
|
||||
", underReplicated=" + underReplicated +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.vo;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/8/24
|
||||
*/
|
||||
@ApiModel(description="TopicOffset变化")
|
||||
public class TopicOffsetChangedVO {
|
||||
@ApiModelProperty(value="Offset是否变化, 0:否, 1:是, -1:未知")
|
||||
private Integer offsetChanged;
|
||||
|
||||
public TopicOffsetChangedVO(Integer offsetChanged) {
|
||||
this.offsetChanged = offsetChanged;
|
||||
}
|
||||
|
||||
public Integer getOffsetChanged() {
|
||||
return offsetChanged;
|
||||
}
|
||||
|
||||
public void setOffsetChanged(Integer offsetChanged) {
|
||||
this.offsetChanged = offsetChanged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TopicOffsetChangedVO{" +
|
||||
"offsetChanged=" + offsetChanged +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.vo;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/8/14
|
||||
*/
|
||||
@ApiModel(description="Topic流量统计信息")
|
||||
public class TopicStatisticMetricsVO {
|
||||
@ApiModelProperty(value="峰值流入流量(B/s)")
|
||||
private Double peakBytesIn;
|
||||
|
||||
public TopicStatisticMetricsVO(Double peakBytesIn) {
|
||||
this.peakBytesIn = peakBytesIn;
|
||||
|
||||
}
|
||||
|
||||
public Double getPeakBytesIn() {
|
||||
return peakBytesIn;
|
||||
}
|
||||
|
||||
public void setPeakBytesIn(Double peakBytesIn) {
|
||||
this.peakBytesIn = peakBytesIn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TopicStatisticMetricsVO{" +
|
||||
"peakBytesIn=" + peakBytesIn +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,200 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.openapi.ThirdPartService;
|
||||
import com.xiaojukeji.kafka.manager.openapi.common.dto.*;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.*;
|
||||
import kafka.admin.AdminClient;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import scala.collection.JavaConversions;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/5/22
|
||||
*/
|
||||
@Service("thirdPartService")
|
||||
public class ThirdPartServiceImpl implements ThirdPartService {
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(ThirdPartServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
|
||||
@Autowired
|
||||
private ConsumerService consumerService;
|
||||
|
||||
@Override
|
||||
public Result<ConsumeHealthEnum> checkConsumeHealth(Long clusterId,
|
||||
String topicName,
|
||||
String consumerGroup,
|
||||
Long maxDelayTime) {
|
||||
ClusterDO clusterDO = clusterService.getById(clusterId);
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
|
||||
if (ValidateUtils.isNull(topicMetadata)) {
|
||||
return Result.buildFrom(ResultStatus.TOPIC_NOT_EXIST);
|
||||
}
|
||||
|
||||
// 获取消费组当前的offset
|
||||
Map<TopicPartition, Object> consumeOffsetMap = listGroupOffsets(clusterId, consumerGroup);
|
||||
if (ValidateUtils.isNull(consumeOffsetMap)) {
|
||||
return new Result<>(ConsumeHealthEnum.UNKNOWN);
|
||||
}
|
||||
if (consumeOffsetMap.isEmpty()) {
|
||||
return Result.buildFrom(ResultStatus.CONSUMER_GROUP_NOT_EXIST);
|
||||
}
|
||||
|
||||
Long delayTimestamp = System.currentTimeMillis() - maxDelayTime;
|
||||
|
||||
// 获取指定时间的offset
|
||||
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimeMap =
|
||||
offsetsForTimes(clusterDO, topicMetadata, delayTimestamp);
|
||||
if (ValidateUtils.isNull(offsetAndTimeMap)) {
|
||||
return new Result<>(ConsumeHealthEnum.UNKNOWN);
|
||||
}
|
||||
|
||||
for (TopicPartition tp : offsetAndTimeMap.keySet()) {
|
||||
OffsetAndTimestamp offsetAndTimestamp = offsetAndTimeMap.get(tp);
|
||||
Long consumeOffset = (Long) consumeOffsetMap.get(tp);
|
||||
if (ValidateUtils.isNull(consumeOffset)) {
|
||||
return new Result<>(ConsumeHealthEnum.UNKNOWN);
|
||||
}
|
||||
|
||||
if (offsetAndTimestamp.offset() <= consumeOffset) {
|
||||
// 健康的
|
||||
continue;
|
||||
}
|
||||
|
||||
return new Result<>(ConsumeHealthEnum.UNHEALTH);
|
||||
}
|
||||
return new Result<>(ConsumeHealthEnum.HEALTH);
|
||||
}
|
||||
|
||||
private Map<TopicPartition, Object> listGroupOffsets(Long clusterId, String consumerGroup) {
|
||||
AdminClient client = KafkaClientPool.getAdminClient(clusterId);
|
||||
if (ValidateUtils.isNull(client)) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return JavaConversions.asJavaMap(client.listGroupOffsets(consumerGroup));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("list group offsets failed, clusterId:{}, consumerGroup:{}.", clusterId, consumerGroup, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(ClusterDO clusterDO,
|
||||
TopicMetadata topicMetadata,
|
||||
Long timestamp) {
|
||||
KafkaConsumer kafkaConsumer = null;
|
||||
try {
|
||||
kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO);
|
||||
if (ValidateUtils.isNull(kafkaConsumer)) {
|
||||
return null;
|
||||
}
|
||||
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
||||
for (Integer partitionId : topicMetadata.getPartitionMap().getPartitions().keySet()) {
|
||||
timestampsToSearch.put(new TopicPartition(topicMetadata.getTopic(), partitionId), timestamp);
|
||||
}
|
||||
return kafkaConsumer.offsetsForTimes(timestampsToSearch);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get offset for time failed, clusterDO:{} topicMetadata:{} timestamp:{}.",
|
||||
clusterDO, topicMetadata, timestamp, e);
|
||||
} finally {
|
||||
KafkaClientPool.returnKafkaConsumerClient(clusterDO.getId(), kafkaConsumer);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Result> resetOffsets(ClusterDO clusterDO, OffsetResetDTO dto) {
|
||||
if (ValidateUtils.isNull(dto)) {
|
||||
return null;
|
||||
}
|
||||
List<PartitionOffsetDTO> offsetDTOList = dto.getPartitionOffsetDTOList();
|
||||
if (ValidateUtils.isEmptyList(offsetDTOList)) {
|
||||
offsetDTOList = topicService.getPartitionOffsetList(
|
||||
clusterDO, dto.getTopicName(), dto.getTimestamp());
|
||||
}
|
||||
if (ValidateUtils.isEmptyList(offsetDTOList)) {
|
||||
return null;
|
||||
}
|
||||
OffsetLocationEnum offsetLocation = dto.getLocation().equals(
|
||||
OffsetLocationEnum.ZOOKEEPER.location) ? OffsetLocationEnum.ZOOKEEPER : OffsetLocationEnum.BROKER;
|
||||
ResultStatus result = checkConsumerGroupExist(clusterDO, dto.getTopicName(), dto.getConsumerGroup(), offsetLocation, dto.getCreateIfAbsent());
|
||||
if (ResultStatus.SUCCESS.getCode() != result.getCode()) {
|
||||
return null;
|
||||
}
|
||||
ConsumerGroupDTO consumerGroupDTO = new ConsumerGroupDTO(
|
||||
clusterDO.getId(),
|
||||
dto.getConsumerGroup(),
|
||||
new ArrayList<>(),
|
||||
OffsetLocationEnum.getOffsetStoreLocation(dto.getLocation())
|
||||
);
|
||||
return consumerService.resetConsumerOffset(
|
||||
clusterDO,
|
||||
dto.getTopicName(),
|
||||
consumerGroupDTO,
|
||||
offsetDTOList
|
||||
);
|
||||
}
|
||||
|
||||
private ResultStatus checkConsumerGroupExist(ClusterDO clusterDO,
|
||||
String topicName,
|
||||
String consumerGroup,
|
||||
OffsetLocationEnum offsetLocation,
|
||||
Boolean createIfAbsent) {
|
||||
if (createIfAbsent) {
|
||||
// 如果不存在, 则直接创建
|
||||
return isCreateIfAbsentOverflow(clusterDO, topicName);
|
||||
}
|
||||
if (!consumerService.checkConsumerGroupExist(offsetLocation, clusterDO.getId(), topicName, consumerGroup)) {
|
||||
return ResultStatus.PARAM_ILLEGAL;
|
||||
}
|
||||
return ResultStatus.SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 限制单天单集群的重置次数不能超过20个
|
||||
* <clusterId-topicName, timestamp * 100 + count>
|
||||
*/
|
||||
private static final Map<String, Long> createIfAbsentCountMap = new HashMap<>();
|
||||
|
||||
private synchronized ResultStatus isCreateIfAbsentOverflow(ClusterDO clusterDO, String topicName) {
|
||||
String key = clusterDO.getId() + "_" + topicName;
|
||||
Long timestampAndCount = createIfAbsentCountMap.get(key);
|
||||
if (ValidateUtils.isNull(timestampAndCount) ||
|
||||
(System.currentTimeMillis() - (timestampAndCount / 100) >= (24 *60 * 60 * 1000))) {
|
||||
// 24小时卫触发, 统计归0
|
||||
timestampAndCount = System.currentTimeMillis() * 100L + 1;
|
||||
} else if (timestampAndCount % 100 > 20) {
|
||||
return ResultStatus.OPERATION_FORBIDDEN;
|
||||
} else {
|
||||
timestampAndCount += 1;
|
||||
}
|
||||
createIfAbsentCountMap.put(key, timestampAndCount);
|
||||
return ResultStatus.SUCCESS;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user