[Bugfix]修复Balance功能,ES密码未生效的问题(#992)

This commit is contained in:
ZQKC
2023-04-02 20:30:19 +08:00
parent 77b87f1dbe
commit d3cc0cb687
9 changed files with 105 additions and 42 deletions

View File

@@ -67,8 +67,11 @@ public class KafkaRebalanceMain {
Properties kafkaConfig = new Properties();
kafkaConfig.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("bootstrap-servers").toString());
balanceParameter.setKafkaConfig(kafkaConfig);
balanceParameter.setEsRestURL(options.valueOf("es-rest-url").toString());
balanceParameter.setEsIndexPrefix(options.valueOf("es-index-prefix").toString());
if (options.has("es-password")) {
balanceParameter.setEsInfo(options.valueOf("es-rest-url").toString(), options.valueOf("es-password").toString(), options.valueOf("es-index-prefix").toString());
} else {
balanceParameter.setEsInfo(options.valueOf("es-rest-url").toString(), "", options.valueOf("es-index-prefix").toString());
}
balanceParameter.setBeforeSeconds((Integer) options.valueOf("before-seconds"));
String envFile = options.valueOf("hardware-env-file").toString();
String envJson = FileUtils.readFileToString(new File(envFile), "UTF-8");
@@ -89,6 +92,7 @@ public class KafkaRebalanceMain {
OptionParser parser = new OptionParser();
parser.accepts("bootstrap-servers", "Kafka cluster boot server").withRequiredArg().ofType(String.class);
parser.accepts("es-rest-url", "The url of elasticsearch").withRequiredArg().ofType(String.class);
parser.accepts("es-password", "The password of elasticsearch").withRequiredArg().ofType(String.class);
parser.accepts("es-index-prefix", "The Index Prefix of elasticsearch").withRequiredArg().ofType(String.class);
parser.accepts("goals", "Balanced goals include TopicLeadersDistributionGoal,TopicReplicaDistributionGoal,DiskDistributionGoal,NetworkInboundDistributionGoal,NetworkOutboundDistributionGoal").withRequiredArg().ofType(String.class);
parser.accepts("cluster", "Balanced cluster name").withRequiredArg().ofType(String.class);

View File

@@ -10,6 +10,10 @@ public class BalanceParameter {
private Properties kafkaConfig;
//ES访问地址
private String esRestURL;
//ES访问密码
private String esPassword;
//ES存储索引前缀
private String esIndexPrefix;
//均衡目标
@@ -51,8 +55,14 @@ public class BalanceParameter {
return esRestURL;
}
public void setEsRestURL(String esRestURL) {
public void setEsInfo(String esRestURL, String esPassword, String esIndexPrefix) {
this.esRestURL = esRestURL;
this.esPassword = esPassword;
this.esIndexPrefix = esIndexPrefix;
}
public String getEsPassword() {
return esPassword;
}
public List<String> getGoals() {
@@ -147,10 +157,6 @@ public class BalanceParameter {
return esIndexPrefix;
}
public void setEsIndexPrefix(String esIndexPrefix) {
this.esIndexPrefix = esIndexPrefix;
}
public String getOfflineBrokers() {
return offlineBrokers;
}
@@ -181,9 +187,11 @@ public class BalanceParameter {
"cluster='" + cluster + '\'' +
", kafkaConfig=" + kafkaConfig +
", esRestURL='" + esRestURL + '\'' +
", esPassword='" + esPassword + '\'' +
", esIndexPrefix='" + esIndexPrefix + '\'' +
", goals=" + goals +
", excludedTopics='" + excludedTopics + '\'' +
", ignoredTopics='" + ignoredTopics + '\'' +
", offlineBrokers='" + offlineBrokers + '\'' +
", balanceBrokers='" + balanceBrokers + '\'' +
", topicReplicaThreshold=" + topicReplicaThreshold +

View File

@@ -6,7 +6,10 @@ import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.Metric;
import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.MetricStore;
import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.Metrics;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
@@ -17,9 +20,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;
/**
* @author leewei
@@ -30,15 +31,19 @@ public class ElasticsearchMetricStore implements MetricStore {
private final ObjectMapper objectMapper = new ObjectMapper();
private final String hosts;
private final String password;
private final String indexPrefix;
private final String format;
public ElasticsearchMetricStore(String hosts, String indexPrefix) {
this(hosts, indexPrefix, "yyyy-MM-dd");
public ElasticsearchMetricStore(String hosts, String password, String indexPrefix) {
this(hosts, password, indexPrefix, "yyyy-MM-dd");
}
public ElasticsearchMetricStore(String hosts, String indexPrefix, String format) {
public ElasticsearchMetricStore(String hosts, String password, String indexPrefix, String format) {
this.hosts = hosts;
this.password = password;
this.indexPrefix = indexPrefix;
this.format = format;
}
@@ -50,7 +55,17 @@ public class ElasticsearchMetricStore implements MetricStore {
String metricsQueryJson = IOUtils.resourceToString("/MetricsQuery.json", StandardCharsets.UTF_8);
metricsQueryJson = metricsQueryJson.replaceAll("<var_before_time>", Integer.toString(beforeSeconds))
.replaceAll("<var_cluster_name>", clusterName);
try (RestClient restClient = RestClient.builder(toHttpHosts(this.hosts)).build()) {
List<Header> defaultHeaders = new ArrayList<>();
if (StringUtils.isNotBlank(password)) {
String encode = Base64.getEncoder().encodeToString(String.format("%s", this.password).getBytes(StandardCharsets.UTF_8));
Header header = new BasicHeader("Authorization", "Basic " + encode);
defaultHeaders.add(header);
}
Header[] headers = new Header[defaultHeaders.size()];
defaultHeaders.toArray(headers);
try (RestClient restClient = RestClient.builder(toHttpHosts(this.hosts)).setDefaultHeaders(headers).build()) {
Request request = new Request(
"GET",
"/" + indices(beforeSeconds) + "/_search");

View File

@@ -25,14 +25,14 @@ public class Supplier {
Map.Entry::getValue));
}
public static ClusterModel load(String clusterName, int beforeSeconds, String kafkaBootstrapServer, String esUrls, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
public static ClusterModel load(String clusterName, int beforeSeconds, String kafkaBootstrapServer, String esUrls, String esPassword, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
return load(clusterName, beforeSeconds, kafkaProperties, esUrls, esIndexPrefix, capacitiesById, ignoredTopics);
return load(clusterName, beforeSeconds, kafkaProperties, esUrls, esPassword, esIndexPrefix, capacitiesById, ignoredTopics);
}
public static ClusterModel load(String clusterName, int beforeSeconds, Properties kafkaProperties, String esUrls, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
MetricStore store = new ElasticsearchMetricStore(esUrls, esIndexPrefix);
public static ClusterModel load(String clusterName, int beforeSeconds, Properties kafkaProperties, String esUrls, String esPassword, String esIndexPrefix, Map<Integer, Capacity> capacitiesById, Set<String> ignoredTopics) {
MetricStore store = new ElasticsearchMetricStore(esUrls, esPassword, esIndexPrefix);
Metrics metrics = store.getMetrics(clusterName, beforeSeconds);
return load(kafkaProperties, capacitiesById, metrics, ignoredTopics);
}

View File

@@ -44,8 +44,16 @@ public class GoalUtils {
capacity.setCapacity(Resource.NW_OUT, env.getNetwork());
capacities.put(env.getId(), capacity);
}
return Supplier.load(parameter.getCluster(), parameter.getBeforeSeconds(), parameter.getKafkaConfig(),
parameter.getEsRestURL(), parameter.getEsIndexPrefix(), capacities, AnalyzerUtils.getSplitTopics(parameter.getIgnoredTopics()));
return Supplier.load(
parameter.getCluster(),
parameter.getBeforeSeconds(),
parameter.getKafkaConfig(),
parameter.getEsRestURL(),
parameter.getEsPassword(),
parameter.getEsIndexPrefix(),
capacities,
AnalyzerUtils.getSplitTopics(parameter.getIgnoredTopics())
);
}
public static Map<String, BalanceThreshold> getBalanceThreshold(BalanceParameter parameter, double[] clusterAvgResource) {

View File

@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.rebalance.common.converter;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalanceIntervalDTO;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalancePreviewDTO;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.dto.ClusterBalanceStrategyDTO;
@@ -34,13 +35,16 @@ import java.util.stream.Collectors;
@EnterpriseLoadReBalance
public class ClusterBalanceConverter {
public final static String PARTITION_INDEX = "ks_kafka_partition_metric";
private ClusterBalanceConverter() {
}
public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigPO configPO, Map<Integer, Broker> brokerMap, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobConfigPO configPO,
Map<Integer, Broker> brokerMap,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = ConvertUtil.str2ObjArrayByJson(configPO.getBalanceIntervalJson(), ClusterBalanceIntervalDTO.class);
@@ -63,8 +67,7 @@ public class ClusterBalanceConverter {
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(configPO.getTopicBlackList());
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(CommonUtils.intSet2String(brokerMap.keySet()));
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(configPO.getMetricCalculationPeriod());
@@ -78,7 +81,13 @@ public class ClusterBalanceConverter {
}
public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clusterBalanceJobPO, Map<Integer, Broker> brokerMap, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(ClusterBalanceJobPO clusterBalanceJobPO,
Map<Integer, Broker> brokerMap,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = ConvertUtil.str2ObjArrayByJson(clusterBalanceJobPO.getBalanceIntervalJson(), ClusterBalanceIntervalDTO.class);
@@ -101,8 +110,7 @@ public class ClusterBalanceConverter {
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(clusterBalanceJobPO.getTopicBlackList());
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(clusterBalanceJobPO.getBrokers());
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(clusterBalanceJobPO.getMetricCalculationPeriod());
@@ -116,7 +124,13 @@ public class ClusterBalanceConverter {
}
public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent dto, List<Broker> brokers, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(JobClusterBalanceContent dto,
List<Broker> brokers,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = dto.getClusterBalanceIntervalList().stream()
.sorted(Comparator.comparing(ClusterBalanceIntervalDTO::getPriority)).collect(Collectors.toList());
@@ -141,8 +155,7 @@ public class ClusterBalanceConverter {
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(CommonUtils.strList2String(dto.getTopicBlackList()));
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(CommonUtils.intSet2String(brokerMap.keySet()));
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(dto.getMetricCalculationPeriod());
@@ -156,7 +169,13 @@ public class ClusterBalanceConverter {
}
public static BalanceParameter convert2BalanceParameter(ClusterBalancePreviewDTO dto, Map<Integer, Broker> brokerMap, Map<Integer, BrokerSpec> brokerSpecMap, ClusterPhy clusterPhy, String esUrl, List<String> topicNames) {
public static BalanceParameter convert2BalanceParameter(ClusterBalancePreviewDTO dto,
Map<Integer, Broker> brokerMap,
Map<Integer, BrokerSpec> brokerSpecMap,
ClusterPhy clusterPhy,
String esUrl,
String esPassword,
List<String> topicNames) {
BalanceParameter balanceParameter = new BalanceParameter();
List<ClusterBalanceIntervalDTO> clusterBalanceIntervalDTOS = dto.getClusterBalanceIntervalList().stream()
.sorted(Comparator.comparing(ClusterBalanceIntervalDTO::getPriority)).collect(Collectors.toList());
@@ -179,8 +198,7 @@ public class ClusterBalanceConverter {
balanceParameter.setGoals(goals);
balanceParameter.setCluster(clusterPhy.getId().toString());
balanceParameter.setExcludedTopics(CommonUtils.strList2String(dto.getTopicBlackList()));
balanceParameter.setEsIndexPrefix(PARTITION_INDEX + "_");
balanceParameter.setEsRestURL(esUrl);
balanceParameter.setEsInfo(esUrl, esPassword, TemplateConstant.PARTITION_INDEX + "_");
balanceParameter.setBalanceBrokers(CommonUtils.intList2String(dto.getBrokers()));
balanceParameter.setHardwareEnv(convert2ListHostEnv(brokerMap, brokerSpecMap));
balanceParameter.setBeforeSeconds(dto.getMetricCalculationPeriod());

View File

@@ -63,6 +63,9 @@ public class ClusterBalanceJobHandler implements JobHandler {
@Value("${es.client.address:}")
private String esAddress;
@Value("${es.client.pass:}")
private String esPassword;
@Autowired
private ClusterBalanceJobService clusterBalanceJobService;
@@ -116,7 +119,7 @@ public class ClusterBalanceJobHandler implements JobHandler {
//获取任务计划
List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(dto.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames);
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames);
try {
ExecutionRebalance executionRebalance = new ExecutionRebalance();
OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);
@@ -202,7 +205,7 @@ public class ClusterBalanceJobHandler implements JobHandler {
List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(job.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
JobClusterBalanceContent dto = ConvertUtil.str2ObjByJson(job.getJobData(), JobClusterBalanceContent.class);
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames);
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames);
ExecutionRebalance executionRebalance = new ExecutionRebalance();
try {
OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);

View File

@@ -68,6 +68,9 @@ public class ClusterBalanceJobServiceImpl implements ClusterBalanceJobService {
@Value("${es.client.address}")
private String esAddress;
@Value("${es.client.pass:}")
private String esPassword;
@Autowired
private ClusterBalanceJobDao clusterBalanceJobDao;
@@ -303,7 +306,7 @@ public class ClusterBalanceJobServiceImpl implements ClusterBalanceJobService {
//更新平衡任务状态信息
List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhy.getId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
Map<Integer, BrokerBalanceState> brokerBalanceStateMap = ExecutionRebalance
.getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(clusterBalanceJobPO, brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
.getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(clusterBalanceJobPO, brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames));
List<ClusterBalancePlanDetail> oldDetails = ConvertUtil.str2ObjArrayByJson(clusterBalanceJobPO.getBrokerBalanceDetail(), ClusterBalancePlanDetail.class);
List<ClusterBalancePlanDetail> newDetails = ClusterBalanceConverter.convert2ClusterBalancePlanDetail(oldDetails, brokerBalanceStateMap);
clusterBalanceJobPO.setBrokerBalanceDetail(ConvertUtil.obj2Json(newDetails));

View File

@@ -68,6 +68,9 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService {
@Value("${es.client.address}")
private String esAddress;
@Value("${es.client.pass:}")
private String esPassword;
@Autowired
private JobService jobService;
@@ -137,9 +140,9 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService {
Map<Resource, Double> resourceDoubleMap;
Map<Integer, BrokerBalanceState> brokerBalanceStateMap;
try {
resourceDoubleMap = ExecutionRebalance.getClusterAvgResourcesState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
resourceDoubleMap = ExecutionRebalance.getClusterAvgResourcesState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames));
brokerBalanceStateMap = ExecutionRebalance
.getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
.getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames));
}catch (Exception e){
logger.error("method=state||clusterPhyId={}||errMsg=exception", clusterPhyId, e);
return Result.buildFailure(e.getMessage());
@@ -189,7 +192,7 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService {
try {
List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
brokerBalanceStateMap = ExecutionRebalance
.getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
.getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames));
} catch (Exception e) {
logger.error("method=overview||clusterBalanceOverviewDTO={}||errMsg=exception", dto, e);
return PaginationResult.buildFailure(e.getMessage(), dto);
@@ -280,6 +283,7 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService {
brokerSpecMap,
clusterPhy,
esAddress,
esPassword,
recentTopicNameList
)
);
@@ -379,7 +383,7 @@ public class ClusterBalanceServiceImpl implements ClusterBalanceService {
//获取任务计划
Map<Integer, Broker> brokerMap = allBrokers.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(clusterBalancePreviewDTO, brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames);
BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(clusterBalancePreviewDTO, brokerMap, brokerSpecMap, clusterPhy, esAddress, esPassword, topicNames);
ExecutionRebalance executionRebalance = new ExecutionRebalance();
try {
OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);