mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
Merge branch 'dev' into dev_v2.6.0
This commit is contained in:
@@ -208,7 +208,8 @@ public class LogicalClusterMetadataManager {
|
||||
// 计算逻辑集群到Topic名称的映射
|
||||
Set<String> topicNameSet = PhysicalClusterMetadataManager.getBrokerTopicNum(
|
||||
logicalClusterDO.getClusterId(),
|
||||
brokerIdSet);
|
||||
brokerIdSet
|
||||
);
|
||||
LOGICAL_CLUSTER_ID_TOPIC_NAME_MAP.put(logicalClusterDO.getId(), topicNameSet);
|
||||
|
||||
// 计算Topic名称到逻辑集群的映射
|
||||
|
||||
@@ -317,7 +317,7 @@ public class PhysicalClusterMetadataManager {
|
||||
metadataMap.put(brokerId, brokerMetadata);
|
||||
|
||||
Map<Integer, JmxConnectorWrap> jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
|
||||
jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig));
|
||||
jmxMap.put(brokerId, new JmxConnectorWrap(clusterId, brokerId, brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig));
|
||||
JMX_CONNECTOR_MAP.put(clusterId, jmxMap);
|
||||
|
||||
Map<Integer, KafkaVersion> versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
|
||||
@@ -542,9 +542,12 @@ public class PhysicalClusterMetadataManager {
|
||||
}
|
||||
|
||||
public static Set<String> getBrokerTopicNum(Long clusterId, Set<Integer> brokerIdSet) {
|
||||
Set<String> topicNameSet = new HashSet<>();
|
||||
|
||||
Map<String, TopicMetadata> metadataMap = TOPIC_METADATA_MAP.get(clusterId);
|
||||
if (metadataMap == null) {
|
||||
return new HashSet<>();
|
||||
}
|
||||
|
||||
Set<String> topicNameSet = new HashSet<>();
|
||||
for (String topicName: metadataMap.keySet()) {
|
||||
try {
|
||||
TopicMetadata tm = metadataMap.get(topicName);
|
||||
|
||||
@@ -13,4 +13,12 @@ public interface TopicExpiredService {
|
||||
List<TopicExpiredData> getExpiredTopicDataList(String username);
|
||||
|
||||
ResultStatus retainExpiredTopic(Long physicalClusterId, String topicName, Integer retainDays);
|
||||
|
||||
/**
|
||||
* 通过topictopic名称删除
|
||||
* @param clusterId 集群id
|
||||
* @param topicName topic名称
|
||||
* @return int
|
||||
*/
|
||||
int deleteByTopicName(Long clusterId, String topicName);
|
||||
}
|
||||
@@ -185,7 +185,8 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
|
||||
List<GatewayConfigDO> gatewayConfigDOList = gatewayConfigDao.getByConfigType(gatewayConfigDO.getType());
|
||||
Long version = 1L;
|
||||
for (GatewayConfigDO elem: gatewayConfigDOList) {
|
||||
if (elem.getVersion() > version) {
|
||||
if (elem.getVersion() >= version) {
|
||||
// 大于等于的情况下,都需要+1
|
||||
version = elem.getVersion() + 1L;
|
||||
}
|
||||
}
|
||||
@@ -204,6 +205,7 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
|
||||
@Override
|
||||
public Result deleteById(Long id) {
|
||||
try {
|
||||
// TODO 删除的时候,不能直接删,也需要变更一下version
|
||||
if (gatewayConfigDao.deleteById(id) > 0) {
|
||||
return Result.buildSuc();
|
||||
}
|
||||
@@ -232,7 +234,8 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
|
||||
List<GatewayConfigDO> gatewayConfigDOList = gatewayConfigDao.getByConfigType(newGatewayConfigDO.getType());
|
||||
Long version = 1L;
|
||||
for (GatewayConfigDO elem: gatewayConfigDOList) {
|
||||
if (elem.getVersion() > version) {
|
||||
if (elem.getVersion() >= version) {
|
||||
// 大于等于的情况下,都需要+1
|
||||
version = elem.getVersion() + 1L;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,9 @@ public class AdminServiceImpl implements AdminService {
|
||||
@Autowired
|
||||
private TopicManagerService topicManagerService;
|
||||
|
||||
@Autowired
|
||||
private TopicExpiredService topicExpiredService;
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
|
||||
@@ -143,6 +146,7 @@ public class AdminServiceImpl implements AdminService {
|
||||
|
||||
// 3. 数据库中删除topic
|
||||
topicManagerService.deleteByTopicName(clusterDO.getId(), topicName);
|
||||
topicExpiredService.deleteByTopicName(clusterDO.getId(), topicName);
|
||||
|
||||
// 4. 数据库中删除authority
|
||||
authorityService.deleteAuthorityByTopic(clusterDO.getId(), topicName);
|
||||
|
||||
@@ -19,6 +19,8 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.*;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -210,7 +212,7 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
|
||||
ZooKeeper zk = null;
|
||||
try {
|
||||
zk = new ZooKeeper(zookeeper, 1000, null);
|
||||
zk = new ZooKeeper(zookeeper, 1000, watchedEvent -> LOGGER.info(" receive event : " + watchedEvent.getType().name()));
|
||||
for (int i = 0; i < 15; ++i) {
|
||||
if (zk.getState().isConnected()) {
|
||||
// 只有状态是connected的时候,才表示地址是合法的
|
||||
|
||||
@@ -2,6 +2,8 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
|
||||
import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||
import com.xiaojukeji.kafka.manager.dao.RegionDao;
|
||||
@@ -59,6 +61,8 @@ public class RegionServiceImpl implements RegionService {
|
||||
return ResultStatus.BROKER_NOT_EXIST;
|
||||
}
|
||||
if (regionDao.insert(regionDO) > 0) {
|
||||
// 发布region创建事件
|
||||
SpringTool.publish(new RegionCreatedEvent(this, regionDO));
|
||||
return ResultStatus.SUCCESS;
|
||||
}
|
||||
} catch (DuplicateKeyException e) {
|
||||
|
||||
@@ -75,4 +75,14 @@ public class TopicExpiredServiceImpl implements TopicExpiredService {
|
||||
}
|
||||
return ResultStatus.MYSQL_ERROR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteByTopicName(Long clusterId, String topicName) {
|
||||
try {
|
||||
return topicExpiredDao.deleteByName(clusterId, topicName);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic failed, clusterId:{} topicName:{}", clusterId, topicName, e);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -210,7 +210,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
|
||||
}
|
||||
}
|
||||
|
||||
// 增加流量信息
|
||||
// 增加流量和描述信息
|
||||
Map<Long, Map<String, TopicMetrics>> metricMap = KafkaMetricsCache.getAllTopicMetricsFromCache();
|
||||
for (MineTopicSummary mineTopicSummary : summaryList) {
|
||||
TopicMetrics topicMetrics = getTopicMetricsFromCacheOrJmx(
|
||||
@@ -219,6 +219,10 @@ public class TopicManagerServiceImpl implements TopicManagerService {
|
||||
metricMap);
|
||||
mineTopicSummary.setBytesIn(topicMetrics.getSpecifiedMetrics("BytesInPerSecOneMinuteRate"));
|
||||
mineTopicSummary.setBytesOut(topicMetrics.getSpecifiedMetrics("BytesOutPerSecOneMinuteRate"));
|
||||
|
||||
// 增加topic描述信息
|
||||
TopicDO topicDO = topicDao.getByTopicName(mineTopicSummary.getPhysicalClusterId(), mineTopicSummary.getTopicName());
|
||||
mineTopicSummary.setDescription(topicDO.getDescription());
|
||||
}
|
||||
return summaryList;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.service.utils;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -8,38 +9,15 @@ import org.springframework.stereotype.Service;
|
||||
* @author zengqiao
|
||||
* @date 20/4/26
|
||||
*/
|
||||
@Data
|
||||
@Service("configUtils")
|
||||
public class ConfigUtils {
|
||||
@Value(value = "${custom.idc}")
|
||||
private ConfigUtils() {
|
||||
}
|
||||
|
||||
@Value(value = "${custom.idc:cn}")
|
||||
private String idc;
|
||||
|
||||
@Value(value = "${spring.profiles.active}")
|
||||
private String kafkaManagerEnv;
|
||||
|
||||
@Value(value = "${custom.store-metrics-task.save-days}")
|
||||
private Long maxMetricsSaveDays;
|
||||
|
||||
public String getIdc() {
|
||||
return idc;
|
||||
}
|
||||
|
||||
public void setIdc(String idc) {
|
||||
this.idc = idc;
|
||||
}
|
||||
|
||||
public String getKafkaManagerEnv() {
|
||||
return kafkaManagerEnv;
|
||||
}
|
||||
|
||||
public void setKafkaManagerEnv(String kafkaManagerEnv) {
|
||||
this.kafkaManagerEnv = kafkaManagerEnv;
|
||||
}
|
||||
|
||||
public Long getMaxMetricsSaveDays() {
|
||||
return maxMetricsSaveDays;
|
||||
}
|
||||
|
||||
public void setMaxMetricsSaveDays(Long maxMetricsSaveDays) {
|
||||
this.maxMetricsSaveDays = maxMetricsSaveDays;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,13 +19,13 @@ import org.springframework.dao.DuplicateKeyException;
|
||||
* @date 20/5/14
|
||||
*/
|
||||
public class ControllerStateListener implements StateChangeListener {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class);
|
||||
|
||||
private Long clusterId;
|
||||
private final Long clusterId;
|
||||
|
||||
private ZkConfigImpl zkConfig;
|
||||
private final ZkConfigImpl zkConfig;
|
||||
|
||||
private ControllerDao controllerDao;
|
||||
private final ControllerDao controllerDao;
|
||||
|
||||
public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) {
|
||||
this.clusterId = clusterId;
|
||||
@@ -35,8 +35,11 @@ public class ControllerStateListener implements StateChangeListener {
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
if (!checkNodeExist()) {
|
||||
LOGGER.warn("kafka-controller data not exist, clusterId:{}.", clusterId);
|
||||
return;
|
||||
}
|
||||
processControllerChange();
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -49,12 +52,21 @@ public class ControllerStateListener implements StateChangeListener {
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.",
|
||||
clusterId, state, path, e);
|
||||
LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", clusterId, state, path, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processControllerChange(){
|
||||
private boolean checkNodeExist() {
|
||||
try {
|
||||
return zkConfig.checkPathExists(ZkPathUtil.CONTROLLER_ROOT_NODE);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("init kafka-controller data failed, clusterId:{}.", clusterId, e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void processControllerChange() {
|
||||
LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId);
|
||||
ControllerData controllerData = null;
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user