mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
文档更新 & 问题修复
This commit is contained in:
@@ -55,6 +55,13 @@ public interface BrokerService {
|
||||
*/
|
||||
String getBrokerVersionFromKafka(Long clusterPhyId, Integer brokerId);
|
||||
|
||||
/**
|
||||
* 优先从本地缓存中获取Broker的版本信息
|
||||
* @param
|
||||
* @return
|
||||
*/
|
||||
String getBrokerVersionFromKafkaWithCacheFirst(Long clusterPhyId, Integer brokerId,Long startTime);
|
||||
|
||||
/**
|
||||
* 获取总的Broker数
|
||||
*/
|
||||
|
||||
@@ -59,7 +59,7 @@ import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.JMX_SERVER_APP
|
||||
public class BrokerServiceImpl extends BaseVersionControlService implements BrokerService {
|
||||
private static final ILog log = LogFactory.getLog(BrokerServiceImpl.class);
|
||||
|
||||
private static final String BROKER_LOG_DIR = "getLogDir";
|
||||
private static final String BROKER_LOG_DIR = "getLogDir";
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
@@ -84,6 +84,12 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
return SERVICE_SEARCH_BROKER;
|
||||
}
|
||||
|
||||
private static final Cache<String, String> brokerVersionCache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(1, TimeUnit.DAYS)
|
||||
.maximumSize(5000)
|
||||
.build();
|
||||
|
||||
|
||||
private final Cache<Long, List<Broker>> brokersCache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(90, TimeUnit.SECONDS)
|
||||
.maximumSize(200)
|
||||
@@ -225,6 +231,22 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getBrokerVersionFromKafkaWithCacheFirst(Long clusterPhyId, Integer brokerId,Long startTime) {
|
||||
//id唯一确定一个broker
|
||||
String id = String.valueOf(clusterPhyId) + String.valueOf(brokerId)+String.valueOf(startTime);
|
||||
//先尝试读缓存
|
||||
String brokerVersion = brokerVersionCache.getIfPresent(id);
|
||||
if (brokerVersion != null) {
|
||||
return brokerVersion;
|
||||
}
|
||||
String version = getBrokerVersionFromKafka(clusterPhyId, brokerId);
|
||||
brokerVersionCache.put(id,version);
|
||||
return version;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Integer countAllBrokers() {
|
||||
LambdaQueryWrapper<BrokerPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.ParamErrorException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
@@ -67,4 +68,10 @@ public interface ClusterPhyService {
|
||||
DuplicateException,
|
||||
NotExistException,
|
||||
AdminOperateException;
|
||||
|
||||
/**
|
||||
* 获取系统已存在的kafka版本列表
|
||||
* @return
|
||||
*/
|
||||
Set<String> getClusterVersionSet();
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@ import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
@@ -201,4 +203,11 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
|
||||
throw new AdminOperateException("modify cluster failed", e, ResultStatus.MYSQL_OPERATE_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getClusterVersionSet() {
|
||||
List<ClusterPhy> clusterPhyList = listAllClusters();
|
||||
Set<String> versionSet = clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet());
|
||||
return versionSet;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user