From 5ef8fff5bced878598a4b05cd252d53d35ad0061 Mon Sep 17 00:00:00 2001 From: eilenexuzhe Date: Mon, 27 Dec 2021 17:12:02 +0800 Subject: [PATCH 01/16] feat: update echarts v4 to v5 --- kafka-manager-console/package.json | 13 +++--- .../src/component/chart/bar-chart.tsx | 29 +++++++++--- .../src/component/chart/date-picker-chart.tsx | 43 ++++++++++++----- .../src/component/chart/doughnut-chart.tsx | 28 ++++++++--- .../src/component/chart/line-chart.tsx | 46 +++++++++++++------ .../src/container/admin/data-curve/config.ts | 3 +- .../src/container/admin/data-curve/parser.ts | 10 ++-- .../alarm/alarm-detail/history-detail.tsx | 6 +-- .../src/container/common-curve/config.ts | 11 ++--- .../src/container/common-curve/index.tsx | 10 ++-- kafka-manager-console/src/store/curve-info.ts | 8 ++-- 11 files changed, 137 insertions(+), 70 deletions(-) diff --git a/kafka-manager-console/package.json b/kafka-manager-console/package.json index d768bc73..c3c94acc 100644 --- a/kafka-manager-console/package.json +++ b/kafka-manager-console/package.json @@ -3,7 +3,8 @@ "version": "2.5.0", "description": "", "scripts": { - "start": "webpack-dev-server", + "prestart": "npm install --save-dev webpack-dev-server", + "start": "webpack serve", "daily-build": "cross-env NODE_ENV=production webpack", "pre-build": "cross-env NODE_ENV=production webpack", "prod-build": "cross-env NODE_ENV=production webpack", @@ -13,18 +14,19 @@ "license": "ISC", "devDependencies": { "@hot-loader/react-dom": "^16.8.6", - "@types/echarts": "^4.4.1", + "@types/events": "^3.0.0", "@types/lodash.debounce": "^4.0.6", "@types/react": "^16.8.8", "@types/react-dom": "^16.8.2", "@types/react-router-dom": "^4.3.1", "@types/spark-md5": "^3.0.2", + "@webpack-cli/serve": "^1.6.0", "antd": "^3.26.15", "clean-webpack-plugin": "^3.0.0", "clipboard": "^2.0.8", "cross-env": "^7.0.2", "css-loader": "^2.1.0", - "echarts": "^4.5.0", + "echarts": "^5.2.1", "file-loader": "^5.0.2", "html-webpack-plugin": "^3.2.0", "increase-memory-limit": "^1.0.7", @@ -50,11 +52,10 @@ "typescript": "^3.3.3333", "url-loader": "^4.1.1", "webpack": "^4.29.6", - "webpack-cli": "^3.2.3", - "webpack-dev-server": "^3.2.1", + "webpack-cli": "^4.9.1", "xlsx": "^0.16.1" }, "dependencies": { "format-to-json": "^1.0.4" } -} \ No newline at end of file +} diff --git a/kafka-manager-console/src/component/chart/bar-chart.tsx b/kafka-manager-console/src/component/chart/bar-chart.tsx index c2f67099..d31fcfd7 100644 --- a/kafka-manager-console/src/component/chart/bar-chart.tsx +++ b/kafka-manager-console/src/component/chart/bar-chart.tsx @@ -1,14 +1,29 @@ import * as React from 'react'; import { Spin, notification } from 'component/antd'; -import echarts, { EChartOption } from 'echarts/lib/echarts'; +import * as echarts from 'echarts/core'; // 引入柱状图 -import 'echarts/lib/chart/bar'; +import { BarChart } from 'echarts/charts'; // 引入提示框和标题组件 -import 'echarts/lib/component/tooltip'; -import 'echarts/lib/component/title'; -import 'echarts/lib/component/legend'; +import { + TitleComponent, + TooltipComponent, + LegendComponent, + GridComponent, +} from 'echarts/components'; +import { CanvasRenderer } from 'echarts/renderers'; +import { EChartsOption } from 'echarts'; + +// 注册必须的组件 +echarts.use([ + TitleComponent, + LegendComponent, + TooltipComponent, + BarChart, + GridComponent, + CanvasRenderer, +]); interface IChartProps { getChartData: any; @@ -38,7 +53,7 @@ export class BarChartComponet extends React.Component { this.chart.resize(); } - public isHasData = (data: EChartOption) => { + public isHasData = (data: any) => { const noData = !(data.series && data.series.length); this.setState({ noData }); return !noData; @@ -54,7 +69,7 @@ export class BarChartComponet extends React.Component { const chartOptions = getChartData(); if ((typeof chartOptions.then) === 'function') { - return chartOptions.then((data: EChartOption) => { + return chartOptions.then((data: EChartsOption) => { this.setState({ loading: false }); if (this.isHasData(data)) { diff --git a/kafka-manager-console/src/component/chart/date-picker-chart.tsx b/kafka-manager-console/src/component/chart/date-picker-chart.tsx index cda4a6d0..39878805 100644 --- a/kafka-manager-console/src/component/chart/date-picker-chart.tsx +++ b/kafka-manager-console/src/component/chart/date-picker-chart.tsx @@ -3,16 +3,34 @@ import { DatePicker, notification, Spin } from 'component/antd'; import moment, { Moment } from 'moment'; import { timeStampStr } from 'constants/strategy'; import { disabledDate } from 'lib/utils'; -import echarts from 'echarts'; +import * as echarts from 'echarts/core'; -// 引入柱状图和折线图 -import 'echarts/lib/chart/bar'; -import 'echarts/lib/chart/line'; +// 引入柱状图 +import { BarChart, LineChart } from 'echarts/charts'; // 引入提示框和标题组件 -import 'echarts/lib/component/tooltip'; -import 'echarts/lib/component/title'; -import 'echarts/lib/component/legend'; +import { + TitleComponent, + TooltipComponent, + LegendComponent, + GridComponent, + MarkLineComponent, + DatasetComponent, +} from 'echarts/components'; +import { CanvasRenderer } from 'echarts/renderers'; + +// 注册必须的组件 +echarts.use([ + TitleComponent, + LegendComponent, + TooltipComponent, + GridComponent, + BarChart, + LineChart, + CanvasRenderer, + DatasetComponent, + MarkLineComponent, +]); import './index.less'; const { RangePicker } = DatePicker; @@ -61,11 +79,12 @@ export class ChartWithDatePicker extends React.Component { const noData = options.series.length ? false : true; this.setState({ noData }); options.tooltip.formatter = (params: any) => { - var res = - "

" + + let res = + '

' + params[0].data.time + - "

"; - for (var i = 0; i < params.length; i++) { + '

'; + // tslint:disable-next-line:prefer-for-of + for (let i = 0; i < params.length; i++) { res += `
; } - public renderEchart = (options: EChartOption, loading = false) => { + public renderEchart = (options: EChartsOption, loading = false) => { const data = hasData(options); if (loading) return this.renderLoading(400); if (!data) return this.renderNoData(400); @@ -51,7 +51,7 @@ export class HistoryDetail extends React.Component { } public renderHistoricalTraffic(metric: IMonitorMetric) { - const option = this.getChartOption() as EChartOption; + const option = this.getChartOption() as EChartsOption; return ( <> diff --git a/kafka-manager-console/src/container/common-curve/config.ts b/kafka-manager-console/src/container/common-curve/config.ts index b5206f4f..79b0d1fe 100644 --- a/kafka-manager-console/src/container/common-curve/config.ts +++ b/kafka-manager-console/src/container/common-curve/config.ts @@ -1,5 +1,4 @@ -import { EChartOption } from 'echarts/lib/echarts'; -import moment from 'moment'; +import { EChartsOption } from 'echarts'; export interface ILineData { value: number; @@ -9,7 +8,7 @@ export interface ICurve { title?: string; path: string; colors: string[]; - parser?: (option: ICurve, data: ILineData) => EChartOption; + parser?: (option: ICurve, data: ILineData) => EChartsOption; message?: string; unit?: string; api?: any; @@ -69,13 +68,13 @@ export const noAxis = { }, }; -export const getHight = (options: EChartOption) => { - let grid = options ? options.grid as EChartOption.Grid : null; +export const getHight = (options: any) => { + let grid = options ? options.grid : null; if (!options || !grid) grid = baseLineGrid; return Number(grid.height) + getLegendHight(options) + Number(grid.top) + LEGEND_PADDING + UNIT_HEIGHT; }; -export const getLegendHight = (options: EChartOption) => { +export const getLegendHight = (options: any) => { if (!options) return 0; if (options.legend.show === false) return 0; const legendHight = options.legend.textStyle.lineHeight + defaultLegendPadding; diff --git a/kafka-manager-console/src/container/common-curve/index.tsx b/kafka-manager-console/src/container/common-curve/index.tsx index 910aa70d..6dc09b90 100644 --- a/kafka-manager-console/src/container/common-curve/index.tsx +++ b/kafka-manager-console/src/container/common-curve/index.tsx @@ -1,4 +1,4 @@ -import { EChartOption } from 'echarts'; +import { EChartsOption } from 'echarts'; import { observer } from 'mobx-react'; import React from 'react'; import { curveInfo } from 'store/curve-info'; @@ -10,7 +10,7 @@ import LineChart, { hasData } from 'component/chart/line-chart'; export interface ICommonCurveProps { options: ICurve; - parser?: (option: ICurve, data: any[]) => EChartOption; + parser?: (option: ICurve, data: any[]) => any; } @observer @@ -41,7 +41,7 @@ export class CommonCurve extends React.Component { fullScreen.show(this.renderCurve(options, loading, true)); } - public renderOpBtns = (options: EChartOption, expand = false) => { + public renderOpBtns = (options: EChartsOption, expand = false) => { const data = hasData(options); return (
@@ -85,7 +85,7 @@ export class CommonCurve extends React.Component { return
; } - public renderEchart = (options: EChartOption, loading = false) => { + public renderEchart = (options: EChartsOption, loading = false) => { const height = getHight(options); const data = hasData(options); @@ -94,7 +94,7 @@ export class CommonCurve extends React.Component { return ; } - public renderCurve = (options: EChartOption, loading: boolean, expand = false) => { + public renderCurve = (options: any, loading: boolean, expand = false) => { const data = hasData(options); return (
diff --git a/kafka-manager-console/src/store/curve-info.ts b/kafka-manager-console/src/store/curve-info.ts index 9531c849..fc4c57a9 100644 --- a/kafka-manager-console/src/store/curve-info.ts +++ b/kafka-manager-console/src/store/curve-info.ts @@ -1,6 +1,6 @@ import { observable, action } from 'mobx'; import moment = require('moment'); -import { EChartOption } from 'echarts/lib/echarts'; +import { EChartsOption } from 'echarts'; import { ICurve } from 'container/common-curve/config'; import { curveKeys, PERIOD_RADIO_MAP } from 'container/admin/data-curve/config'; import { timeFormat } from 'constants/strategy'; @@ -13,7 +13,7 @@ class CurveInfo { public timeRange: [moment.Moment, moment.Moment] = PERIOD_RADIO_MAP.get(this.periodKey).dateRange; @observable - public curveData: { [key: string]: EChartOption } = {}; + public curveData: { [key: string]: EChartsOption } = {}; @observable public curveLoading: { [key: string]: boolean } = {}; @@ -25,7 +25,7 @@ class CurveInfo { public currentOperator: string; @action.bound - public setCurveData(key: curveKeys | string, data: EChartOption) { + public setCurveData(key: curveKeys | string, data: EChartsOption) { this.curveData[key] = data; } @@ -59,7 +59,7 @@ class CurveInfo { public getCommonCurveData = ( options: ICurve, - parser: (option: ICurve, data: any[]) => EChartOption, + parser: (option: ICurve, data: any[]) => EChartsOption, reload?: boolean) => { const { path } = options; this.setCurveData(path, null); From ec28eba7811fef4ec6be7e9056df74439cbcfc8c Mon Sep 17 00:00:00 2001 From: eilenexuzhe Date: Mon, 27 Dec 2021 17:12:51 +0800 Subject: [PATCH 02/16] feat: move webpack-dev-server to scripts --- kafka-manager-console/webpack.config.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka-manager-console/webpack.config.js b/kafka-manager-console/webpack.config.js index a07d9990..d6d12fa8 100644 --- a/kafka-manager-console/webpack.config.js +++ b/kafka-manager-console/webpack.config.js @@ -122,11 +122,11 @@ module.exports = { }, }, devServer: { - contentBase: outPath, + // contentBase: outPath, host: '127.0.0.1', port: 1025, hot: true, - disableHostCheck: true, + // disableHostCheck: true, historyApiFallback: true, proxy: { '/api/v1/': { From 4df2dc09fea79e2238541f408202fef01d13ae56 Mon Sep 17 00:00:00 2001 From: xuguang Date: Wed, 12 Jan 2022 16:15:46 +0800 Subject: [PATCH 03/16] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9BrokerMetadata?= =?UTF-8?q?=E4=B8=ADendpoints=E4=B8=BAinternal|External=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E7=9A=84=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-common/pom.xml | 5 + .../common/constant/KafkaConstant.java | 4 + .../common/entity/ao/common/IpPortData.java | 18 ++ .../znode/brokers/BrokerMetadata.java | 160 +++++++++--------- .../zookeeper/BrokerStateListener.java | 11 +- 5 files changed, 110 insertions(+), 88 deletions(-) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml index 6a8ff0cb..c914ffeb 100644 --- a/kafka-manager-common/pom.xml +++ b/kafka-manager-common/pom.xml @@ -109,5 +109,10 @@ junit junit + + org.projectlombok + lombok + compile + \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java index 4d69f914..463e9b1a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java @@ -17,6 +17,10 @@ public class KafkaConstant { public static final String RETENTION_MS_KEY = "retention.ms"; + public static final String EXTERNAL_KEY = "EXTERNAL"; + + public static final String INTERNAL_KEY = "INTERNAL"; + private KafkaConstant() { } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java new file mode 100644 index 00000000..a16b32b4 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.kafka.manager.common.entity.ao.common; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IpPortData implements Serializable { + private static final long serialVersionUID = -428897032994630685L; + + private String ip; + + private String port; +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java index 3c179b4f..e4e5063d 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java @@ -1,6 +1,17 @@ package com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; +import com.xiaojukeji.kafka.manager.common.entity.ao.common.IpPortData; +import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author zengqiao @@ -18,22 +29,48 @@ import java.util.List; * "version":4, * "rack": "CY" * } + * + * { + * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"}, + * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"], + * "jmx_port":8099, + * "host":"10.179.162.202", + * "timestamp":"1628833925822", + * "port":9092, + * "version":4 + * } + * + * { + * "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"}, + * "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"], + * "jmx_port":8099, + * "host":null, + * "timestamp":"1627289710439", + * "port":-1, + * "version":4 + * } + * */ -public class BrokerMetadata implements Cloneable { +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class BrokerMetadata implements Serializable { + private static final long serialVersionUID = 3918113492423375809L; + private long clusterId; private int brokerId; private List endpoints; + // > + private Map endpointMap; + private String host; private int port; - /* - * ZK上对应的字段就是这个名字, 不要进行修改 - */ - private int jmx_port; + @JsonProperty("jmx_port") + private int jmxPort; private String version; @@ -41,91 +78,54 @@ public class BrokerMetadata implements Cloneable { private String rack; - public long getClusterId() { - return clusterId; + @JsonIgnore + public String getExternalHost() { + if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp(); } - public void setClusterId(long clusterId) { - this.clusterId = clusterId; + @JsonIgnore + public String getInternalHost() { + if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp(); } - public int getBrokerId() { - return brokerId; - } + public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) { + brokerMetadata.setEndpointMap(new HashMap<>()); - public void setBrokerId(int brokerId) { - this.brokerId = brokerId; - } + if (brokerMetadata.getEndpoints().isEmpty()) { + return; + } - public List getEndpoints() { - return endpoints; - } + // example EXTERNAL://10.179.162.202:7092 + for (String endpoint: brokerMetadata.getEndpoints()) { + int idx1 = endpoint.indexOf("://"); + int idx2 = endpoint.lastIndexOf(":"); + if (idx1 == -1 || idx2 == -1 || idx1 == idx2) { + continue; + } - public void setEndpoints(List endpoints) { - this.endpoints = endpoints; - } + String brokerHost = endpoint.substring(idx1 + "://".length(), idx2); + String brokerPort = endpoint.substring(idx2 + 1); - public String getHost() { - return host; - } + brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort)); - public void setHost(String host) { - this.host = host; - } + if (KafkaConstant.EXTERNAL_KEY.equals(endpoint.substring(0, idx1))) { + // 优先使用external的地址进行展示 + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public int getJmxPort() { - return jmx_port; - } - - public void setJmxPort(int jmxPort) { - this.jmx_port = jmxPort; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public String getRack() { - return rack; - } - - public void setRack(String rack) { - this.rack = rack; - } - - @Override - public String toString() { - return "BrokerMetadata{" + - "clusterId=" + clusterId + - ", brokerId=" + brokerId + - ", endpoints=" + endpoints + - ", host='" + host + '\'' + - ", port=" + port + - ", jmxPort=" + jmx_port + - ", version='" + version + '\'' + - ", timestamp=" + timestamp + - ", rack='" + rack + '\'' + - '}'; + if (null == brokerMetadata.getHost()) { + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } + } } } + diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java index a94ec9de..f5cdefe8 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java @@ -74,15 +74,10 @@ public class BrokerStateListener implements StateChangeListener { BrokerMetadata brokerMetadata = null; try { brokerMetadata = zkConfig.get(ZkPathUtil.getBrokerIdNodePath(brokerId), BrokerMetadata.class); - if (!brokerMetadata.getEndpoints().isEmpty()) { - String endpoint = brokerMetadata.getEndpoints().get(0); - int idx = endpoint.indexOf("://"); - endpoint = endpoint.substring(idx + "://".length()); - idx = endpoint.indexOf(":"); - brokerMetadata.setHost(endpoint.substring(0, idx)); - brokerMetadata.setPort(Integer.parseInt(endpoint.substring(idx + 1))); - } + // 解析并更新本次存储的broker元信息 + BrokerMetadata.parseAndUpdateBrokerMetadata(brokerMetadata); + brokerMetadata.setClusterId(clusterId); brokerMetadata.setBrokerId(brokerId); PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig); From a8d7eb27d91b906f95cc77400e820ed873e33ca7 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Jan 2022 18:03:16 +0800 Subject: [PATCH 04/16] ldap config add default value --- container/helm/templates/configmap.yaml | 11 ++++++++++- .../account/component/sso/BaseSessionSignOn.java | 8 ++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/container/helm/templates/configmap.yaml b/container/helm/templates/configmap.yaml index cefa9d0d..1428cf11 100644 --- a/container/helm/templates/configmap.yaml +++ b/container/helm/templates/configmap.yaml @@ -67,7 +67,16 @@ data: # ldap settings ldap: enabled: false - authUserRegistration: false + url: ldap://127.0.0.1:389/ + basedn: dc=tsign,dc=cn + factory: com.sun.jndi.ldap.LdapCtxFactory + filter: sAMAccountName + security: + authentication: simple + principal: cn=admin,dc=tsign,dc=cn + credentials: admin + auth-user-registration: false + auth-user-registration-role: normal kcm: enabled: false diff --git a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java index 1ff36964..d826507d 100644 --- a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java +++ b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java @@ -31,16 +31,16 @@ public class BaseSessionSignOn extends AbstractSingleSignOn { private LdapAuthentication ldapAuthentication; //是否开启ldap验证 - @Value(value = "${account.ldap.enabled:}") + @Value(value = "${account.ldap.enabled:false}") private Boolean accountLdapEnabled; //ldap自动注册的默认角色。请注意:它通常来说都是低权限角色 - @Value(value = "${account.ldap.auth-user-registration-role:}") + @Value(value = "${account.ldap.auth-user-registration-role:normal}") private String authUserRegistrationRole; //ldap自动注册是否开启 - @Value(value = "${account.ldap.auth-user-registration:}") - private boolean authUserRegistration; + @Value(value = "${account.ldap.auth-user-registration:false}") + private Boolean authUserRegistration; @Override public Result loginAndGetLdap(HttpServletRequest request, HttpServletResponse response, LoginDTO dto) { From c0f3259cf66b04b39324cc9197f4d2fd7a2e2c3a Mon Sep 17 00:00:00 2001 From: xuguang Date: Wed, 12 Jan 2022 19:56:37 +0800 Subject: [PATCH 05/16] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E3=80=81=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=B1=A0=E5=8F=AF?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/cache/KafkaClientPool.java | 27 ++++++-- .../cache/PhysicalClusterMetadataManager.java | 5 +- .../manager/service/cache/ThreadPool.java | 68 +++++++++++++------ .../service/impl/BrokerServiceImpl.java | 5 +- .../service/service/impl/JmxServiceImpl.java | 7 +- .../service/impl/TopicServiceImpl.java | 7 +- .../healthscore/DidiHealthScoreStrategy.java | 5 +- .../service/zookeeper/TopicStateListener.java | 8 ++- .../openapi/impl/ThirdPartServiceImpl.java | 5 +- .../collect/CollectAndPublishCGData.java | 5 +- .../FlushZKConsumerGroupMetadata.java | 5 +- .../src/main/resources/application.yml | 15 ++++ 12 files changed, 123 insertions(+), 39 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java index 56e17ae5..2e1e9e71 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java @@ -14,6 +14,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; import java.util.Map; import java.util.Properties; @@ -25,9 +27,22 @@ import java.util.concurrent.locks.ReentrantLock; * @author zengqiao * @date 19/12/24 */ +@Service public class KafkaClientPool { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class); + @Value(value = "${client-pool.kafka-consumer.min-idle-client-num:24}") + private Integer kafkaConsumerMinIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-idle-client-num:24}") + private Integer kafkaConsumerMaxIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-total-client-num:24}") + private Integer kafkaConsumerMaxTotalClientNum; + + @Value(value = "${client-pool.kafka-consumer.borrow-timeout-unit-ms:3000}") + private Integer kafkaConsumerBorrowTimeoutUnitMs; + /** * AdminClient */ @@ -84,7 +99,7 @@ public class KafkaClientPool { return true; } - private static void initKafkaConsumerPool(ClusterDO clusterDO) { + private void initKafkaConsumerPool(ClusterDO clusterDO) { lock.lock(); try { GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); @@ -92,9 +107,9 @@ public class KafkaClientPool { return; } GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>(); - config.setMaxIdle(24); - config.setMinIdle(24); - config.setMaxTotal(24); + config.setMaxIdle(kafkaConsumerMaxIdleClientNum); + config.setMinIdle(kafkaConsumerMinIdleClientNum); + config.setMaxTotal(kafkaConsumerMaxTotalClientNum); KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config)); } catch (Exception e) { LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e); @@ -118,7 +133,7 @@ public class KafkaClientPool { } } - public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { + public KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { if (ValidateUtils.isNull(clusterDO)) { return null; } @@ -132,7 +147,7 @@ public class KafkaClientPool { } try { - return objectPool.borrowObject(3000); + return objectPool.borrowObject(kafkaConsumerBorrowTimeoutUnitMs); } catch (Exception e) { LOGGER.error("borrow kafka consumer client failed, clusterDO:{}.", clusterDO, e); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index c5f09820..79ecada1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -50,6 +50,9 @@ public class PhysicalClusterMetadataManager { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + private static final Map CLUSTER_MAP = new ConcurrentHashMap<>(); private static final Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); @@ -125,7 +128,7 @@ public class PhysicalClusterMetadataManager { zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); //增加Topic监控 - TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig); + TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, threadPool); topicListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java index f1b685cb..34f94871 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java @@ -1,37 +1,63 @@ package com.xiaojukeji.kafka.manager.service.cache; import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; -import java.util.concurrent.*; +import javax.annotation.PostConstruct; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author zengqiao * @date 20/8/24 */ +@Service public class ThreadPool { - private static final ExecutorService COLLECT_METRICS_THREAD_POOL = new ThreadPoolExecutor( - 256, - 256, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Collect-Metrics-Thread") - ); - private static final ExecutorService API_CALL_THREAD_POOL = new ThreadPoolExecutor( - 16, - 16, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Api-Call-Thread") - ); + @Value(value = "${thread-pool.collect-metrics.thread-num:256}") + private Integer collectMetricsThreadNum; - public static void submitCollectMetricsTask(Runnable collectMetricsTask) { - COLLECT_METRICS_THREAD_POOL.submit(collectMetricsTask); + @Value(value = "${thread-pool.collect-metrics.queue-size:10000}") + private Integer collectMetricsQueueSize; + + @Value(value = "${thread-pool.api-call.thread-num:16}") + private Integer apiCallThreadNum; + + @Value(value = "${thread-pool.api-call.queue-size:10000}") + private Integer apiCallQueueSize; + + private ThreadPoolExecutor collectMetricsThreadPool; + + private ThreadPoolExecutor apiCallThreadPool; + + @PostConstruct + public void init() { + collectMetricsThreadPool = new ThreadPoolExecutor( + collectMetricsThreadNum, + collectMetricsThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(collectMetricsQueueSize), + new DefaultThreadFactory("Collect-Metrics-Thread") + ); + + apiCallThreadPool = new ThreadPoolExecutor( + apiCallThreadNum, + apiCallThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(apiCallQueueSize), + new DefaultThreadFactory("Api-Call-Thread") + ); } - public static void submitApiCallTask(Runnable apiCallTask) { - API_CALL_THREAD_POOL.submit(apiCallTask); + public void submitCollectMetricsTask(Long clusterId, Runnable collectMetricsTask) { + collectMetricsThreadPool.submit(collectMetricsTask); + } + + public void submitApiCallTask(Long clusterId, Runnable apiCallTask) { + apiCallThreadPool.submit(apiCallTask); } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java index 24eea55f..ac3e0593 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java @@ -61,6 +61,9 @@ public class BrokerServiceImpl implements BrokerService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public ClusterBrokerStatus getClusterBrokerStatus(Long clusterId) { // 副本同步状态 @@ -201,7 +204,7 @@ public class BrokerServiceImpl implements BrokerService { return getBrokerMetricsFromJmx(clusterId, brokerId, metricsCode); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(brokerIdSet.size()); for (int i = 0; i < brokerIdList.size(); i++) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java index 611dc203..d0f0c514 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java @@ -39,6 +39,9 @@ public class JmxServiceImpl implements JmxService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public BrokerMetrics getBrokerMetrics(Long clusterId, Integer brokerId, Integer metricsCode) { if (clusterId == null || brokerId == null || metricsCode == null) { @@ -98,7 +101,7 @@ public class JmxServiceImpl implements JmxService { ); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(); @@ -303,7 +306,7 @@ public class JmxServiceImpl implements JmxService { return metricsList; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map metricsMap = new HashMap<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 154faf77..aa4fe3fb 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -87,6 +87,9 @@ public class TopicServiceImpl implements TopicService { @Autowired private AbstractHealthScoreStrategy healthScoreStrategy; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { try { @@ -340,7 +343,7 @@ public class TopicServiceImpl implements TopicService { Map topicPartitionLongMap = new HashMap<>(); KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if ((offsetPosEnum.getCode() & OffsetPosEnum.END.getCode()) > 0) { topicPartitionLongMap = kafkaConsumer.endOffsets(topicPartitionList); } else if ((offsetPosEnum.getCode() & OffsetPosEnum.BEGINNING.getCode()) > 0) { @@ -541,7 +544,7 @@ public class TopicServiceImpl implements TopicService { List partitionOffsetDTOList = new ArrayList<>(); try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); Map offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); if (offsetAndTimestampMap == null) { return new ArrayList<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java index d75dec5a..51295644 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java @@ -45,6 +45,9 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { @Autowired private JmxService jmxService; + @Autowired + private ThreadPool threadPool; + @Override public Integer calBrokerHealthScore(Long clusterId, Integer brokerId) { BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); @@ -125,7 +128,7 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { return calBrokerHealthScore(clusterId, brokerId); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } Integer topicHealthScore = HEALTH_SCORE_HEALTHY; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java index 4314a101..6f3d33b3 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.service.cache.ThreadPool; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import java.util.HashSet; import java.util.List; @@ -28,9 +29,12 @@ public class TopicStateListener implements StateChangeListener { private ZkConfigImpl zkConfig; - public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig) { + private ThreadPool threadPool; + + public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig, ThreadPool threadPool) { this.clusterId = clusterId; this.zkConfig = zkConfig; + this.threadPool = threadPool; } @Override @@ -47,7 +51,7 @@ public class TopicStateListener implements StateChangeListener { return null; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } } catch (Exception e) { LOGGER.error("init topics metadata failed, clusterId:{}.", clusterId, e); diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java index 5df7815e..07b0a3e3 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java @@ -42,6 +42,9 @@ public class ThirdPartServiceImpl implements ThirdPartService { @Autowired private ConsumerService consumerService; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public Result checkConsumeHealth(Long clusterId, String topicName, @@ -109,7 +112,7 @@ public class ThirdPartServiceImpl implements ThirdPartService { Long timestamp) { KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if (ValidateUtils.isNull(kafkaConsumer)) { return null; } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java index cc67428f..28bb1612 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java @@ -44,6 +44,9 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { @Autowired private ConsumerService consumerService; + @Autowired + private ThreadPool threadPool; + @Override protected List listAllTasks() { return clusterService.list(); @@ -82,7 +85,7 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { return getTopicConsumerMetrics(clusterDO, topicName, startTimeUnitMs); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterDO.getId(), taskList[i]); } List consumerMetricsList = new ArrayList<>(); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java index a7d196af..54321240 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java @@ -32,6 +32,9 @@ public class FlushZKConsumerGroupMetadata { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + @Scheduled(cron="35 0/1 * * * ?") public void schedule() { List doList = clusterService.list(); @@ -95,7 +98,7 @@ public class FlushZKConsumerGroupMetadata { return new ArrayList<>(); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map> topicNameConsumerGroupMap = new HashMap<>(); diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 4463d746..6b776773 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -96,3 +96,18 @@ notify: topic-name: didi-kafka-notify order: detail-url: http://127.0.0.1 + +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 From ada2718b5e9ac0d150ce9c7c1256aecdbb03d276 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E8=B6=85?= Date: Thu, 13 Jan 2022 11:13:26 +0800 Subject: [PATCH 06/16] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9B=BE=E7=89=87?= =?UTF-8?q?=E5=90=8D=E7=A7=B0=E5=A4=A7=E5=B0=8F=E5=86=99=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-console/src/container/header/index.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-manager-console/src/container/header/index.tsx b/kafka-manager-console/src/container/header/index.tsx index d0b8febe..0205e1be 100644 --- a/kafka-manager-console/src/container/header/index.tsx +++ b/kafka-manager-console/src/container/header/index.tsx @@ -7,7 +7,7 @@ import { urlPrefix } from 'constants/left-menu'; import { region, IRegionIdcs } from 'store/region'; import logoUrl from '../../assets/image/kafka-logo.png'; import userIcon from '../../assets/image/normal.png'; -import weChat from '../../assets/image/wechat.png'; +import weChat from '../../assets/image/weChat.png'; import { users } from 'store/users'; import { observer } from 'mobx-react'; import { Link } from 'react-router-dom'; From 52ccaeffd59ec8a27fcf28a67a35e7cc22c4fa51 Mon Sep 17 00:00:00 2001 From: xuguang Date: Thu, 13 Jan 2022 11:48:43 +0800 Subject: [PATCH 07/16] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-common/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml index 6a8ff0cb..c914ffeb 100644 --- a/kafka-manager-common/pom.xml +++ b/kafka-manager-common/pom.xml @@ -109,5 +109,10 @@ junit junit + + org.projectlombok + lombok + compile + \ No newline at end of file From 9e3bc80495d360b8c10f0605f84126f827e1fc54 Mon Sep 17 00:00:00 2001 From: xuguang Date: Thu, 13 Jan 2022 15:35:11 +0800 Subject: [PATCH 08/16] =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=20&=20BrokerM?= =?UTF-8?q?etadata=20=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- distribution/conf/application.yml.example | 17 ++++++++++++++++- .../zookeeper/znode/brokers/BrokerMetadata.java | 8 ++++---- .../kafka/manager/service/cache/ThreadPool.java | 4 ++-- .../src/main/resources/application.yml | 6 +++--- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index 138a44fe..d4d57777 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -121,4 +121,19 @@ notify: # 通知的功能 cluster-id: 95 # Topic的集群ID topic-name: didi-kafka-notify # Topic名称 order: # 部署的KM的地址 - detail-url: http://127.0.0.1 \ No newline at end of file + detail-url: http://127.0.0.1 + +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java index e4e5063d..598784ca 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java @@ -21,7 +21,7 @@ import java.util.Map; * 节点结构: * { * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"}, - * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093"], + * "endpoints":["SASL_PLAINTEXT://127.0.0.1:9093"], * "jmx_port":9999, * "host":null, * "timestamp":"1546632983233", @@ -32,9 +32,9 @@ import java.util.Map; * * { * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"}, - * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"], + * "endpoints":["SASL_PLAINTEXT://127.0.0.1:9093","PLAINTEXT://127.0.0.1:9092"], * "jmx_port":8099, - * "host":"10.179.162.202", + * "host":"127.0.0.1", * "timestamp":"1628833925822", * "port":9092, * "version":4 @@ -42,7 +42,7 @@ import java.util.Map; * * { * "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"}, - * "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"], + * "endpoints":["EXTERNAL://127.0.0.1:7092","INTERNAL://127.0.0.1:7093"], * "jmx_port":8099, * "host":null, * "timestamp":"1627289710439", diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java index 34f94871..ba870465 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java @@ -40,7 +40,7 @@ public class ThreadPool { 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(collectMetricsQueueSize), - new DefaultThreadFactory("Collect-Metrics-Thread") + new DefaultThreadFactory("TaskThreadPool") ); apiCallThreadPool = new ThreadPoolExecutor( @@ -49,7 +49,7 @@ public class ThreadPool { 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(apiCallQueueSize), - new DefaultThreadFactory("Api-Call-Thread") + new DefaultThreadFactory("ApiThreadPool") ); } diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 6f297554..0bfa8972 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -105,15 +105,15 @@ notify: thread-pool: collect-metrics: - thread-num: 256 # 收集指标线程池大小 + thread-num: 256 # 收集指标线程池大小 queue-size: 5000 # 收集指标线程池的queue大小 api-call: - thread-num: 16 # api服务线程池大小 + thread-num: 16 # api服务线程池大小 queue-size: 5000 # api服务线程池的queue大小 client-pool: kafka-consumer: - min-idle-client-num: 24 # 最小空闲客户端数 + min-idle-client-num: 24 # 最小空闲客户端数 max-idle-client-num: 24 # 最大空闲客户端数 max-total-client-num: 24 # 最大客户端数 borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 From 373680d85466e56de6aabf75b6f32641f46209c9 Mon Sep 17 00:00:00 2001 From: xuguang Date: Thu, 13 Jan 2022 15:39:39 +0800 Subject: [PATCH 09/16] =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=20&=20BrokerM?= =?UTF-8?q?etadata=20=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-common/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml index c914ffeb..f6c33def 100644 --- a/kafka-manager-common/pom.xml +++ b/kafka-manager-common/pom.xml @@ -109,6 +109,7 @@ junit junit + org.projectlombok lombok From fe0f6fcd0b3f15aff72fa0524f93e26fe45a401b Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 13 Jan 2022 16:02:33 +0800 Subject: [PATCH 10/16] fix config incorrectly comment --- distribution/conf/application.yml.example | 2 +- kafka-manager-web/src/main/resources/application.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index d4d57777..1278e3d2 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -136,4 +136,4 @@ client-pool: min-idle-client-num: 24 # 最小空闲客户端数 max-idle-client-num: 24 # 最大空闲客户端数 max-total-client-num: 24 # 最大客户端数 - borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 \ No newline at end of file + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 \ No newline at end of file diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 0bfa8972..19ba8593 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -116,4 +116,4 @@ client-pool: min-idle-client-num: 24 # 最小空闲客户端数 max-idle-client-num: 24 # 最大空闲客户端数 max-total-client-num: 24 # 最大客户端数 - borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 From 52c52b2a0df956541c2797c72f36e06bd2ceed20 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Fri, 14 Jan 2022 14:24:04 +0800 Subject: [PATCH 11/16] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=AE=80=E5=8D=95?= =?UTF-8?q?=E5=9B=9E=E9=80=80=E5=B7=A5=E5=85=B7=E7=B1=BB=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0Jmx=E8=BF=9E=E6=8E=A5=E5=A4=B1=E8=B4=A5=E5=9B=9E?= =?UTF-8?q?=E9=80=80=E5=8A=9F=E8=83=BD=E6=9C=BA=E5=88=B6=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96Jmx=E8=BF=9E=E6=8E=A5=E5=A4=B1=E8=B4=A5=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/common/utils/BackoffUtils.java | 55 +++++++++++++ .../manager/common/utils/jmx/JmxConfig.java | 50 +++--------- .../common/utils/jmx/JmxConnectorWrap.java | 80 ++++++++++++++++--- 3 files changed, 135 insertions(+), 50 deletions(-) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java index f4218020..084ea5a6 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java @@ -1,9 +1,18 @@ package com.xiaojukeji.kafka.manager.common.utils; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + public class BackoffUtils { private BackoffUtils() { } + /** + * 需要进行延迟的事件 + * <事件名,延迟结束事件> + */ + private static final Map NEED_BACK_OFF_EVENT_MAP = new ConcurrentHashMap<>(); + public static void backoff(long timeUnitMs) { if (timeUnitMs <= 0) { return; @@ -17,4 +26,50 @@ public class BackoffUtils { // ignore } } + + /** + * 记录延迟设置 + * @param backoffEventKey 回退事件key + * @param backoffTimeUnitMs 回退时间(ms) + */ + public static void putNeedBackoffEvent(String backoffEventKey, Long backoffTimeUnitMs) { + if (backoffEventKey == null || backoffTimeUnitMs == null || backoffTimeUnitMs <= 0) { + return; + } + + NEED_BACK_OFF_EVENT_MAP.put(backoffEventKey, backoffTimeUnitMs + System.currentTimeMillis()); + } + + /** + * 移除回退设置 + * @param backoffEventKey 回退事件key + */ + public static void removeNeedBackoffEvent(String backoffEventKey) { + NEED_BACK_OFF_EVENT_MAP.remove(backoffEventKey); + } + + /** + * 检查是否需要回退 + * @param backoffEventKey 回退事件key + * @return + */ + public static boolean isNeedBackoff(String backoffEventKey) { + Long backoffEventEndTimeUnitMs = NEED_BACK_OFF_EVENT_MAP.get(backoffEventKey); + if (backoffEventEndTimeUnitMs == null) { + return false; + } + + if (backoffEventEndTimeUnitMs > System.currentTimeMillis()) { + return true; + } + + // 移除 + try { + NEED_BACK_OFF_EVENT_MAP.remove(backoffEventKey, backoffEventEndTimeUnitMs); + } catch (Exception e) { + // 如果key不存在,这里可能出现NPE,不过不管什么异常都可以忽略 + } + + return false; + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java index bbc913c4..f5c380c2 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java @@ -1,5 +1,10 @@ package com.xiaojukeji.kafka.manager.common.utils.jmx; +import lombok.Data; +import lombok.ToString; + +@Data +@ToString public class JmxConfig { /** * 单台最大连接数 @@ -21,45 +26,8 @@ public class JmxConfig { */ private Boolean openSSL; - public Integer getMaxConn() { - return maxConn; - } - - public void setMaxConn(Integer maxConn) { - this.maxConn = maxConn; - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public Boolean isOpenSSL() { - return openSSL; - } - - public void setOpenSSL(Boolean openSSL) { - this.openSSL = openSSL; - } - - @Override - public String toString() { - return "JmxConfig{" + - "maxConn=" + maxConn + - ", username='" + username + '\'' + - ", password='" + password + '\'' + - ", openSSL=" + openSSL + - '}'; - } + /** + * 连接重试回退事件 + */ + private Long retryConnectBackoffTimeUnitMs; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java index db1e2341..c66c7bc6 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; /** * JMXConnector包装类 @@ -41,6 +42,8 @@ public class JmxConnectorWrap { private JmxConfig jmxConfig; + private final ReentrantLock modifyJMXConnectorLock = new ReentrantLock(); + public JmxConnectorWrap(Long physicalClusterId, Integer brokerId, String host, int port, JmxConfig jmxConfig) { this.physicalClusterId = physicalClusterId; this.brokerId = brokerId; @@ -51,7 +54,12 @@ public class JmxConnectorWrap { this.jmxConfig = new JmxConfig(); } if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getMaxConn())) { - this.jmxConfig.setMaxConn(1); + // 默认设置20 + this.jmxConfig.setMaxConn(20); + } + if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getRetryConnectBackoffTimeUnitMs())) { + // 默认回退10分钟 + this.jmxConfig.setRetryConnectBackoffTimeUnitMs(10 * 60 * 1000L); } this.atomicInteger = new AtomicInteger(this.jmxConfig.getMaxConn()); } @@ -63,17 +71,40 @@ public class JmxConnectorWrap { if (port == -1) { return false; } - return createJmxConnector(); + return safeCreateJmxConnector(); } - public synchronized void close() { + public void close() { + this.closeJmxConnect(); + } + + public void closeJmxConnect() { if (jmxConnector == null) { return; } + try { + modifyJMXConnectorLock.lock(); + + // 移除设置的backoff事件 + BackoffUtils.removeNeedBackoffEvent(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId)); + jmxConnector.close(); - } catch (IOException e) { - LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); + } catch (Exception e) { + LOGGER.error("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); + } finally { + jmxConnector = null; + + modifyJMXConnectorLock.unlock(); + } + } + + private boolean safeCreateJmxConnector() { + try { + modifyJMXConnectorLock.lock(); + return createJmxConnector(); + } finally { + modifyJMXConnectorLock.unlock(); } } @@ -81,6 +112,12 @@ public class JmxConnectorWrap { if (jmxConnector != null) { return true; } + + if (BackoffUtils.isNeedBackoff(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId))) { + // 被设置了需要进行回退,则本次不进行创建 + return false; + } + String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port); try { Map environment = new HashMap(); @@ -88,7 +125,9 @@ public class JmxConnectorWrap { // fixed by riyuetianmu environment.put(JMXConnector.CREDENTIALS, new String[]{this.jmxConfig.getUsername(), this.jmxConfig.getPassword()}); } - if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) { + + if (jmxConfig.getOpenSSL() != null && this.jmxConfig.getOpenSSL()) { + // 开启ssl environment.put(Context.SECURITY_PROTOCOL, "ssl"); SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory(); environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory); @@ -96,13 +135,17 @@ public class JmxConnectorWrap { } jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment); - LOGGER.info("JMX connect success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port); + LOGGER.info("connect JMX success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port); return true; } catch (MalformedURLException e) { - LOGGER.error("JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}", physicalClusterId, brokerId, host, port, jmxUrl, e); + LOGGER.error("connect JMX failed, JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}.", physicalClusterId, brokerId, host, port, jmxUrl, e); } catch (Exception e) { - LOGGER.error("JMX connect exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); + LOGGER.error("connect JMX failed, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); } + + // 设置连接backoff + BackoffUtils.putNeedBackoffEvent(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId), this.jmxConfig.getRetryConnectBackoffTimeUnitMs()); + return false; } @@ -116,6 +159,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.getAttribute(name, attribute); + } catch (IOException ioe) { + // io错误,则重置连接 + this.closeJmxConnect(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -131,6 +179,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.getAttributes(name, attributes); + } catch (IOException ioe) { + // io错误,则重置连接 + this.closeJmxConnect(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -143,6 +196,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.queryNames(name, query); + } catch (IOException ioe) { + // io错误,则重置连接 + this.closeJmxConnect(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -165,4 +223,8 @@ public class JmxConnectorWrap { } } } + + private static String buildConnectJmxFailedBackoffEventKey(Long physicalClusterId, Integer brokerId) { + return "CONNECT_JMX_FAILED_BACK_OFF_EVENT_PHY_" + physicalClusterId + "_BROKER_" + brokerId; + } } From e4371b5d029fbefef0be874c1905b464c817011a Mon Sep 17 00:00:00 2001 From: zengqiao Date: Fri, 14 Jan 2022 14:28:45 +0800 Subject: [PATCH 12/16] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xiaojukeji/kafka/manager/common/utils/BackoffUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java index 084ea5a6..afbf8fc4 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java @@ -8,8 +8,8 @@ public class BackoffUtils { } /** - * 需要进行延迟的事件 - * <事件名,延迟结束事件> + * 需要进行回退的事件信息 + * <回退事件名,回退结束时间> */ private static final Map NEED_BACK_OFF_EVENT_MAP = new ConcurrentHashMap<>(); @@ -28,7 +28,7 @@ public class BackoffUtils { } /** - * 记录延迟设置 + * 记录回退设置 * @param backoffEventKey 回退事件key * @param backoffTimeUnitMs 回退时间(ms) */ From d6181522c0042c0b441305be8d0285f9d6efee74 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 17 Jan 2022 11:42:30 +0800 Subject: [PATCH 13/16] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dswagger=E6=8A=9B?= =?UTF-8?q?=E5=87=BA=E7=9A=84NumberFormatException=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pom.xml b/pom.xml index 9d226f87..e790bfd3 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,11 @@ swagger-annotations ${swagger.version} + + io.swagger + swagger-models + ${swagger.version} + From f6ba8bc95e9baadc4cfa57aa4aa1dd82110dddc0 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 17 Jan 2022 13:17:07 +0800 Subject: [PATCH 14/16] =?UTF-8?q?Swagger=E6=8F=90=E7=A4=BA=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E5=92=8CPOM=E4=B8=AD=E7=89=88=E6=9C=AC=E9=80=9A?= =?UTF-8?q?=E8=BF=87=E9=85=8D=E7=BD=AE=E4=BF=9D=E6=8C=81=E4=B8=80=E8=87=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- distribution/conf/application.yml | 1 + distribution/conf/application.yml.example | 1 + .../kafka/manager/service/utils/ConfigUtils.java | 5 ++++- .../xiaojukeji/kafka/manager/web/config/SwaggerConfig.java | 7 ++++++- kafka-manager-web/src/main/resources/application.yml | 1 + 5 files changed, 13 insertions(+), 2 deletions(-) diff --git a/distribution/conf/application.yml b/distribution/conf/application.yml index 6b78c104..a11cb737 100644 --- a/distribution/conf/application.yml +++ b/distribution/conf/application.yml @@ -15,6 +15,7 @@ server: spring: application: name: kafkamanager + version: @project.version@ profiles: active: dev datasource: diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index 1278e3d2..3894a570 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -15,6 +15,7 @@ server: spring: application: name: kafkamanager + version: @project.version@ profiles: active: dev datasource: diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java index 751e08c2..40b73868 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java @@ -18,6 +18,9 @@ public class ConfigUtils { @Value(value = "${custom.idc:cn}") private String idc; - @Value(value = "${spring.profiles.active}") + @Value(value = "${spring.profiles.active:dev}") private String kafkaManagerEnv; + + @Value(value = "${spring.application.version:unknown}") + private String applicationVersion; } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java index f4ae13a4..f8406cfe 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.web.config; +import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.*; @@ -20,6 +22,9 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2; @EnableWebMvc @EnableSwagger2 public class SwaggerConfig implements WebMvcConfigurer { + @Autowired + private ConfigUtils configUtils; + @Override public void addResourceHandlers(ResourceHandlerRegistry registry) { registry.addResourceHandler("swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/"); @@ -41,7 +46,7 @@ public class SwaggerConfig implements WebMvcConfigurer { return new ApiInfoBuilder() .title("LogiKM接口文档") .description("欢迎使用滴滴LogiKM") - .version("2.5.0") + .version(configUtils.getApplicationVersion()) .build(); } diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 19ba8593..3cce7463 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -9,6 +9,7 @@ server: spring: application: name: kafkamanager + version: @project.version@ profiles: active: dev datasource: From 2790099efa55c84dc76cbda92d009fc1bb6dba2f Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 17 Jan 2022 15:28:36 +0800 Subject: [PATCH 15/16] =?UTF-8?q?=E6=A2=B3=E7=90=86Task=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1-BrokerMetrics=E4=BB=BB=E5=8A=A1=E6=A2=B3?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- distribution/conf/application.yml.example | 144 ++++++++--------- .../metrics/BaseMetricsCollectedEvent.java | 33 ++++ .../BatchBrokerMetricsCollectedEvent.java | 22 +++ .../task/component/AbstractScheduledTask.java | 3 +- .../manager/task/component/BaseBizTask.java | 8 +- .../CollectAndPublishBrokerMetrics.java | 93 +++++++++++ .../metrics/delete/DeleteMetrics.java | 14 +- .../metrics/store/StoreBrokerMetrics.java | 146 ------------------ .../sink/db/SinkBrokerMetrics2DB.java | 55 +++++++ .../sink/db/SinkClusterMetrics2DB.java | 80 ++++++++++ .../db}/StoreCommunityTopicMetrics2DB.java | 2 +- .../db}/StoreTopicThrottledMetrics2DB.java | 2 +- .../SinkCommunityTopicMetrics2Kafka.java | 2 +- .../kafka}/SinkConsumerMetrics2Kafka.java | 2 +- .../SinkCommunityTopicMetrics2Monitor.java | 2 +- .../monitor}/SinkConsumerMetrics2Monitor.java | 2 +- .../SinkTopicThrottledMetrics2Monitor.java | 2 +- .../src/main/resources/application.yml | 50 +++--- 18 files changed, 398 insertions(+), 264 deletions(-) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BaseMetricsCollectedEvent.java create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BatchBrokerMetricsCollectedEvent.java create mode 100644 kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishBrokerMetrics.java delete mode 100644 kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java create mode 100644 kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkBrokerMetrics2DB.java create mode 100644 kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkClusterMetrics2DB.java rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/{ => sink/db}/StoreCommunityTopicMetrics2DB.java (97%) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/{ => sink/db}/StoreTopicThrottledMetrics2DB.java (98%) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/{ => sink/kafka}/SinkCommunityTopicMetrics2Kafka.java (98%) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/{ => sink/kafka}/SinkConsumerMetrics2Kafka.java (98%) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/{ => sink/monitor}/SinkCommunityTopicMetrics2Monitor.java (98%) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/{ => sink/monitor}/SinkConsumerMetrics2Monitor.java (99%) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/{ => sink/monitor}/SinkTopicThrottledMetrics2Monitor.java (98%) diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index 3894a570..ee0290d8 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -27,7 +27,6 @@ spring: main: allow-bean-definition-overriding: true - servlet: multipart: max-file-size: 100MB @@ -37,28 +36,32 @@ logging: config: classpath:logback-spring.xml custom: - idc: cn # 部署的数据中心, 忽略该配置, 后续会进行删除 - jmx: - max-conn: 10 # 2.3版本配置不在这个地方生效 + idc: cn store-metrics-task: community: - broker-metrics-enabled: true # 社区部分broker metrics信息收集开关, 关闭之后metrics信息将不会进行收集及写DB - topic-metrics-enabled: true # 社区部分topic的metrics信息收集开关, 关闭之后metrics信息将不会进行收集及写DB - didi: - app-topic-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭 - topic-request-time-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭 - topic-throttled-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭 + topic-metrics-enabled: true + didi: # 滴滴Kafka特有的指标 + app-topic-metrics-enabled: false + topic-request-time-metrics-enabled: false + topic-throttled-metrics-enabled: false -# 任务相关的开关 +# 任务相关的配置 task: op: - sync-topic-enabled: false # 未落盘的Topic定期同步到DB中 - order-auto-exec: # 工单自动化审批线程的开关 - topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启 - app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启 + sync-topic-enabled: false # 未落盘的Topic定期同步到DB中 + order-auto-exec: # 工单自动化审批线程的开关 + topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启 + app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启 metrics: - delete-metrics: - delete-limit-size: 1000 + collect: # 收集指标 + broker-metrics-enabled: true # 收集Broker指标 + sink: # 上报指标 + cluster-metrics: # 上报cluster指标 + sink-db-enabled: true # 上报到db + broker-metrics: # 上报broker指标 + sink-db-enabled: true # 上报到db + delete: # 删除指标 + delete-limit-size: 1000 # 单次删除的批大小 cluster-metrics-save-days: 14 # 集群指标保存天数 broker-metrics-save-days: 14 # Broker指标保存天数 topic-metrics-save-days: 7 # Topic指标保存天数 @@ -66,64 +69,6 @@ task: topic-throttled-metrics-save-days: 7 # Topic限流指标保存天数 app-topic-metrics-save-days: 7 # App+Topic指标保存天数 -# ldap相关的配置 -account: - ldap: - enabled: false - url: ldap://127.0.0.1:389/ - basedn: dc=tsign,dc=cn - factory: com.sun.jndi.ldap.LdapCtxFactory - filter: sAMAccountName - security: - authentication: simple - principal: cn=admin,dc=tsign,dc=cn - credentials: admin - auth-user-registration: true - auth-user-registration-role: normal - -# 集群升级部署相关的功能,需要配合夜莺及S3进行使用 -kcm: - enabled: false - s3: - endpoint: s3.didiyunapi.com - access-key: 1234567890 - secret-key: 0987654321 - bucket: logi-kafka - n9e: - base-url: http://127.0.0.1:8004 - user-token: 12345678 - timeout: 300 - account: root - script-file: kcm_script.sh - -# 监控告警相关的功能,需要配合夜莺进行使用 -# enabled: 表示是否开启监控告警的功能, true: 开启, false: 不开启 -# n9e.nid: 夜莺的节点ID -# n9e.user-token: 用户的密钥,在夜莺的个人设置中 -# n9e.mon.base-url: 监控地址 -# n9e.sink.base-url: 数据上报地址 -# n9e.rdb.base-url: 用户资源中心地址 - -monitor: - enabled: false - n9e: - nid: 2 - user-token: 1234567890 - mon: - base-url: http://127.0.0.1:8000 # 夜莺v4版本,默认端口统一调整为了8000 - sink: - base-url: http://127.0.0.1:8000 # 夜莺v4版本,默认端口统一调整为了8000 - rdb: - base-url: http://127.0.0.1:8000 # 夜莺v4版本,默认端口统一调整为了8000 - - -notify: # 通知的功能 - kafka: # 默认通知发送到kafka的指定Topic中 - cluster-id: 95 # Topic的集群ID - topic-name: didi-kafka-notify # Topic名称 - order: # 部署的KM的地址 - detail-url: http://127.0.0.1 - thread-pool: collect-metrics: thread-num: 256 # 收集指标线程池大小 @@ -137,4 +82,51 @@ client-pool: min-idle-client-num: 24 # 最小空闲客户端数 max-idle-client-num: 24 # 最大空闲客户端数 max-total-client-num: 24 # 最大客户端数 - borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 \ No newline at end of file + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 + +account: + ldap: + enabled: false + url: ldap://127.0.0.1:389/ + basedn: dc=tsign,dc=cn + factory: com.sun.jndi.ldap.LdapCtxFactory + filter: sAMAccountName + security: + authentication: simple + principal: cn=admin,dc=tsign,dc=cn + credentials: admin + auth-user-registration: true + auth-user-registration-role: normal + +kcm: + enabled: false + s3: + endpoint: s3.didiyunapi.com + access-key: 1234567890 + secret-key: 0987654321 + bucket: logi-kafka + n9e: + base-url: http://127.0.0.1:8004 + user-token: 12345678 + timeout: 300 + account: root + script-file: kcm_script.sh + +monitor: + enabled: false + n9e: + nid: 2 + user-token: 1234567890 + mon: + base-url: http://127.0.0.1:8000 # 夜莺v4版本,默认端口统一调整为了8000 + sink: + base-url: http://127.0.0.1:8000 # 夜莺v4版本,默认端口统一调整为了8000 + rdb: + base-url: http://127.0.0.1:8000 # 夜莺v4版本,默认端口统一调整为了8000 + +notify: + kafka: + cluster-id: 95 + topic-name: didi-kafka-notify + order: + detail-url: http://127.0.0.1 diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BaseMetricsCollectedEvent.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BaseMetricsCollectedEvent.java new file mode 100644 index 00000000..730e14c9 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BaseMetricsCollectedEvent.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.kafka.manager.common.events.metrics; + +import org.springframework.context.ApplicationEvent; + +/** + * @author zengqiao + * @date 22/01/17 + */ +public class BaseMetricsCollectedEvent extends ApplicationEvent { + /** + * 物理集群ID + */ + protected final Long physicalClusterId; + + /** + * 收集时间,依据业务需要来设置,可以设置任务开始时间,也可以设置任务结束时间 + */ + protected final Long collectTime; + + public BaseMetricsCollectedEvent(Object source, Long physicalClusterId, Long collectTime) { + super(source); + this.physicalClusterId = physicalClusterId; + this.collectTime = collectTime; + } + + public Long getPhysicalClusterId() { + return physicalClusterId; + } + + public Long getCollectTime() { + return collectTime; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BatchBrokerMetricsCollectedEvent.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BatchBrokerMetricsCollectedEvent.java new file mode 100644 index 00000000..629a44ea --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/events/metrics/BatchBrokerMetricsCollectedEvent.java @@ -0,0 +1,22 @@ +package com.xiaojukeji.kafka.manager.common.events.metrics; + +import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/8/31 + */ +public class BatchBrokerMetricsCollectedEvent extends BaseMetricsCollectedEvent { + private final List metricsList; + + public BatchBrokerMetricsCollectedEvent(Object source, Long physicalClusterId, Long collectTime, List metricsList) { + super(source, physicalClusterId, collectTime); + this.metricsList = metricsList; + } + + public List getMetricsList() { + return metricsList; + } +} \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java index 7eddb926..564094d5 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java @@ -1,7 +1,6 @@ package com.xiaojukeji.kafka.manager.task.component; import com.google.common.collect.Lists; -import com.xiaojukeji.kafka.manager.common.constant.LogConstant; import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.NetUtils; @@ -29,7 +28,7 @@ import java.util.concurrent.*; * @date 20/8/10 */ public abstract class AbstractScheduledTask implements SchedulingConfigurer { - private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledTask.class); @Autowired private HeartbeatDao heartbeatDao; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java index 37a36238..b4cfdd47 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java @@ -1,6 +1,5 @@ package com.xiaojukeji.kafka.manager.task.component; -import com.xiaojukeji.kafka.manager.common.constant.LogConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,11 +8,11 @@ import org.slf4j.LoggerFactory; * @date 20/8/10 */ public class BaseBizTask implements Runnable { - private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledTask.class); - private E task; + private final E task; - private AbstractScheduledTask scheduledTask; + private final AbstractScheduledTask scheduledTask; public BaseBizTask(E task, AbstractScheduledTask scheduledTask) { this.task = task; @@ -30,6 +29,7 @@ public class BaseBizTask implements Runnable { } catch (Throwable t) { LOGGER.error("scheduled task scheduleName:{} execute failed, task:{}", scheduledTask.getScheduledName(), task, t); } + LOGGER.info("scheduled task scheduleName:{} finished, cost-time:{}ms.", scheduledTask.getScheduledName(), System.currentTimeMillis() - startTime); } } \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishBrokerMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishBrokerMetrics.java new file mode 100644 index 00000000..47aa60d4 --- /dev/null +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishBrokerMetrics.java @@ -0,0 +1,93 @@ +package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect; + +import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; +import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; +import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent; +import com.xiaojukeji.kafka.manager.common.utils.SpringTool; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.service.service.JmxService; +import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy; +import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask; +import com.xiaojukeji.kafka.manager.task.component.CustomScheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; + +import java.util.ArrayList; +import java.util.List; + +/** + * Broker指标信息收集 + * @author zengqiao + * @date 20/5/7 + */ +@CustomScheduled(name = "collectAndPublishBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2) +@ConditionalOnProperty(prefix = "task.metrics.collect", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true) +public class CollectAndPublishBrokerMetrics extends AbstractScheduledTask { + private static final Logger LOGGER = LoggerFactory.getLogger(CollectAndPublishBrokerMetrics.class); + + @Autowired + private JmxService jmxService; + + @Autowired + private ClusterService clusterService; + + @Autowired + private AbstractHealthScoreStrategy healthScoreStrategy; + + @Override + protected List listAllTasks() { + return clusterService.list(); + } + + @Override + public void processTask(ClusterDO clusterDO) { + long startTime = System.currentTimeMillis(); + + try { + SpringTool.publish(new BatchBrokerMetricsCollectedEvent( + this, + clusterDO.getId(), + startTime, + this.getBrokerMetrics(clusterDO.getId())) + ); + } catch (Exception e) { + LOGGER.error("collect broker-metrics failed, physicalClusterId:{}.", clusterDO.getId(), e); + } + + LOGGER.info("collect broker-metrics finished, physicalClusterId:{} costTime:{}", clusterDO.getId(), System.currentTimeMillis() - startTime); + } + + private List getBrokerMetrics(Long clusterId) { + List metricsList = new ArrayList<>(); + for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) { + BrokerMetrics metrics = jmxService.getBrokerMetrics( + clusterId, + brokerId, + KafkaMetricsCollections.BROKER_TO_DB_METRICS + ); + + if (ValidateUtils.isNull(metrics)) { + continue; + } + + metrics.getMetricsMap().put( + JmxConstant.HEALTH_SCORE, + healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics) + ); + + metricsList.add(metrics); + } + + if (ValidateUtils.isEmptyList(metricsList)) { + return new ArrayList<>(); + } + + return metricsList; + } +} \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java index 16c2a012..89d7e516 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java @@ -42,25 +42,25 @@ public class DeleteMetrics extends AbstractScheduledTask { @Autowired private TopicThrottledMetricsDao topicThrottledMetricsDao; - @Value(value = "${task.metrics.delete-metrics.delete-limit-size:1000}") + @Value(value = "${task.metrics.delete.delete-limit-size:1000}") private Integer deleteLimitSize; - @Value(value = "${task.metrics.delete-metrics.cluster-metrics-save-days:14}") + @Value(value = "${task.metrics.delete.cluster-metrics-save-days:14}") private Integer clusterMetricsSaveDays; - @Value(value = "${task.metrics.delete-metrics.broker-metrics-save-days:14}") + @Value(value = "${task.metrics.delete.broker-metrics-save-days:14}") private Integer brokerMetricsSaveDays; - @Value(value = "${task.metrics.delete-metrics.topic-metrics-save-days:7}") + @Value(value = "${task.metrics.delete.topic-metrics-save-days:7}") private Integer topicMetricsSaveDays; - @Value(value = "${task.metrics.delete-metrics.topic-request-time-metrics-save-days:7}") + @Value(value = "${task.metrics.delete.topic-request-time-metrics-save-days:7}") private Integer topicRequestTimeMetricsSaveDays; - @Value(value = "${task.metrics.delete-metrics.topic-throttled-metrics-save-days:7}") + @Value(value = "${task.metrics.delete.topic-throttled-metrics-save-days:7}") private Integer topicThrottledMetricsSaveDays; - @Value(value = "${task.metrics.delete-metrics.app-topic-metrics-save-days:7}") + @Value(value = "${task.metrics.delete.app-topic-metrics-save-days:7}") private Integer appTopicMetricsSaveDays; @Override diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java deleted file mode 100644 index 22aeaf2a..00000000 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java +++ /dev/null @@ -1,146 +0,0 @@ -package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store; - -import com.xiaojukeji.kafka.manager.common.constant.Constant; -import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; -import com.xiaojukeji.kafka.manager.common.constant.LogConstant; -import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; -import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; -import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics; -import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; -import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; -import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; -import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao; -import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao; -import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO; -import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; -import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO; -import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; -import com.xiaojukeji.kafka.manager.service.service.ClusterService; -import com.xiaojukeji.kafka.manager.service.service.JmxService; -import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy; -import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils; -import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask; -import com.xiaojukeji.kafka.manager.task.component.CustomScheduled; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Broker指标信息存DB, Broker流量, 集群流量 - * @author zengqiao - * @date 20/5/7 - */ -@CustomScheduled(name = "storeBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2) -@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true) -public class StoreBrokerMetrics extends AbstractScheduledTask { - private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); - - @Autowired - private JmxService jmxService; - - @Autowired - private ClusterService clusterService; - - @Autowired - private BrokerMetricsDao brokerMetricsDao; - - @Autowired - private ClusterMetricsDao clusterMetricsDao; - - @Autowired - private AbstractHealthScoreStrategy healthScoreStrategy; - - @Override - protected List listAllTasks() { - return clusterService.list(); - } - - @Override - public void processTask(ClusterDO clusterDO) { - long startTime = System.currentTimeMillis(); - List clusterMetricsList = new ArrayList<>(); - - try { - List brokerMetricsList = getAndBatchAddMetrics(startTime, clusterDO.getId()); - clusterMetricsList.add(supplyAndConvert2ClusterMetrics( - clusterDO.getId(), - MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList)) - ); - } catch (Exception t) { - LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t); - } - long endTime = System.currentTimeMillis(); - LOGGER.info("collect finish, clusterId:{} costTime:{}", clusterDO.getId(), endTime - startTime); - - List doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList( - startTime, - clusterMetricsList - ); - - if (ValidateUtils.isEmptyList(doList)) { - return; - } - - clusterMetricsDao.batchAdd(doList); - } - - private List getAndBatchAddMetrics(Long startTime, Long clusterId) { - List metricsList = new ArrayList<>(); - for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) { - BrokerMetrics metrics = jmxService.getBrokerMetrics( - clusterId, - brokerId, - KafkaMetricsCollections.BROKER_TO_DB_METRICS - ); - if (ValidateUtils.isNull(metrics)) { - continue; - } - metrics.getMetricsMap().put( - JmxConstant.HEALTH_SCORE, - healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics) - ); - metricsList.add(metrics); - } - if (ValidateUtils.isEmptyList(metricsList)) { - return new ArrayList<>(); - } - - List doList = - MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList); - int i = 0; - do { - List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())); - if (ValidateUtils.isEmptyList(subDOList)) { - break; - } - - brokerMetricsDao.batchAdd(subDOList); - i += Constant.BATCH_INSERT_SIZE; - } while (i < doList.size()); - - return metricsList; - } - - private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) { - ClusterMetrics metrics = new ClusterMetrics(clusterId); - Map metricsMap = metrics.getMetricsMap(); - metricsMap.putAll(baseMetrics.getMetricsMap()); - metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size()); - metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size()); - Integer partitionNum = 0; - for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) { - TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName); - if (ValidateUtils.isNull(topicMetaData)) { - continue; - } - partitionNum += topicMetaData.getPartitionNum(); - } - metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum); - return metrics; - } -} \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkBrokerMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkBrokerMetrics2DB.java new file mode 100644 index 00000000..923d26b6 --- /dev/null +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkBrokerMetrics2DB.java @@ -0,0 +1,55 @@ +package com.xiaojukeji.kafka.manager.task.listener.sink.db; + +import com.xiaojukeji.kafka.manager.common.constant.Constant; +import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; +import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO; +import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao; +import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author zengqiao + * @date 22/01/17 + */ +@Component +@ConditionalOnProperty(prefix = "task.metrics.sink.broker-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true) +public class SinkBrokerMetrics2DB implements ApplicationListener { + private static final Logger logger = LoggerFactory.getLogger(SinkBrokerMetrics2DB.class); + + @Autowired + private BrokerMetricsDao metricsDao; + + @Override + public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) { + logger.debug("sink broker-metrics to db start, event:{}.", event); + + List metricsList = event.getMetricsList(); + if (ValidateUtils.isEmptyList(metricsList)) { + logger.warn("sink broker-metrics to db finished, without need sink, event:{}.", event); + return; + } + + List doList = MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(event.getCollectTime(), metricsList); + int i = 0; + while (i < doList.size()) { + List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())); + if (ValidateUtils.isEmptyList(subDOList)) { + break; + } + + metricsDao.batchAdd(subDOList); + i += Constant.BATCH_INSERT_SIZE; + } + + logger.debug("sink broker-metrics to db finished, event:{}.", event); + } +} \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkClusterMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkClusterMetrics2DB.java new file mode 100644 index 00000000..a1aab09c --- /dev/null +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkClusterMetrics2DB.java @@ -0,0 +1,80 @@ +package com.xiaojukeji.kafka.manager.task.listener.sink.db; + +import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; +import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; +import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO; +import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; +import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao; +import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * @author zengqiao + * @date 22/01/17 + */ +@Component +@ConditionalOnProperty(prefix = "task.metrics.sink.cluster-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true) +public class SinkClusterMetrics2DB implements ApplicationListener { + private static final Logger logger = LoggerFactory.getLogger(SinkClusterMetrics2DB.class); + + @Autowired + private ClusterMetricsDao clusterMetricsDao; + + @Override + public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) { + logger.debug("sink cluster-metrics to db start, event:{}.", event); + + List metricsList = event.getMetricsList(); + if (ValidateUtils.isEmptyList(metricsList)) { + logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event); + return; + } + + List doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList( + event.getCollectTime(), + // 合并broker-metrics为cluster-metrics + Arrays.asList(supplyAndConvert2ClusterMetrics(event.getPhysicalClusterId(), MetricsConvertUtils.merge2BaseMetricsByAdd(event.getMetricsList()))) + ); + + if (ValidateUtils.isEmptyList(doList)) { + logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event); + return; + } + + clusterMetricsDao.batchAdd(doList); + + logger.debug("sink cluster-metrics to db finished, event:{}.", event); + } + + private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) { + ClusterMetrics metrics = new ClusterMetrics(clusterId); + Map metricsMap = metrics.getMetricsMap(); + metricsMap.putAll(baseMetrics.getMetricsMap()); + metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size()); + metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size()); + Integer partitionNum = 0; + for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) { + TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName); + if (ValidateUtils.isNull(topicMetaData)) { + continue; + } + partitionNum += topicMetaData.getPartitionNum(); + } + metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum); + return metrics; + } +} \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java similarity index 97% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java index f75368d2..267e32b7 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.listener; +package com.xiaojukeji.kafka.manager.task.listener.sink.db; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java similarity index 98% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java index e94a2793..c2d74df3 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.listener; +package com.xiaojukeji.kafka.manager.task.listener.sink.db; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; import com.xiaojukeji.kafka.manager.common.constant.Constant; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Kafka.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java similarity index 98% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Kafka.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java index ad80ceb2..5f3a0e5c 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Kafka.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.listener; +package com.xiaojukeji.kafka.manager.task.listener.sink.kafka; import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Kafka.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java similarity index 98% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Kafka.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java index 7070dae1..eb6c2d37 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Kafka.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.listener; +package com.xiaojukeji.kafka.manager.task.listener.sink.kafka; import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java similarity index 98% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java index e2ac74a9..80b3eccd 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.listener; +package com.xiaojukeji.kafka.manager.task.listener.sink.monitor; import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java similarity index 99% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java index 4ca276f9..a5c2e008 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.listener; +package com.xiaojukeji.kafka.manager.task.listener.sink.monitor; import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkTopicThrottledMetrics2Monitor.java similarity index 98% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkTopicThrottledMetrics2Monitor.java index fb95947c..ff1cb823 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkTopicThrottledMetrics2Monitor.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.task.listener; +package com.xiaojukeji.kafka.manager.task.listener.sink.monitor; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant; diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 3cce7463..4f83afb5 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -33,7 +33,6 @@ custom: idc: cn store-metrics-task: community: - broker-metrics-enabled: true topic-metrics-enabled: true didi: # 滴滴Kafka特有的指标 app-topic-metrics-enabled: false @@ -43,13 +42,20 @@ custom: # 任务相关的配置 task: op: - sync-topic-enabled: false # 未落盘的Topic定期同步到DB中 - order-auto-exec: # 工单自动化审批线程的开关 - topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启 - app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启 + sync-topic-enabled: false # 未落盘的Topic定期同步到DB中 + order-auto-exec: # 工单自动化审批线程的开关 + topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启 + app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启 metrics: - delete-metrics: - delete-limit-size: 1000 + collect: # 收集指标 + broker-metrics-enabled: true # 收集Broker指标 + sink: # 上报指标 + cluster-metrics: # 上报cluster指标 + sink-db-enabled: true # 上报到db + broker-metrics: # 上报broker指标 + sink-db-enabled: true # 上报到db + delete: # 删除指标 + delete-limit-size: 1000 # 单次删除的批大小 cluster-metrics-save-days: 14 # 集群指标保存天数 broker-metrics-save-days: 14 # Broker指标保存天数 topic-metrics-save-days: 7 # Topic指标保存天数 @@ -57,6 +63,21 @@ task: topic-throttled-metrics-save-days: 7 # Topic限流指标保存天数 app-topic-metrics-save-days: 7 # App+Topic指标保存天数 +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 + account: ldap: enabled: false @@ -103,18 +124,3 @@ notify: topic-name: didi-kafka-notify order: detail-url: http://127.0.0.1 - -thread-pool: - collect-metrics: - thread-num: 256 # 收集指标线程池大小 - queue-size: 5000 # 收集指标线程池的queue大小 - api-call: - thread-num: 16 # api服务线程池大小 - queue-size: 5000 # api服务线程池的queue大小 - -client-pool: - kafka-consumer: - min-idle-client-num: 24 # 最小空闲客户端数 - max-idle-client-num: 24 # 最大空闲客户端数 - max-total-client-num: 24 # 最大客户端数 - borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 From f3f0432c65ed4e5cb68aa2c4f09ffed95fcab8c5 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 17 Jan 2022 20:41:53 +0800 Subject: [PATCH 16/16] =?UTF-8?q?1.=E5=AE=89=E8=A3=85=E9=83=A8=E7=BD=B2?= =?UTF-8?q?=E8=84=9A=E6=9C=ACLogiKM=E5=8F=AF=E9=85=8D=E7=BD=AE;=202.?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BD=91=E5=85=B3=E6=8E=A5=E5=8F=A3=E5=8F=8A?= =?UTF-8?q?=E7=AC=AC=E4=B8=89=E6=96=B9=E6=8E=A5=E5=8F=A3=E5=8F=AF=E7=9B=B4?= =?UTF-8?q?=E6=8E=A5=E8=B0=83=E7=94=A8=E7=9A=84=E5=BC=80=E5=85=B3;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- distribution/conf/application.yml.example | 22 +++++++++++-------- .../manager/common/constant/ApiPrefix.java | 6 ----- .../account/impl/LoginServiceImpl.java | 15 ++++++++++--- .../manager/kcm/component/agent/n9e/N9e.java | 16 +++++++++----- .../src/main/resources/kcm_script.sh | 12 +++++----- .../thirdpart/ThirdPartBrokerController.java | 6 ++--- .../src/main/resources/application.yml | 22 +++++++++++-------- 7 files changed, 58 insertions(+), 41 deletions(-) diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index ee0290d8..f777ce31 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -85,6 +85,9 @@ client-pool: borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 account: + jump-login: + gateway-api: false # 网关接口 + third-part-api: false # 第三方接口 ldap: enabled: false url: ldap://127.0.0.1:389/ @@ -98,19 +101,20 @@ account: auth-user-registration: true auth-user-registration-role: normal -kcm: - enabled: false - s3: +kcm: # 集群安装部署,仅安装broker + enabled: false # 是否开启 + s3: # s3 存储服务 endpoint: s3.didiyunapi.com access-key: 1234567890 secret-key: 0987654321 bucket: logi-kafka - n9e: - base-url: http://127.0.0.1:8004 - user-token: 12345678 - timeout: 300 - account: root - script-file: kcm_script.sh + n9e: # 夜莺 + base-url: http://127.0.0.1:8004 # 夜莺job服务地址 + user-token: 12345678 # 用户的token + timeout: 300 # 当台操作的超时时间 + account: root # 操作时使用的账号 + script-file: kcm_script.sh # 脚本,已内置好,在源码的kcm模块内,此处配置无需修改 + logikm-url: http://127.0.0.1:8080 # logikm部署地址,部署时kcm_script.sh会调用logikm检查部署中的一些状态 monitor: enabled: false diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java index b0f84405..5422076c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java @@ -20,12 +20,6 @@ public class ApiPrefix { // open public static final String API_V1_THIRD_PART_PREFIX = API_V1_PREFIX + "third-part/"; - // 开放给OP的接口, 后续对 应的接口的集群都需要是物理集群 - public static final String API_V1_THIRD_PART_OP_PREFIX = API_V1_THIRD_PART_PREFIX + "op/"; - - // 开放给Normal的接口, 后续对应的接口的集群,都需要是逻辑集群 - public static final String API_V1_THIRD_PART_NORMAL_PREFIX = API_V1_THIRD_PART_PREFIX + "normal/"; - // gateway public static final String GATEWAY_API_V1_PREFIX = "/gateway" + API_V1_PREFIX; diff --git a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java index f49f7dca..f0299d87 100644 --- a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java @@ -14,6 +14,7 @@ import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.servlet.http.Cookie; @@ -27,7 +28,13 @@ import javax.servlet.http.HttpSession; */ @Service("loginService") public class LoginServiceImpl implements LoginService { - private final static Logger LOGGER = LoggerFactory.getLogger(LoginServiceImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LoginServiceImpl.class); + + @Value(value = "${account.jump-login.gateway-api:false}") + private Boolean jumpLoginGatewayApi; + + @Value(value = "${account.jump-login.third-part-api:false}") + private Boolean jumpLoginThirdPartApi; @Autowired private AccountService accountService; @@ -75,8 +82,10 @@ public class LoginServiceImpl implements LoginService { return false; } - if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)) { - // 白名单接口直接true + if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX) || + (jumpLoginGatewayApi != null && jumpLoginGatewayApi && classRequestMappingValue.equals(ApiPrefix.GATEWAY_API_V1_PREFIX)) || + (jumpLoginThirdPartApi != null && jumpLoginThirdPartApi && classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_PREFIX))) { + // 登录接口 or 允许跳过且是跳过类型的接口,则直接跳过登录 return true; } diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java index 6e3fa677..d0a2503b 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java @@ -37,21 +37,24 @@ import java.util.Map; public class N9e extends AbstractAgent { private static final Logger LOGGER = LoggerFactory.getLogger(N9e.class); - @Value("${kcm.n9e.base-url}") + @Value("${kcm.n9e.base-url:}") private String baseUrl; - @Value("${kcm.n9e.user-token}") + @Value("${kcm.n9e.user-token:12345678}") private String userToken; - @Value("${kcm.n9e.account}") + @Value("${kcm.n9e.account:root}") private String account; - @Value("${kcm.n9e.timeout}") + @Value("${kcm.n9e.timeout:300}") private Integer timeout; - @Value("${kcm.n9e.script-file}") + @Value("${kcm.n9e.script-file:kcm_script.sh}") private String scriptFile; + @Value("${kcm.n9e.logikm-url:}") + private String logiKMUrl; + private String script; private static final String CREATE_TASK_URI = "/api/job-ce/tasks"; @@ -219,7 +222,8 @@ public class N9e extends AbstractAgent { sb.append(creationTaskData.getKafkaPackageUrl()).append(",,"); sb.append(creationTaskData.getServerPropertiesName().replace(KafkaFileEnum.SERVER_CONFIG.getSuffix(), "")).append(",,"); sb.append(creationTaskData.getServerPropertiesMd5()).append(",,"); - sb.append(creationTaskData.getServerPropertiesUrl()); + sb.append(creationTaskData.getServerPropertiesUrl()).append(",,"); + sb.append(this.logiKMUrl); N9eCreationTask n9eCreationTask = new N9eCreationTask(); n9eCreationTask.setTitle(Constant.TASK_TITLE_PREFIX + "-集群ID:" + creationTaskData.getClusterId()); diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh index ffd54a20..16ffb80c 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh @@ -18,12 +18,13 @@ p_kafka_server_properties_name=${7} #server配置名 p_kafka_server_properties_md5=${8} #server配置MD5 p_kafka_server_properties_url=${9} #server配置文件下载地址 +p_kafka_manager_url=${10} #LogiKM地址 + #----------------------------------------配置信息------------------------------------------------------# g_base_dir='/home' g_cluster_task_dir=${g_base_dir}"/kafka_cluster_task/task_${p_task_id}" #部署升级路径 g_rollback_version=${g_cluster_task_dir}"/rollback_version" #回滚版本 g_new_kafka_package_name='' #最终的包名 -g_kafka_manager_addr='' #kafka-manager地址 g_local_ip=`ifconfig -a|grep inet|grep -v 127.0.0.1|grep -v inet6|awk '{print $2}'|tr -d "addr:"` g_hostname=${g_local_ip} @@ -47,7 +48,7 @@ function dchat_alarm() { # 检查并初始化环境 function check_and_init_env() { - if [ -z "${p_task_id}" -o -z "${p_cluster_task_type}" -o -z "${p_kafka_package_url}" -o -z "${p_cluster_id}" -o -z "${p_kafka_package_name}" -o -z "${p_kafka_package_md5}" -o -z "${p_kafka_server_properties_name}" -o -z "${p_kafka_server_properties_md5}" ]; then + if [ -z "${p_task_id}" -o -z "${p_cluster_task_type}" -o -z "${p_kafka_package_url}" -o -z "${p_cluster_id}" -o -z "${p_kafka_package_name}" -o -z "${p_kafka_package_md5}" -o -z "${p_kafka_server_properties_name}" -o -z "${p_kafka_server_properties_md5}" -o -z "${p_kafka_manager_url}" ]; then ECHO_LOG "存在为空的参数不合法, 退出集群任务" dchat_alarm "存在为空的参数不合法, 退出集群任务" exit 1 @@ -72,11 +73,11 @@ function check_and_init_env() { # 检查并等待集群所有的副本处于同步的状态 function check_and_wait_broker_stabled() { - under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` + under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${p_kafka_manager_url}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` while [ "$under_replication_count" -ne 1 ]; do ECHO_LOG "存在${under_replication_count}个副本未同步, sleep 10s" sleep 10 - under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` + under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${p_kafka_manager_url}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` done ECHO_LOG "集群副本都已经处于同步的状态, 可以进行集群升级" } @@ -324,6 +325,7 @@ ECHO_LOG " p_kafka_package_name=${p_kafka_package_name}" ECHO_LOG " p_kafka_package_md5=${p_kafka_package_md5}" ECHO_LOG " p_kafka_server_properties_name=${p_kafka_server_properties_name}" ECHO_LOG " p_kafka_server_properties_md5=${p_kafka_server_properties_md5}" +ECHO_LOG " p_kafka_manager_url=${p_kafka_manager_url}" @@ -342,7 +344,7 @@ fi ECHO_LOG "停kafka服务" stop_kafka_server -ECHO_LOG "停5秒, 确保" +ECHO_LOG "再停5秒, 确保端口已释放" sleep 5 if [ "${p_cluster_task_type}" == "0" ];then diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java index 790b85be..8469afec 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java @@ -32,7 +32,7 @@ import java.util.stream.Collectors; */ @Api(tags = "开放接口-Broker相关接口(REST)") @RestController -@RequestMapping(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX) +@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX) public class ThirdPartBrokerController { @Autowired private BrokerService brokerService; @@ -44,7 +44,7 @@ public class ThirdPartBrokerController { private ClusterService clusterService; @ApiOperation(value = "Broker信息概览", notes = "") - @RequestMapping(value = "{clusterId}/brokers/{brokerId}/overview", method = RequestMethod.GET) + @GetMapping(value = "{clusterId}/brokers/{brokerId}/overview") @ResponseBody public Result getBrokerOverview(@PathVariable Long clusterId, @PathVariable Integer brokerId) { @@ -70,7 +70,7 @@ public class ThirdPartBrokerController { } @ApiOperation(value = "BrokerRegion信息", notes = "所有集群的") - @RequestMapping(value = "broker-regions", method = RequestMethod.GET) + @GetMapping(value = "broker-regions") @ResponseBody public Result> getBrokerRegions() { List clusterDOList = clusterService.list(); diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 4f83afb5..9cd51d46 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -79,6 +79,9 @@ client-pool: borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 account: + jump-login: + gateway-api: false # 网关接口 + third-part-api: false # 第三方接口 ldap: enabled: false url: ldap://127.0.0.1:389/ @@ -92,19 +95,20 @@ account: auth-user-registration: true auth-user-registration-role: normal -kcm: - enabled: false - s3: +kcm: # 集群安装部署,仅安装broker + enabled: false # 是否开启 + s3: # s3 存储服务 endpoint: s3.didiyunapi.com access-key: 1234567890 secret-key: 0987654321 bucket: logi-kafka - n9e: - base-url: http://127.0.0.1:8004 - user-token: 12345678 - timeout: 300 - account: root - script-file: kcm_script.sh + n9e: # 夜莺 + base-url: http://127.0.0.1:8004 # 夜莺job服务地址 + user-token: 12345678 # 用户的token + timeout: 300 # 当台操作的超时时间 + account: root # 操作时使用的账号 + script-file: kcm_script.sh # 脚本,已内置好,在源码的kcm模块内,此处配置无需修改 + logikm-url: http://127.0.0.1:8080 # logikm部署地址,部署时kcm_script.sh会调用logikm检查部署中的一些状态 monitor: enabled: false