mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-05 04:50:55 +08:00
version 2.3.0
This commit is contained in:
@@ -15,10 +15,7 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
|
||||
import com.xiaojukeji.kafka.manager.dao.ControllerDao;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
|
||||
import com.xiaojukeji.kafka.manager.dao.TopicDao;
|
||||
import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
|
||||
import com.xiaojukeji.kafka.manager.service.service.JmxService;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
|
||||
import com.xiaojukeji.kafka.manager.service.zookeeper.*;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
|
||||
@@ -49,15 +46,6 @@ public class PhysicalClusterMetadataManager {
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Autowired
|
||||
private ConfigUtils configUtils;
|
||||
|
||||
@Autowired
|
||||
private TopicDao topicDao;
|
||||
|
||||
@Autowired
|
||||
private AuthorityDao authorityDao;
|
||||
|
||||
private final static Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
|
||||
@@ -133,7 +121,7 @@ public class PhysicalClusterMetadataManager {
|
||||
zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener);
|
||||
|
||||
//增加Topic监控
|
||||
TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, topicDao, authorityDao);
|
||||
TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig);
|
||||
topicListener.init();
|
||||
zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener);
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.ClusterDetailDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.cluster.ControllerPreferredCandidate;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.op.ControllerPreferredCandidateDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.ClusterNameDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
|
||||
@@ -51,4 +52,20 @@ public interface ClusterService {
|
||||
* @return void
|
||||
*/
|
||||
Result<List<ControllerPreferredCandidate>> getControllerPreferredCandidates(Long clusterId);
|
||||
|
||||
/**
|
||||
* 增加优先被选举为controller的broker
|
||||
* @param clusterId 集群ID
|
||||
* @param brokerIdList brokerId列表
|
||||
* @return
|
||||
*/
|
||||
Result addControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList);
|
||||
|
||||
/**
|
||||
* 减少优先被选举为controller的broker
|
||||
* @param clusterId 集群ID
|
||||
* @param brokerIdList brokerId列表
|
||||
* @return
|
||||
*/
|
||||
Result deleteControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList);
|
||||
}
|
||||
|
||||
@@ -26,4 +26,20 @@ public interface ZookeeperService {
|
||||
* @return 操作结果
|
||||
*/
|
||||
Result<List<Integer>> getControllerPreferredCandidates(Long clusterId);
|
||||
|
||||
/**
|
||||
* 增加优先被选举为controller的broker
|
||||
* @param clusterId 集群ID
|
||||
* @param brokerId brokerId
|
||||
* @return
|
||||
*/
|
||||
Result addControllerPreferredCandidate(Long clusterId, Integer brokerId);
|
||||
|
||||
/**
|
||||
* 减少优先被选举为controller的broker
|
||||
* @param clusterId 集群ID
|
||||
* @param brokerId brokerId
|
||||
* @return
|
||||
*/
|
||||
Result deleteControllerPreferredCandidate(Long clusterId, Integer brokerId);
|
||||
}
|
||||
|
||||
@@ -221,13 +221,24 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
|
||||
if (ValidateUtils.isNull(oldGatewayConfigDO)) {
|
||||
return Result.buildFrom(ResultStatus.RESOURCE_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (!oldGatewayConfigDO.getName().equals(newGatewayConfigDO.getName())
|
||||
|| !oldGatewayConfigDO.getType().equals(newGatewayConfigDO.getType())
|
||||
|| ValidateUtils.isBlank(newGatewayConfigDO.getValue())) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
newGatewayConfigDO.setVersion(oldGatewayConfigDO.getVersion() + 1);
|
||||
if (gatewayConfigDao.updateById(oldGatewayConfigDO) > 0) {
|
||||
|
||||
// 获取当前同类配置, 插入之后需要增大这个version
|
||||
List<GatewayConfigDO> gatewayConfigDOList = gatewayConfigDao.getByConfigType(newGatewayConfigDO.getType());
|
||||
Long version = 1L;
|
||||
for (GatewayConfigDO elem: gatewayConfigDOList) {
|
||||
if (elem.getVersion() > version) {
|
||||
version = elem.getVersion() + 1L;
|
||||
}
|
||||
}
|
||||
|
||||
newGatewayConfigDO.setVersion(version);
|
||||
if (gatewayConfigDao.updateById(newGatewayConfigDO) > 0) {
|
||||
return Result.buildSuc();
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
||||
|
||||
@@ -111,12 +111,13 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
// 不允许修改zk地址
|
||||
return ResultStatus.CHANGE_ZOOKEEPER_FORBIDDEN;
|
||||
}
|
||||
clusterDO.setStatus(originClusterDO.getStatus());
|
||||
Map<String, String> content = new HashMap<>();
|
||||
content.put("cluster id", clusterDO.getId().toString());
|
||||
content.put("security properties", clusterDO.getSecurityProperties());
|
||||
content.put("jmx properties", clusterDO.getJmxProperties());
|
||||
operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.EDIT, content);
|
||||
|
||||
clusterDO.setStatus(originClusterDO.getStatus());
|
||||
return updateById(clusterDO);
|
||||
}
|
||||
|
||||
@@ -214,7 +215,7 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
if (zk != null) {
|
||||
zk.close();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -275,7 +276,7 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
try {
|
||||
Map<String, String> content = new HashMap<>();
|
||||
content.put("cluster id", clusterId.toString());
|
||||
operateRecordService.insert(operator, ModuleEnum.CLUSTER, getClusterName(clusterId).getPhysicalClusterName(), OperateEnum.DELETE, content);
|
||||
operateRecordService.insert(operator, ModuleEnum.CLUSTER, String.valueOf(clusterId), OperateEnum.DELETE, content);
|
||||
if (clusterDao.deleteById(clusterId) <= 0) {
|
||||
LOGGER.error("delete cluster failed, clusterId:{}.", clusterId);
|
||||
return ResultStatus.MYSQL_ERROR;
|
||||
@@ -289,8 +290,9 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
|
||||
private ClusterDetailDTO getClusterDetailDTO(ClusterDO clusterDO, Boolean needDetail) {
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return null;
|
||||
return new ClusterDetailDTO();
|
||||
}
|
||||
|
||||
ClusterDetailDTO dto = new ClusterDetailDTO();
|
||||
dto.setClusterId(clusterDO.getId());
|
||||
dto.setClusterName(clusterDO.getClusterName());
|
||||
@@ -299,6 +301,7 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
dto.setKafkaVersion(physicalClusterMetadataManager.getKafkaVersionFromCache(clusterDO.getId()));
|
||||
dto.setIdc(configUtils.getIdc());
|
||||
dto.setSecurityProperties(clusterDO.getSecurityProperties());
|
||||
dto.setJmxProperties(clusterDO.getJmxProperties());
|
||||
dto.setStatus(clusterDO.getStatus());
|
||||
dto.setGmtCreate(clusterDO.getGmtCreate());
|
||||
dto.setGmtModify(clusterDO.getGmtModify());
|
||||
@@ -337,4 +340,39 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
}
|
||||
return Result.buildSuc(controllerPreferredCandidateList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result addControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList) {
|
||||
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(brokerIdList)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
|
||||
// 增加的BrokerId需要判断是否存活
|
||||
for (Integer brokerId: brokerIdList) {
|
||||
if (!PhysicalClusterMetadataManager.isBrokerAlive(clusterId, brokerId)) {
|
||||
return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST);
|
||||
}
|
||||
|
||||
Result result = zookeeperService.addControllerPreferredCandidate(clusterId, brokerId);
|
||||
if (result.failed()) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result deleteControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList) {
|
||||
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(brokerIdList)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
|
||||
for (Integer brokerId: brokerIdList) {
|
||||
Result result = zookeeperService.deleteControllerPreferredCandidate(clusterId, brokerId);
|
||||
if (result.failed()) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return Result.buildSuc();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
}
|
||||
summary.setState(consumerGroupSummary.state());
|
||||
|
||||
java.util.Iterator<scala.collection.immutable.List<AdminClient.ConsumerSummary>> it = JavaConversions.asJavaIterator(consumerGroupSummary.consumers().iterator());
|
||||
Iterator<scala.collection.immutable.List<AdminClient.ConsumerSummary>> it = JavaConversions.asJavaIterator(consumerGroupSummary.consumers().iterator());
|
||||
while (it.hasNext()) {
|
||||
List<AdminClient.ConsumerSummary> consumerSummaryList = JavaConversions.asJavaList(it.next());
|
||||
for (AdminClient.ConsumerSummary consumerSummary: consumerSummaryList) {
|
||||
|
||||
@@ -70,4 +70,58 @@ public class ZookeeperServiceImpl implements ZookeeperService {
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.ZOOKEEPER_READ_FAILED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result addControllerPreferredCandidate(Long clusterId, Integer brokerId) {
|
||||
if (ValidateUtils.isNull(clusterId)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterId);
|
||||
if (ValidateUtils.isNull(zkConfig)) {
|
||||
return Result.buildFrom(ResultStatus.ZOOKEEPER_CONNECT_FAILED);
|
||||
}
|
||||
|
||||
try {
|
||||
if (zkConfig.checkPathExists(ZkPathUtil.getControllerCandidatePath(brokerId))) {
|
||||
// 节点已经存在, 则直接忽略
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
if (!zkConfig.checkPathExists(ZkPathUtil.D_CONFIG_EXTENSION_ROOT_NODE)) {
|
||||
zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.D_CONFIG_EXTENSION_ROOT_NODE, "");
|
||||
}
|
||||
|
||||
if (!zkConfig.checkPathExists(ZkPathUtil.D_CONTROLLER_CANDIDATES)) {
|
||||
zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.D_CONTROLLER_CANDIDATES, "");
|
||||
}
|
||||
|
||||
zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.getControllerCandidatePath(brokerId), "");
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=ZookeeperServiceImpl||method=addControllerPreferredCandidate||clusterId={}||brokerId={}||errMsg={}||", clusterId, brokerId, e.getMessage());
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.ZOOKEEPER_WRITE_FAILED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result deleteControllerPreferredCandidate(Long clusterId, Integer brokerId) {
|
||||
if (ValidateUtils.isNull(clusterId)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterId);
|
||||
if (ValidateUtils.isNull(zkConfig)) {
|
||||
return Result.buildFrom(ResultStatus.ZOOKEEPER_CONNECT_FAILED);
|
||||
}
|
||||
|
||||
try {
|
||||
if (!zkConfig.checkPathExists(ZkPathUtil.getControllerCandidatePath(brokerId))) {
|
||||
return Result.buildSuc();
|
||||
}
|
||||
zkConfig.delete(ZkPathUtil.getControllerCandidatePath(brokerId));
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=ZookeeperServiceImpl||method=deleteControllerPreferredCandidate||clusterId={}||brokerId={}||errMsg={}||", clusterId, brokerId, e.getMessage());
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.ZOOKEEPER_DELETE_FAILED);
|
||||
}
|
||||
}
|
||||
@@ -44,7 +44,7 @@ public class TopicCommands {
|
||||
);
|
||||
|
||||
// 生成分配策略
|
||||
scala.collection.Map<Object, scala.collection.Seq<Object>> replicaAssignment =
|
||||
scala.collection.Map<Object, Seq<Object>> replicaAssignment =
|
||||
AdminUtils.assignReplicasToBrokers(
|
||||
convert2BrokerMetadataSeq(brokerIdList),
|
||||
partitionNum,
|
||||
@@ -177,7 +177,7 @@ public class TopicCommands {
|
||||
)
|
||||
);
|
||||
|
||||
Map<TopicAndPartition, scala.collection.Seq<Object>> existingAssignJavaMap =
|
||||
Map<TopicAndPartition, Seq<Object>> existingAssignJavaMap =
|
||||
JavaConversions.asJavaMap(existingAssignScalaMap);
|
||||
// 新增分区的分配策略和旧的分配策略合并
|
||||
Map<Object, Seq<Object>> targetMap = new HashMap<>();
|
||||
|
||||
@@ -5,8 +5,6 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.StateChangeListener;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
|
||||
import com.xiaojukeji.kafka.manager.dao.TopicDao;
|
||||
import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.ThreadPool;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
@@ -24,28 +22,17 @@ import java.util.concurrent.*;
|
||||
* @date 20/5/14
|
||||
*/
|
||||
public class TopicStateListener implements StateChangeListener {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(TopicStateListener.class);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TopicStateListener.class);
|
||||
|
||||
private Long clusterId;
|
||||
|
||||
private ZkConfigImpl zkConfig;
|
||||
|
||||
private TopicDao topicDao;
|
||||
|
||||
private AuthorityDao authorityDao;
|
||||
|
||||
public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig) {
|
||||
this.clusterId = clusterId;
|
||||
this.zkConfig = zkConfig;
|
||||
}
|
||||
|
||||
public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig, TopicDao topicDao, AuthorityDao authorityDao) {
|
||||
this.clusterId = clusterId;
|
||||
this.zkConfig = zkConfig;
|
||||
this.topicDao = topicDao;
|
||||
this.authorityDao = authorityDao;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
try {
|
||||
@@ -53,7 +40,7 @@ public class TopicStateListener implements StateChangeListener {
|
||||
FutureTask[] taskList = new FutureTask[topicNameList.size()];
|
||||
for (int i = 0; i < topicNameList.size(); i++) {
|
||||
String topicName = topicNameList.get(i);
|
||||
taskList[i] = new FutureTask(new Callable() {
|
||||
taskList[i] = new FutureTask(new Callable<Object>() {
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
processTopicAdded(topicName);
|
||||
@@ -65,7 +52,6 @@ public class TopicStateListener implements StateChangeListener {
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("init topics metadata failed, clusterId:{}.", clusterId, e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -92,8 +78,6 @@ public class TopicStateListener implements StateChangeListener {
|
||||
private void processTopicDelete(String topicName) {
|
||||
LOGGER.warn("delete topic, clusterId:{} topicName:{}.", clusterId, topicName);
|
||||
PhysicalClusterMetadataManager.removeTopicMetadata(clusterId, topicName);
|
||||
topicDao.removeTopicInCache(clusterId, topicName);
|
||||
authorityDao.removeAuthorityInCache(clusterId, topicName);
|
||||
}
|
||||
|
||||
private void processTopicAdded(String topicName) {
|
||||
@@ -122,4 +106,4 @@ public class TopicStateListener implements StateChangeListener {
|
||||
LOGGER.error("add topic failed, clusterId:{} topicMetadata:{}.", clusterId, topicMetadata, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user