Merge pull request #341 from didi/dev

Topic基本信息中增加retention.bytes信息
This commit is contained in:
EricZeng
2021-07-02 18:34:56 +08:00
committed by GitHub
8 changed files with 119 additions and 73 deletions

View File

@@ -25,6 +25,8 @@ public class TopicCreationConstant {
public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms"; public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms";
public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes";
public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L; public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L;
public static Properties createNewProperties(Long retentionTime) { public static Properties createNewProperties(Long retentionTime) {

View File

@@ -37,6 +37,8 @@ public class TopicBasicDTO {
private Long retentionTime; private Long retentionTime;
private Long retentionBytes;
public Long getClusterId() { public Long getClusterId() {
return clusterId; return clusterId;
} }
@@ -157,6 +159,14 @@ public class TopicBasicDTO {
this.retentionTime = retentionTime; this.retentionTime = retentionTime;
} }
public Long getRetentionBytes() {
return retentionBytes;
}
public void setRetentionBytes(Long retentionBytes) {
this.retentionBytes = retentionBytes;
}
@Override @Override
public String toString() { public String toString() {
return "TopicBasicDTO{" + return "TopicBasicDTO{" +
@@ -166,7 +176,7 @@ public class TopicBasicDTO {
", principals='" + principals + '\'' + ", principals='" + principals + '\'' +
", topicName='" + topicName + '\'' + ", topicName='" + topicName + '\'' +
", description='" + description + '\'' + ", description='" + description + '\'' +
", regionNameList='" + regionNameList + '\'' + ", regionNameList=" + regionNameList +
", score=" + score + ", score=" + score +
", topicCodeC='" + topicCodeC + '\'' + ", topicCodeC='" + topicCodeC + '\'' +
", partitionNum=" + partitionNum + ", partitionNum=" + partitionNum +
@@ -175,6 +185,7 @@ public class TopicBasicDTO {
", modifyTime=" + modifyTime + ", modifyTime=" + modifyTime +
", createTime=" + createTime + ", createTime=" + createTime +
", retentionTime=" + retentionTime + ", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
'}'; '}';
} }
} }

View File

@@ -33,6 +33,9 @@ public class TopicBasicVO {
@ApiModelProperty(value = "存储时间(ms)") @ApiModelProperty(value = "存储时间(ms)")
private Long retentionTime; private Long retentionTime;
@ApiModelProperty(value = "单分区数据保存大小(Byte)")
private Long retentionBytes;
@ApiModelProperty(value = "创建时间") @ApiModelProperty(value = "创建时间")
private Long createTime; private Long createTime;
@@ -62,12 +65,20 @@ public class TopicBasicVO {
this.clusterId = clusterId; this.clusterId = clusterId;
} }
public String getTopicCodeC() { public String getAppId() {
return topicCodeC; return appId;
} }
public void setTopicCodeC(String topicCodeC) { public void setAppId(String appId) {
this.topicCodeC = topicCodeC; this.appId = appId;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
} }
public Integer getPartitionNum() { public Integer getPartitionNum() {
@@ -86,22 +97,6 @@ public class TopicBasicVO {
this.replicaNum = replicaNum; this.replicaNum = replicaNum;
} }
public Long getModifyTime() {
return modifyTime;
}
public void setModifyTime(Long modifyTime) {
this.modifyTime = modifyTime;
}
public Long getCreateTime() {
return createTime;
}
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}
public String getPrincipals() { public String getPrincipals() {
return principals; return principals;
} }
@@ -110,30 +105,6 @@ public class TopicBasicVO {
this.principals = principals; this.principals = principals;
} }
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public void setAppId(String appId) {
this.appId = appId;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getAppId() {
return appId;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public Long getRetentionTime() { public Long getRetentionTime() {
return retentionTime; return retentionTime;
} }
@@ -142,12 +113,28 @@ public class TopicBasicVO {
this.retentionTime = retentionTime; this.retentionTime = retentionTime;
} }
public String getAppName() { public Long getRetentionBytes() {
return appName; return retentionBytes;
} }
public void setAppName(String appName) { public void setRetentionBytes(Long retentionBytes) {
this.appName = appName; this.retentionBytes = retentionBytes;
}
public Long getCreateTime() {
return createTime;
}
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}
public Long getModifyTime() {
return modifyTime;
}
public void setModifyTime(Long modifyTime) {
this.modifyTime = modifyTime;
} }
public Integer getScore() { public Integer getScore() {
@@ -158,6 +145,30 @@ public class TopicBasicVO {
this.score = score; this.score = score;
} }
public String getTopicCodeC() {
return topicCodeC;
}
public void setTopicCodeC(String topicCodeC) {
this.topicCodeC = topicCodeC;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public List<String> getRegionNameList() { public List<String> getRegionNameList() {
return regionNameList; return regionNameList;
} }
@@ -176,6 +187,7 @@ public class TopicBasicVO {
", replicaNum=" + replicaNum + ", replicaNum=" + replicaNum +
", principals='" + principals + '\'' + ", principals='" + principals + '\'' +
", retentionTime=" + retentionTime + ", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
", createTime=" + createTime + ", createTime=" + createTime +
", modifyTime=" + modifyTime + ", modifyTime=" + modifyTime +
", score=" + score + ", score=" + score +

View File

@@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion; import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils; import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
@@ -56,7 +58,7 @@ public class PhysicalClusterMetadataManager {
private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>(); private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
private final static Map<Long, Map<String, Long>> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>(); private final static Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>(); private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
@@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager {
// 初始化topic-map // 初始化topic-map
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
// 初始化cluster-map // 初始化cluster-map
CLUSTER_MAP.put(clusterDO.getId(), clusterDO); CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
@@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager {
KAFKA_VERSION_MAP.remove(clusterId); KAFKA_VERSION_MAP.remove(clusterId);
TOPIC_METADATA_MAP.remove(clusterId); TOPIC_METADATA_MAP.remove(clusterId);
TOPIC_RETENTION_TIME_MAP.remove(clusterId); TOPIC_PROPERTIES_MAP.remove(clusterId);
CLUSTER_MAP.remove(clusterId); CLUSTER_MAP.remove(clusterId);
} }
@@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager {
//---------------------------配置相关元信息-------------- //---------------------------配置相关元信息--------------
public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) { public static void putTopicProperties(Long clusterId, String topicName, Properties properties) {
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId); if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) {
if (timeMap == null) {
return; return;
} }
timeMap.put(topicName, retentionTime);
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return;
}
propertiesMap.put(topicName, properties);
} }
public static Long getTopicRetentionTime(Long clusterId, String topicName) { public static Long getTopicRetentionTime(Long clusterId, String topicName) {
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId); Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (timeMap == null) { if (ValidateUtils.isNull(propertiesMap)) {
return null; return null;
} }
return timeMap.get(topicName);
Properties properties = propertiesMap.get(topicName);
if (ValidateUtils.isNull(properties)) {
return null;
} }
return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME));
}
public static Long getTopicRetentionBytes(Long clusterId, String topicName) {
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return null;
}
Properties properties = propertiesMap.get(topicName);
if (ValidateUtils.isNull(properties)) {
return null;
}
return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME));
}
//---------------------------Broker元信息相关-------------- //---------------------------Broker元信息相关--------------

View File

@@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService {
basicDTO.setCreateTime(topicMetadata.getCreateTime()); basicDTO.setCreateTime(topicMetadata.getCreateTime());
basicDTO.setModifyTime(topicMetadata.getModifyTime()); basicDTO.setModifyTime(topicMetadata.getModifyTime());
basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName)); basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName));
basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName));
TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName); TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName);
if (!ValidateUtils.isNull(topicDO)) { if (!ValidateUtils.isNull(topicDO)) {

View File

@@ -14,13 +14,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.Properties;
/** /**
* @author zengqiao * @author zengqiao
* @date 20/7/23 * @date 20/7/23
*/ */
@Component @Component
public class FlushTopicRetentionTime { public class FlushTopicProperties {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired @Autowired
@@ -33,7 +34,7 @@ public class FlushTopicRetentionTime {
try { try {
flush(clusterDO); flush(clusterDO);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e); LOGGER.error("flush topic properties failed, clusterId:{}.", clusterDO.getId(), e);
} }
} }
} }
@@ -41,22 +42,20 @@ public class FlushTopicRetentionTime {
private void flush(ClusterDO clusterDO) { private void flush(ClusterDO clusterDO) {
ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId()); ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId());
if (ValidateUtils.isNull(zkConfig)) { if (ValidateUtils.isNull(zkConfig)) {
LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId()); LOGGER.error("flush topic properties, get zk config failed, clusterId:{}.", clusterDO.getId());
return; return;
} }
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) { for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
try { try {
Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName); Properties properties = KafkaZookeeperUtils.getTopicProperties(zkConfig, topicName);
if (retentionTime == null) { if (ValidateUtils.isNull(properties)) {
LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.", LOGGER.warn("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName);
clusterDO.getId(), topicName);
continue; continue;
} }
PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime); PhysicalClusterMetadataManager.putTopicProperties(clusterDO.getId(), topicName, properties);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.", LOGGER.error("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
clusterDO.getId(), topicName, e);
} }
} }
} }

View File

@@ -61,10 +61,7 @@ public class NormalTopicController {
@ApiOperation(value = "Topic基本信息", notes = "") @ApiOperation(value = "Topic基本信息", notes = "")
@RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET) @RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET)
@ResponseBody @ResponseBody
public Result<TopicBasicVO> getTopicBasic( public Result<TopicBasicVO> getTopicBasic(@PathVariable Long clusterId, @PathVariable String topicName, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
@PathVariable Long clusterId,
@PathVariable String topicName,
@RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId); Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
if (ValidateUtils.isNull(physicalClusterId)) { if (ValidateUtils.isNull(physicalClusterId)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);

View File

@@ -31,6 +31,7 @@ public class TopicModelConverter {
vo.setReplicaNum(dto.getReplicaNum()); vo.setReplicaNum(dto.getReplicaNum());
vo.setPrincipals(dto.getPrincipals()); vo.setPrincipals(dto.getPrincipals());
vo.setRetentionTime(dto.getRetentionTime()); vo.setRetentionTime(dto.getRetentionTime());
vo.setRetentionBytes(dto.getRetentionBytes());
vo.setCreateTime(dto.getCreateTime()); vo.setCreateTime(dto.getCreateTime());
vo.setModifyTime(dto.getModifyTime()); vo.setModifyTime(dto.getModifyTime());
vo.setScore(dto.getScore()); vo.setScore(dto.getScore());