diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/KafkaRebalanceMain.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/KafkaRebalanceMain.java index a0e9e1c9..4990f2a9 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/KafkaRebalanceMain.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/KafkaRebalanceMain.java @@ -67,8 +67,11 @@ public class KafkaRebalanceMain { Properties kafkaConfig = new Properties(); kafkaConfig.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("bootstrap-servers").toString()); balanceParameter.setKafkaConfig(kafkaConfig); - balanceParameter.setEsRestURL(options.valueOf("es-rest-url").toString()); - balanceParameter.setEsIndexPrefix(options.valueOf("es-index-prefix").toString()); + if (options.has("es-password")) { + balanceParameter.setEsInfo(options.valueOf("es-rest-url").toString(), options.valueOf("es-password").toString(), options.valueOf("es-index-prefix").toString()); + } else { + balanceParameter.setEsInfo(options.valueOf("es-rest-url").toString(), "", options.valueOf("es-index-prefix").toString()); + } balanceParameter.setBeforeSeconds((Integer) options.valueOf("before-seconds")); String envFile = options.valueOf("hardware-env-file").toString(); String envJson = FileUtils.readFileToString(new File(envFile), "UTF-8"); @@ -89,6 +92,7 @@ public class KafkaRebalanceMain { OptionParser parser = new OptionParser(); parser.accepts("bootstrap-servers", "Kafka cluster boot server").withRequiredArg().ofType(String.class); parser.accepts("es-rest-url", "The url of elasticsearch").withRequiredArg().ofType(String.class); + parser.accepts("es-password", "The password of elasticsearch").withRequiredArg().ofType(String.class); parser.accepts("es-index-prefix", "The Index Prefix of elasticsearch").withRequiredArg().ofType(String.class); parser.accepts("goals", "Balanced goals include TopicLeadersDistributionGoal,TopicReplicaDistributionGoal,DiskDistributionGoal,NetworkInboundDistributionGoal,NetworkOutboundDistributionGoal").withRequiredArg().ofType(String.class); parser.accepts("cluster", "Balanced cluster name").withRequiredArg().ofType(String.class); diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceParameter.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceParameter.java index 90dcacf5..e9c5f3fc 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceParameter.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceParameter.java @@ -10,6 +10,10 @@ public class BalanceParameter { private Properties kafkaConfig; //ES访问地址 private String esRestURL; + + //ES访问密码 + private String esPassword; + //ES存储索引前缀 private String esIndexPrefix; //均衡目标 @@ -51,8 +55,14 @@ public class BalanceParameter { return esRestURL; } - public void setEsRestURL(String esRestURL) { + public void setEsInfo(String esRestURL, String esPassword, String esIndexPrefix) { this.esRestURL = esRestURL; + this.esPassword = esPassword; + this.esIndexPrefix = esIndexPrefix; + } + + public String getEsPassword() { + return esPassword; } public List getGoals() { @@ -147,10 +157,6 @@ public class BalanceParameter { return esIndexPrefix; } - public void setEsIndexPrefix(String esIndexPrefix) { - this.esIndexPrefix = esIndexPrefix; - } - public String getOfflineBrokers() { return offlineBrokers; } @@ -181,9 +187,11 @@ public class BalanceParameter { "cluster='" + cluster + '\'' + ", kafkaConfig=" + kafkaConfig + ", esRestURL='" + esRestURL + '\'' + + ", esPassword='" + esPassword + '\'' + ", esIndexPrefix='" + esIndexPrefix + '\'' + ", goals=" + goals + ", excludedTopics='" + excludedTopics + '\'' + + ", ignoredTopics='" + ignoredTopics + '\'' + ", offlineBrokers='" + offlineBrokers + '\'' + ", balanceBrokers='" + balanceBrokers + '\'' + ", topicReplicaThreshold=" + topicReplicaThreshold + diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/metric/elasticsearch/ElasticsearchMetricStore.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/metric/elasticsearch/ElasticsearchMetricStore.java index c07bcb67..3a800997 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/metric/elasticsearch/ElasticsearchMetricStore.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/metric/elasticsearch/ElasticsearchMetricStore.java @@ -6,7 +6,10 @@ import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.Metric; import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.MetricStore; import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.Metrics; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.Header; import org.apache.http.HttpHost; +import org.apache.http.message.BasicHeader; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; @@ -17,9 +20,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; /** * @author leewei @@ -30,15 +31,19 @@ public class ElasticsearchMetricStore implements MetricStore { private final ObjectMapper objectMapper = new ObjectMapper(); private final String hosts; + + private final String password; + private final String indexPrefix; private final String format; - public ElasticsearchMetricStore(String hosts, String indexPrefix) { - this(hosts, indexPrefix, "yyyy-MM-dd"); + public ElasticsearchMetricStore(String hosts, String password, String indexPrefix) { + this(hosts, password, indexPrefix, "yyyy-MM-dd"); } - public ElasticsearchMetricStore(String hosts, String indexPrefix, String format) { + public ElasticsearchMetricStore(String hosts, String password, String indexPrefix, String format) { this.hosts = hosts; + this.password = password; this.indexPrefix = indexPrefix; this.format = format; } @@ -50,7 +55,17 @@ public class ElasticsearchMetricStore implements MetricStore { String metricsQueryJson = IOUtils.resourceToString("/MetricsQuery.json", StandardCharsets.UTF_8); metricsQueryJson = metricsQueryJson.replaceAll("", Integer.toString(beforeSeconds)) .replaceAll("", clusterName); - try (RestClient restClient = RestClient.builder(toHttpHosts(this.hosts)).build()) { + + List
defaultHeaders = new ArrayList<>(); + if (StringUtils.isNotBlank(password)) { + String encode = Base64.getEncoder().encodeToString(String.format("%s", this.password).getBytes(StandardCharsets.UTF_8)); + Header header = new BasicHeader("Authorization", "Basic " + encode); + defaultHeaders.add(header); + } + + Header[] headers = new Header[defaultHeaders.size()]; + defaultHeaders.toArray(headers); + try (RestClient restClient = RestClient.builder(toHttpHosts(this.hosts)).setDefaultHeaders(headers).build()) { Request request = new Request( "GET", "/" + indices(beforeSeconds) + "/_search"); diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java index 54bc0e76..70db965c 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java @@ -25,14 +25,14 @@ public class Supplier { Map.Entry::getValue)); } - public static ClusterModel load(String clusterName, int beforeSeconds, String kafkaBootstrapServer, String esUrls, String esIndexPrefix, Map capacitiesById, Set ignoredTopics) { + public static ClusterModel load(String clusterName, int beforeSeconds, String kafkaBootstrapServer, String esUrls, String esPassword, String esIndexPrefix, Map capacitiesById, Set ignoredTopics) { Properties kafkaProperties = new Properties(); kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer); - return load(clusterName, beforeSeconds, kafkaProperties, esUrls, esIndexPrefix, capacitiesById, ignoredTopics); + return load(clusterName, beforeSeconds, kafkaProperties, esUrls, esPassword, esIndexPrefix, capacitiesById, ignoredTopics); } - public static ClusterModel load(String clusterName, int beforeSeconds, Properties kafkaProperties, String esUrls, String esIndexPrefix, Map capacitiesById, Set ignoredTopics) { - MetricStore store = new ElasticsearchMetricStore(esUrls, esIndexPrefix); + public static ClusterModel load(String clusterName, int beforeSeconds, Properties kafkaProperties, String esUrls, String esPassword, String esIndexPrefix, Map capacitiesById, Set ignoredTopics) { + MetricStore store = new ElasticsearchMetricStore(esUrls, esPassword, esIndexPrefix); Metrics metrics = store.getMetrics(clusterName, beforeSeconds); return load(kafkaProperties, capacitiesById, metrics, ignoredTopics); } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/GoalUtils.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/GoalUtils.java index edf9dc00..9d517fd7 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/GoalUtils.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/GoalUtils.java @@ -44,8 +44,16 @@ public class GoalUtils { capacity.setCapacity(Resource.NW_OUT, env.getNetwork()); capacities.put(env.getId(), capacity); } - return Supplier.load(parameter.getCluster(), parameter.getBeforeSeconds(), parameter.getKafkaConfig(), - parameter.getEsRestURL(), parameter.getEsIndexPrefix(), capacities, AnalyzerUtils.getSplitTopics(parameter.getIgnoredTopics())); + return Supplier.load( + parameter.getCluster(), + parameter.getBeforeSeconds(), + parameter.getKafkaConfig(), + parameter.getEsRestURL(), + parameter.getEsPassword(), + parameter.getEsIndexPrefix(), + capacities, + AnalyzerUtils.getSplitTopics(parameter.getIgnoredTopics()) + ); } public static Map getBalanceThreshold(BalanceParameter parameter, double[] clusterAvgResource) { diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/common/converter/ClusterBalanceConverter.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/common/converter/ClusterBalanceConverter.java index 90788165..2783954d 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/common/converter/ClusterBalanceConverter.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/common/converter/ClusterBalanceConverter.java @@ -1,6 +1,7 @@ package com.xiaojukeji.know.streaming.km.rebalance.common.converter; import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance; +import com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant; import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalanceIntervalDTO; import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalancePreviewDTO; import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalanceStrategyDTO; @@ -34,13 +35,16 @@ import java.util.stream.Collectors; @EnterpriseLoadReBalance public class ClusterBalanceConverter { - - public final static String PARTITION_INDEX = "ks_kafka_partition_metric"; - private ClusterBalanceConverter() { } - public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigPO configPO, Map brokerMap, Map brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List topicNames) { + public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigPO configPO, + Map brokerMap, + Map brokerSpecMap, + ClusterPhy clusterPhy, + String esUrl, + String esPassword, + List topicNames) { BalanceParameter balanceParameter = new BalanceParameter(); List clusterBalanceIntervalDTOS = ConvertUtil.str2ObjArrayByJson(configPO.getBalanceIntervalJson(), ClusterBalanceIntervalDTO.class); @@ -63,8 +67,7 @@ public class ClusterBalanceConverter { balanceParameter.setGoals(goals); balanceParameter.setCluster(clusterPhy.getId().toString()); balanceParameter.setExcludedTopics(configPO.getTopicBlackList()); - balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_"); - balanceParameter.setEsRestURL(esUrl); + balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_"); balanceParameter.setBalanceBrokers(CommonUtils.intSet2String(brokerMap.keySet())); balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap)); balanceParameter.setBeforeSeconds(configPO.getMetricCalculationPeriod()); @@ -78,7 +81,13 @@ public class ClusterBalanceConverter { } - public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clusterBalanceJobPO, Map brokerMap, Map brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List topicNames) { + public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clusterBalanceJobPO, + Map brokerMap, + Map brokerSpecMap, + ClusterPhy clusterPhy, + String esUrl, + String esPassword, + List topicNames) { BalanceParameter balanceParameter = new BalanceParameter(); List clusterBalanceIntervalDTOS = ConvertUtil.str2ObjArrayByJson(clusterBalanceJobPO.getBalanceIntervalJson(), ClusterBalanceIntervalDTO.class); @@ -101,8 +110,7 @@ public class ClusterBalanceConverter { balanceParameter.setGoals(goals); balanceParameter.setCluster(clusterPhy.getId().toString()); balanceParameter.setExcludedTopics(clusterBalanceJobPO.getTopicBlackList()); - balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_"); - balanceParameter.setEsRestURL(esUrl); + balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_"); balanceParameter.setBalanceBrokers(clusterBalanceJobPO.getBrokers()); balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap)); balanceParameter.setBeforeSeconds(clusterBalanceJobPO.getMetricCalculationPeriod()); @@ -116,7 +124,13 @@ public class ClusterBalanceConverter { } - public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent dto, List brokers, Map brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List topicNames) { + public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent dto, + List brokers, + Map brokerSpecMap, + ClusterPhy clusterPhy, + String esUrl, + String esPassword, + List topicNames) { BalanceParameter balanceParameter = new BalanceParameter(); List clusterBalanceIntervalDTOS = dto.getClusterBalanceIntervalList().stream() .sorted(Comparator.comparing(ClusterBalanceIntervalDTO::getPriority)).collect(Collectors.toList()); @@ -141,8 +155,7 @@ public class ClusterBalanceConverter { balanceParameter.setGoals(goals); balanceParameter.setCluster(clusterPhy.getId().toString()); balanceParameter.setExcludedTopics(CommonUtils.strList2String(dto.getTopicBlackList())); - balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_"); - balanceParameter.setEsRestURL(esUrl); + balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_"); balanceParameter.setBalanceBrokers(CommonUtils.intSet2String(brokerMap.keySet())); balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap)); balanceParameter.setBeforeSeconds(dto.getMetricCalculationPeriod()); @@ -156,7 +169,13 @@ public class ClusterBalanceConverter { } - public static BalanceParameter convert2BalanceParameter(ClusterBalancePreviewDTO dto, Map brokerMap, Map brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List topicNames) { + public static BalanceParameter convert2BalanceParameter(ClusterBalancePreviewDTO dto, + Map brokerMap, + Map brokerSpecMap, + ClusterPhy clusterPhy, + String esUrl, + String esPassword, + List topicNames) { BalanceParameter balanceParameter = new BalanceParameter(); List clusterBalanceIntervalDTOS = dto.getClusterBalanceIntervalList().stream() .sorted(Comparator.comparing(ClusterBalanceIntervalDTO::getPriority)).collect(Collectors.toList()); @@ -179,8 +198,7 @@ public class ClusterBalanceConverter { balanceParameter.setGoals(goals); balanceParameter.setCluster(clusterPhy.getId().toString()); balanceParameter.setExcludedTopics(CommonUtils.strList2String(dto.getTopicBlackList())); - balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_"); - balanceParameter.setEsRestURL(esUrl); + balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_"); balanceParameter.setBalanceBrokers(CommonUtils.intList2String(dto.getBrokers())); balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap)); balanceParameter.setBeforeSeconds(dto.getMetricCalculationPeriod()); diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/job/ClusterBalanceJobHandler.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/job/ClusterBalanceJobHandler.java index 4c0fac11..5892f0f1 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/job/ClusterBalanceJobHandler.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/job/ClusterBalanceJobHandler.java @@ -63,6 +63,9 @@ public class ClusterBalanceJobHandler implements JobHandler { @Value("${es.client.address:}") private String esAddress; + @Value("${es.client.pass:}") + private String esPassword; + @Autowired private ClusterBalanceJobService clusterBalanceJobService; @@ -116,7 +119,7 @@ public class ClusterBalanceJobHandler implements JobHandler { //获取任务计划 List topicNames = topicService.listRecentUpdateTopicNamesFromDB(dto.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond()); - BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames); + BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames); try { ExecutionRebalance executionRebalance = new ExecutionRebalance(); OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter); @@ -202,7 +205,7 @@ public class ClusterBalanceJobHandler implements JobHandler { List topicNames = topicService.listRecentUpdateTopicNamesFromDB(job.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond()); JobClusterBalanceContent dto = ConvertUtil.str2ObjByJson(job.getJobData(), JobClusterBalanceContent.class); - BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames); + BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames); ExecutionRebalance executionRebalance = new ExecutionRebalance(); try { OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter); diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceJobServiceImpl.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceJobServiceImpl.java index e8c77c33..75e22075 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceJobServiceImpl.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceJobServiceImpl.java @@ -68,6 +68,9 @@ public class ClusterBalanceJobServiceImpl implements ClusterBalanceJobService { @Value("${es.client.address}") private String esAddress; + @Value("${es.client.pass:}") + private String esPassword; + @Autowired private ClusterBalanceJobDao clusterBalanceJobDao; @@ -303,7 +306,7 @@ public class ClusterBalanceJobServiceImpl implements ClusterBalanceJobService { //更新平衡任务状态信息 List topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhy.getId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond()); Map brokerBalanceStateMap = ExecutionRebalance - .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(clusterBalanceJobPO, brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames)); + .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(clusterBalanceJobPO, brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames)); List oldDetails = ConvertUtil.str2ObjArrayByJson(clusterBalanceJobPO.getBrokerBalanceDetail(), ClusterBalancePlanDetail.class); List newDetails = ClusterBalanceConverter.convert2ClusterBalancePlanDetail(oldDetails, brokerBalanceStateMap); clusterBalanceJobPO.setBrokerBalanceDetail(ConvertUtil.obj2Json(newDetails)); diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceServiceImpl.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceServiceImpl.java index 6bc98e34..d5ccf398 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceServiceImpl.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/core/service/impl/ClusterBalanceServiceImpl.java @@ -68,6 +68,9 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService { @Value("${es.client.address}") private String esAddress; + @Value("${es.client.pass:}") + private String esPassword; + @Autowired private JobService jobService; @@ -137,9 +140,9 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService { Map resourceDoubleMap; Map brokerBalanceStateMap; try { - resourceDoubleMap = ExecutionRebalance.getClusterAvgResourcesState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames)); + resourceDoubleMap = ExecutionRebalance.getClusterAvgResourcesState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames)); brokerBalanceStateMap = ExecutionRebalance - .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames)); + .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames)); }catch (Exception e){ logger.error("method=state||clusterPhyId={}||errMsg=exception", clusterPhyId, e); return Result.buildFailure(e.getMessage()); @@ -189,7 +192,7 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService { try { List topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond()); brokerBalanceStateMap = ExecutionRebalance - .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames)); + .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames)); } catch (Exception e) { logger.error("method=overview||clusterBalanceOverviewDTO={}||errMsg=exception", dto, e); return PaginationResult.buildFailure(e.getMessage(), dto); @@ -280,6 +283,7 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService { brokerSpecMap, clusterPhy, esAddress, + esPassword, recentTopicNameList ) ); @@ -379,7 +383,7 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService { //获取任务计划 Map brokerMap = allBrokers.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity())); List topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond()); - BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(clusterBalancePreviewDTO, brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames); + BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(clusterBalancePreviewDTO, brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames); ExecutionRebalance executionRebalance = new ExecutionRebalance(); try { OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);