From b4cc31c459683727cc9986a4b4a27c0f0ccc441d Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sat, 8 Oct 2022 15:31:59 +0800 Subject: [PATCH] =?UTF-8?q?ZK-=E6=8C=87=E6=A0=87=E9=87=87=E9=9B=86?= =?UTF-8?q?=E5=85=A5ES?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../metric/ZookeeperMetricCollector.java | 122 ++++++++ .../sink/ZookeeperMetricESSender.java | 28 ++ .../bean/entity/metrics/ZookeeperMetrics.java | 28 ++ .../param/metric/ZookeeperMetricParam.java | 47 +++ .../version/VersionMetricControlItem.java | 2 + .../bean/event/metric/BaseMetricEvent.java | 2 - .../event/metric/ZookeeperMetricEvent.java | 20 ++ .../bean/po/metrice/ZookeeperMetricPO.java | 24 ++ .../vo/metrics/line/MetricMultiLinesVO.java | 19 -- .../km/common/constant/ESConstant.java | 2 + .../km/common/constant/ESIndexConstant.java | 85 ++++++ .../enums/version/VersionItemTypeEnum.java | 4 +- .../streaming/km/common/jmx/JmxAttribute.java | 6 + .../know/streaming/km/common/jmx/JmxName.java | 6 + .../broker/impl/BrokerServiceImpl.java | 12 +- .../metrics/ZookeeperMetricVersionItems.java | 141 +++++++++ .../zookeeper/ZookeeperMetricService.java | 21 ++ .../impl/ZookeeperMetricServiceImpl.java | 281 ++++++++++++++++++ .../init/template/ks_kafka_zookeeper_metric | 85 ++++++ .../persistence/es/dao/BaseMetricESDAO.java | 3 +- .../es/dao/ZookeeperMetricESDAO.java | 106 +++++++ .../km/persistence/es/dsls/DslsConstant.java | 2 + .../streaming/km/persistence/jmx/JmxDAO.java | 4 +- .../km/persistence/jmx/impl/JmxDAOImpl.java | 41 ++- .../getAggListZookeeperMetrics | 44 +++ .../metrics/ZookeeperMetricCollectorTask.java | 33 ++ 26 files changed, 1126 insertions(+), 42 deletions(-) create mode 100644 km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ZookeeperMetricCollector.java create mode 100644 km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/ZookeeperMetrics.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/ZookeeperMetricParam.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ZookeeperMetricEvent.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/ZookeeperMetricPO.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperMetricService.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java create mode 100644 km-dist/init/template/ks_kafka_zookeeper_metric create mode 100644 km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ZookeeperMetricESDAO.java create mode 100644 km-persistence/src/main/resources/dsl/ZookeeperMetricESDAO/getAggListZookeeperMetrics create mode 100644 km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ZookeeperMetricCollectorTask.java diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ZookeeperMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ZookeeperMetricCollector.java new file mode 100644 index 00000000..37f86d4e --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ZookeeperMetricCollector.java @@ -0,0 +1,122 @@ +package com.xiaojukeji.know.streaming.km.collector.metric; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO; +import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; +import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER; + +/** + * @author didi + */ +@Component +public class ZookeeperMetricCollector extends AbstractMetricCollector { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @Autowired + private VersionControlService versionControlService; + + @Autowired + private ZookeeperMetricService zookeeperMetricService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private KafkaControllerService kafkaControllerService; + + @Override + public void collectMetrics(ClusterPhy clusterPhy) { + Long startTime = System.currentTimeMillis(); + Long clusterPhyId = clusterPhy.getId(); + List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); + List aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId) + .stream() + .filter(elem -> Constant.ALIVE.equals(elem.getStatus())) + .collect(Collectors.toList()); + KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); + + ZookeeperMetrics metrics = ZookeeperMetrics.initWithMetric(clusterPhyId, Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (float)Constant.INVALID_CODE); + if (ValidateUtils.isEmptyList(aliveZKList)) { + // 没有存活的ZK时,发布事件,然后直接返回 + publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics))); + return; + } + + // 构造参数 + ZookeeperMetricParam param = new ZookeeperMetricParam( + clusterPhyId, + aliveZKList.stream().map(elem -> new Tuple(elem.getHost(), elem.getPort())).collect(Collectors.toList()), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class), + kafkaController == null? Constant.INVALID_CODE: kafkaController.getBrokerId(), + null + ); + + for(VersionControlItem v : items) { + try { + if(null != metrics.getMetrics().get(v.getName())) { + continue; + } + param.setMetricName(v.getName()); + + Result ret = zookeeperMetricService.collectMetricsFromZookeeper(param); + if(null == ret || ret.failed() || null == ret.getData()){ + continue; + } + + metrics.putMetric(ret.getData().getMetrics()); + + if(!EnvUtil.isOnline()){ + LOGGER.info( + "class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||metricValue={}", + clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics()) + ); + } + } catch (Exception e){ + LOGGER.error( + "class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||errMsg=exception!", + clusterPhyId, v.getName(), e + ); + } + } + + metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); + + publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics))); + + LOGGER.info( + "class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||startTime={}||costTime={}||msg=msg=collect finished.", + clusterPhyId, startTime, System.currentTimeMillis() - startTime + ); + } + + @Override + public VersionItemTypeEnum collectorType() { + return METRIC_ZOOKEEPER; + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java new file mode 100644 index 00000000..4f9dad53 --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX; + +@Component +public class ZookeeperMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=ZookeeperMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(ZookeeperMetricEvent event) { + send2es(ZOOKEEPER_INDEX, ConvertUtil.list2List(event.getZookeeperMetrics(), ZookeeperMetricPO.class)); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/ZookeeperMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/ZookeeperMetrics.java new file mode 100644 index 00000000..823125b5 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/ZookeeperMetrics.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics; + +import lombok.Data; +import lombok.ToString; + +/** + * @author zengqiao + * @date 20/6/17 + */ +@Data +@ToString +public class ZookeeperMetrics extends BaseMetrics { + public ZookeeperMetrics(Long clusterPhyId) { + super(clusterPhyId); + } + + public static ZookeeperMetrics initWithMetric(Long clusterPhyId, String metric, Float value) { + ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId); + metrics.setClusterPhyId( clusterPhyId ); + metrics.putMetric(metric, value); + return metrics; + } + + @Override + public String unique() { + return "ZK@" + clusterPhyId; + } +} \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/ZookeeperMetricParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/ZookeeperMetricParam.java new file mode 100644 index 00000000..ef2b09c8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/ZookeeperMetricParam.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author didi + */ +@Data +@NoArgsConstructor +public class ZookeeperMetricParam extends MetricParam { + private Long clusterPhyId; + + private List> zkAddressList; + + private ZKConfig zkConfig; + + private String metricName; + + private Integer kafkaControllerId; + + public ZookeeperMetricParam(Long clusterPhyId, + List> zkAddressList, + ZKConfig zkConfig, + String metricName) { + this.clusterPhyId = clusterPhyId; + this.zkAddressList = zkAddressList; + this.zkConfig = zkConfig; + this.metricName = metricName; + } + + public ZookeeperMetricParam(Long clusterPhyId, + List> zkAddressList, + ZKConfig zkConfig, + Integer kafkaControllerId, + String metricName) { + this.clusterPhyId = clusterPhyId; + this.zkAddressList = zkAddressList; + this.zkConfig = zkConfig; + this.kafkaControllerId = kafkaControllerId; + this.metricName = metricName; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java index c7409104..5c3f6506 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java @@ -23,6 +23,8 @@ public class VersionMetricControlItem extends VersionControlItem{ public static final String CATEGORY_PERFORMANCE = "Performance"; public static final String CATEGORY_FLOW = "Flow"; + public static final String CATEGORY_CLIENT = "Client"; + /** * 指标单位名称,非指标的没有 */ diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java index df1fe834..cfe5995a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java @@ -8,8 +8,6 @@ import org.springframework.context.ApplicationEvent; */ @Getter public class BaseMetricEvent extends ApplicationEvent { - - public BaseMetricEvent(Object source) { super( source ); } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ZookeeperMetricEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ZookeeperMetricEvent.java new file mode 100644 index 00000000..19279d53 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ZookeeperMetricEvent.java @@ -0,0 +1,20 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.metric; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import lombok.Getter; + +import java.util.List; + +/** + * @author didi + */ +@Getter +public class ZookeeperMetricEvent extends BaseMetricEvent { + + private List zookeeperMetrics; + + public ZookeeperMetricEvent(Object source, List zookeeperMetrics) { + super( source ); + this.zookeeperMetrics = zookeeperMetrics; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/ZookeeperMetricPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/ZookeeperMetricPO.java new file mode 100644 index 00000000..96921739 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/ZookeeperMetricPO.java @@ -0,0 +1,24 @@ +package com.xiaojukeji.know.streaming.km.common.bean.po.metrice; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min; + +@Data +@NoArgsConstructor +public class ZookeeperMetricPO extends BaseMetricESPO { + public ZookeeperMetricPO(Long clusterPhyId){ + super(clusterPhyId); + } + + @Override + public String getKey() { + return "ZK@" + clusterPhyId + "@" + monitorTimestamp2min(timestamp); + } + + @Override + public String getRoutingValue() { + return String.valueOf(clusterPhyId); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java index a3874292..917769d2 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java @@ -1,16 +1,12 @@ package com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** * @author didi @@ -26,19 +22,4 @@ public class MetricMultiLinesVO { @ApiModelProperty(value = "指标名称对应的指标线") private List metricLines; - - public List getMetricPoints(String resName) { - if (ValidateUtils.isNull(metricLines)) { - return new ArrayList<>(); - } - - List voList = metricLines.stream().filter(elem -> elem.getName().equals(resName)).collect(Collectors.toList()); - if (ValidateUtils.isEmptyList(voList)) { - return new ArrayList<>(); - } - - // 仅获取idx=0的指标 - return voList.get(0).getMetricPoints(); - } - } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java index af8bd2c3..1b8a7740 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java @@ -34,6 +34,8 @@ public class ESConstant { public static final String TOTAL = "total"; + public static final Integer DEFAULT_RETRY_TIME = 3; + private ESConstant() { } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java index 0de516f7..64aef24f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java @@ -644,4 +644,89 @@ public class ESIndexConstant { " \"aliases\" : { }\n" + " }"; + public final static String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric"; + public final static String ZOOKEEPER_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_zookeeper_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"AvgRequestLatency\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MinRequestLatency\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MaxRequestLatency\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"OutstandingRequests\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"NodeCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"WatchCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"NumAliveConnections\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PacketsReceived\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PacketsSent\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"EphemeralsCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"ApproximateDataSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"OpenFileDescriptorCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MaxFileDescriptorCount\" : {\n" + + " \"type\" : \"double\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"type\" : \"date\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java index 15f13175..004dad6d 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java @@ -9,7 +9,9 @@ public enum VersionItemTypeEnum { METRIC_GROUP(102, "group_metric"), METRIC_BROKER(103, "broker_metric"), METRIC_PARTITION(104, "partition_metric"), - METRIC_REPLICATION (105, "replication_metric"), + METRIC_REPLICATION(105, "replication_metric"), + + METRIC_ZOOKEEPER(110, "zookeeper_metric"), /** * 服务端查询 diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java index cc7bfcb4..a9bea1c3 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java @@ -22,6 +22,12 @@ public class JmxAttribute { public static final String PERCENTILE_99 = "99thPercentile"; + public static final String MAX = "Max"; + + public static final String MEAN = "Mean"; + + public static final String MIN = "Min"; + public static final String VALUE = "Value"; public static final String CONNECTION_COUNT = "connection-count"; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java index d2d1651e..db8b3197 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java @@ -63,6 +63,12 @@ public class JmxName { /*********************************************************** cluster ***********************************************************/ public static final String JMX_CLUSTER_PARTITION_UNDER_REPLICATED = "kafka.cluster:type=Partition,name=UnderReplicated"; + /*********************************************************** zookeeper ***********************************************************/ + + public static final String JMX_ZK_REQUEST_LATENCY_MS = "kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs"; + public static final String JMX_ZK_SYNC_CONNECTS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperSyncConnectsPerSec"; + public static final String JMX_ZK_DISCONNECTORS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"; + private JmxName() { } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index fbede23c..7fc4f4f2 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -343,17 +343,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) { try { - Object object = jmxDAO.getJmxValue( - clusterPhyId, - newNode.id(), - newNode.host(), - null, - jmxConfig, - new ObjectName("java.lang:type=Runtime"), - "StartTime" - ); + Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig); - return Broker.buildFrom(clusterPhyId, newNode, object != null? (Long) object: null); + return Broker.buildFrom(clusterPhyId, newNode, startTime); } catch (Exception e) { log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java new file mode 100644 index 00000000..9b0d4d2b --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java @@ -0,0 +1,141 @@ +package com.xiaojukeji.know.streaming.km.core.service.version.metrics; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.*; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER; +import static com.xiaojukeji.know.streaming.km.common.jmx.JmxAttribute.*; +import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.*; +import static com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl.ZookeeperMetricServiceImpl.*; + +@Component +public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric { + + /** + * 性能 + */ + public static final String ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY = "AvgRequestLatency"; + public static final String ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY = "MinRequestLatency"; + public static final String ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY = "MaxRequestLatency"; + public static final String ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS = "OutstandingRequests"; + public static final String ZOOKEEPER_METRIC_NODE_COUNT = "NodeCount"; + public static final String ZOOKEEPER_METRIC_WATCH_COUNT = "WatchCount"; + public static final String ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS = "NumAliveConnections"; + public static final String ZOOKEEPER_METRIC_PACKETS_RECEIVED = "PacketsReceived"; + public static final String ZOOKEEPER_METRIC_PACKETS_SENT = "PacketsSent"; + public static final String ZOOKEEPER_METRIC_EPHEMERALS_COUNT = "EphemeralsCount"; + public static final String ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE = "ApproximateDataSize"; + public static final String ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT = "OpenFileDescriptorCount"; + public static final String ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT = "MaxFileDescriptorCount"; + + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC = "KafkaZKDisconnectsPerSec"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC = "KafkaZKSyncConnectsPerSec"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH = "KafkaZKRequestLatencyMs_99thPercentile"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX = "KafkaZKRequestLatencyMs_Max"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN = "KafkaZKRequestLatencyMs_Mean"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN = "KafkaZKRequestLatencyMs_Min"; + + + public static final String ZOOKEEPER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME; + + @Override + public int versionItemType() { + return METRIC_ZOOKEEPER.getCode(); + } + + @Override + public List init(){ + List items = new ArrayList<>(); + + // 性能指标 + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY).unit("ms").desc("平均响应延迟").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY).unit("ms").desc("最小响应延迟").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY).unit("ms").desc("最大响应延迟").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS).unit("个").desc("堆积请求数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_NODE_COUNT).unit("个").desc("ZNode数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_WATCH_COUNT).unit("个").desc("Watch数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS).unit("个").desc("客户端连接数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_PACKETS_RECEIVED).unit("个").desc("接受包的数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_PACKETS_SENT).unit("个").desc("发送包的数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_EPHEMERALS_COUNT).unit("个").desc("临时节点数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE).unit("byte").desc("文件大小(近似值)").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT).unit("个").desc("已打开的文件描述符数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT).unit("个").desc("允许打开的最大文件描述符数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + // JMX指标 + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH).unit("ms").desc("ZK请求99分位延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(PERCENTILE_99))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX).unit("ms").desc("ZK请求最大延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MAX))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN).unit("ms").desc("ZK请求最小延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MIN))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN).unit("ms").desc("ZK请求平均延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MEAN))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC).unit("个").desc("断开连接数").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_DISCONNECTORS_PER_SEC ).jmxAttribute(RATE_MIN_1))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC).unit("个").desc("同步连接数").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_SYNC_CONNECTS_PER_SEC ).jmxAttribute(RATE_MIN_1))); + return items; + } +} + diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperMetricService.java new file mode 100644 index 00000000..2dc48851 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperMetricService.java @@ -0,0 +1,21 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; + +import java.util.List; + +public interface ZookeeperMetricService { + /** + * ZK指标获取 + * @param param 参数,因为ZK 四字命令在使用时,是短连接,所以参数内容会复杂一些,后续可以考虑优化为长连接 + * @return + */ + Result collectMetricsFromZookeeper(ZookeeperMetricParam param); + Result batchCollectMetricsFromZookeeper(Long clusterPhyId, List metricNameList); + + Result> listMetricsFromES(Long clusterPhyId, MetricDTO dto); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java new file mode 100644 index 00000000..dea1d877 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java @@ -0,0 +1,281 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.MonitorCmdDataParser; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; +import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; +import com.xiaojukeji.know.streaming.km.common.utils.*; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import com.xiaojukeji.know.streaming.km.persistence.es.dao.ZookeeperMetricESDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.management.ObjectName; +import java.util.*; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*; +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_JMX_CONNECT_ERROR; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems.*; + + +@Service +public class ZookeeperMetricServiceImpl extends BaseMetricService implements ZookeeperMetricService { + private static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricServiceImpl.class); + + public static final String ZOOKEEPER_METHOD_DO_NOTHING = "doNothing"; + public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD = "getMetricFromMonitorCmd"; + public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD = "getMetricFromServerCmd"; + public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX = "getMetricFromKafkaByJMX"; + + @Autowired + private ClusterPhyService clusterPhyService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private ZookeeperMetricESDAO zookeeperMetricESDAO; + + @Autowired + private KafkaJMXClient kafkaJMXClient; + + @Autowired + private KafkaControllerService kafkaControllerService; + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return VersionItemTypeEnum.METRIC_ZOOKEEPER; + } + + @Override + protected List listMetricPOFields(){ + return BeanUtil.listBeanFields(ZookeeperMetricPO.class); + } + + @Override + protected void initRegisterVCHandler(){ + registerVCHandler( ZOOKEEPER_METHOD_DO_NOTHING, this::doNothing); + registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD, this::getMetricFromMonitorCmd); + registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD, this::getMetricFromServerCmd); + registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX, this::getMetricFromKafkaByJMX); + } + + @Override + public Result collectMetricsFromZookeeper(ZookeeperMetricParam param) { + try { + return (Result)doVCHandler(param.getClusterPhyId(), param.getMetricName(), param); + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + @Override + public Result batchCollectMetricsFromZookeeper(Long clusterPhyId, List metricNameList) { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + if (null == clusterPhy) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); + } + + List aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId).stream() + .filter(elem -> Constant.ALIVE.equals(elem.getStatus())) + .collect(Collectors.toList()); + + if (ValidateUtils.isEmptyList(aliveZKList)) { + // 没有指标可以获取 + return Result.buildSuc(new ZookeeperMetrics(clusterPhyId)); + } + + // 构造参数 + ZookeeperMetricParam param = new ZookeeperMetricParam( + clusterPhyId, + aliveZKList.stream().map(elem -> new Tuple(elem.getHost(), elem.getPort())).collect(Collectors.toList()), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class), + null + ); + + ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId); + for(String metricName : metricNameList) { + try { + if(metrics.getMetrics().containsKey(metricName)) { + continue; + } + param.setMetricName(metricName); + + Result ret = this.collectMetricsFromZookeeper(param); + if(null == ret || ret.failed() || null == ret.getData()){ + continue; + } + + metrics.putMetric(ret.getData().getMetrics()); + } catch (Exception e){ + LOGGER.error( + "class=ZookeeperMetricServiceImpl||method=collectMetricsFromZookeeper||clusterPhyId={}||metricName={}||errMsg=exception!", + clusterPhyId, metricName, e + ); + } + } + + return Result.buildSuc(metrics); + } + + @Override + public Result> listMetricsFromES(Long clusterPhyId, MetricDTO dto) { + Map> pointVOMap = zookeeperMetricESDAO.listMetricsByClusterPhyId( + clusterPhyId, + dto.getMetricsNames(), + dto.getAggType(), + dto.getStartTime(), + dto.getEndTime() + ); + + // 格式转化 + List voList = new ArrayList<>(); + pointVOMap.entrySet().stream().forEach(entry -> + voList.add(new MetricLineVO(String.valueOf(clusterPhyId), entry.getKey(), entry.getValue())) + ); + return Result.buildSuc(voList); + } + + + /**************************************************** private method ****************************************************/ + + private Result getMetricFromServerCmd(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + + Result rz = null; + for (Tuple hostPort: param.getZkAddressList()) { + Result cmdDataResult = FourLetterWordUtil.executeFourLetterCmd( + param.getClusterPhyId(), + hostPort.getV1(), + hostPort.getV2(), + param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false, + param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS, + new ServerCmdDataParser() + ); + + if (cmdDataResult.failed()) { + rz = Result.buildFromIgnoreData(cmdDataResult); + continue; + } + + ServerCmdData cmdData = cmdDataResult.getData(); + + ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId()); + metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue()); + + return Result.buildSuc(metrics); + } + + return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId())); + } + + private Result getMetricFromMonitorCmd(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + + Result rz = null; + for (Tuple hostPort: param.getZkAddressList()) { + Result cmdDataResult = FourLetterWordUtil.executeFourLetterCmd( + param.getClusterPhyId(), + hostPort.getV1(), + hostPort.getV2(), + param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false, + param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS, + new MonitorCmdDataParser() + ); + + if (cmdDataResult.failed()) { + rz = Result.buildFromIgnoreData(cmdDataResult); + continue; + } + + MonitorCmdData cmdData = cmdDataResult.getData(); + + ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId()); + metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_WATCH_COUNT, cmdData.getZkWatchCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_EPHEMERALS_COUNT, cmdData.getZkEphemeralsCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE, cmdData.getZkApproximateDataSize().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT, cmdData.getZkOpenFileDescriptorCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT, cmdData.getZkMaxFileDescriptorCount().floatValue()); + + return Result.buildSuc(metrics); + } + + return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId())); + } + + private Result doNothing(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + return Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId())); + } + + private Result getMetricFromKafkaByJMX(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + + String metricName = param.getMetricName(); + Long clusterPhyId = param.getClusterPhyId(); + Integer kafkaControllerId = param.getKafkaControllerId(); + + //1、获取jmx的属性信息 + VersionJmxInfo jmxInfo = getJMXInfo(clusterPhyId, metricName); + if(null == jmxInfo) { + return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST); + } + + //2、获取jmx连接 + JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterPhyId, kafkaControllerId); + if (ValidateUtils.isNull(jmxConnectorWrap)) { + return Result.buildFailure(VC_JMX_INIT_ERROR); + } + + try { + //2、获取jmx指标 + String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString(); + + return Result.buildSuc(ZookeeperMetrics.initWithMetric(clusterPhyId, metricName, Float.valueOf(value))); + } catch (Exception e) { + return Result.buildFailure(VC_JMX_CONNECT_ERROR); + } + } +} diff --git a/km-dist/init/template/ks_kafka_zookeeper_metric b/km-dist/init/template/ks_kafka_zookeeper_metric new file mode 100644 index 00000000..abb54a61 --- /dev/null +++ b/km-dist/init/template/ks_kafka_zookeeper_metric @@ -0,0 +1,85 @@ +PUT _template/ks_kafka_zookeeper_metric +{ + "order" : 10, + "index_patterns" : [ + "ks_kafka_zookeeper_metric*" + ], + "settings" : { + "index" : { + "number_of_shards" : "10" + } + }, + "mappings" : { + "properties" : { + "routingValue" : { + "type" : "text", + "fields" : { + "keyword" : { + "ignore_above" : 256, + "type" : "keyword" + } + } + }, + "clusterPhyId" : { + "type" : "long" + }, + "metrics" : { + "properties" : { + "AvgRequestLatency" : { + "type" : "double" + }, + "MinRequestLatency" : { + "type" : "double" + }, + "MaxRequestLatency" : { + "type" : "double" + }, + "OutstandingRequests" : { + "type" : "double" + }, + "NodeCount" : { + "type" : "double" + }, + "WatchCount" : { + "type" : "double" + }, + "NumAliveConnections" : { + "type" : "double" + }, + "PacketsReceived" : { + "type" : "double" + }, + "PacketsSent" : { + "type" : "double" + }, + "EphemeralsCount" : { + "type" : "double" + }, + "ApproximateDataSize" : { + "type" : "double" + }, + "OpenFileDescriptorCount" : { + "type" : "double" + }, + "MaxFileDescriptorCount" : { + "type" : "double" + } + } + }, + "key" : { + "type" : "text", + "fields" : { + "keyword" : { + "ignore_above" : 256, + "type" : "keyword" + } + } + }, + "timestamp" : { + "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis", + "type" : "date" + } + } + }, + "aliases" : { } + } \ No newline at end of file diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java index a6615fbc..faeb64cb 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java @@ -40,8 +40,7 @@ public class BaseMetricESDAO extends BaseESDAO { /** * 不同维度 kafka 监控数据 */ - private static Map ariusStatsEsDaoMap = Maps - .newConcurrentMap(); + private static Map ariusStatsEsDaoMap = Maps.newConcurrentMap(); /** * 检查 es 索引是否存在,不存在则创建索引 diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ZookeeperMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ZookeeperMetricESDAO.java new file mode 100644 index 00000000..8b391a3a --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ZookeeperMetricESDAO.java @@ -0,0 +1,106 @@ +package com.xiaojukeji.know.streaming.km.persistence.es.dao; + +import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse; +import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.constant.ESConstant; +import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; +import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_TEMPLATE; + +@Component +public class ZookeeperMetricESDAO extends BaseMetricESDAO { + + @PostConstruct + public void init() { + super.indexName = ZOOKEEPER_INDEX; + super.indexTemplate = ZOOKEEPER_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); + } + + /** + * 获取指定集群,指定指标,一段时间内的值 + */ + public Map> listMetricsByClusterPhyId(Long clusterPhyId, + List metricNameList, + String aggType, + Long startTime, + Long endTime) { + //1、获取需要查下的索引 + String realIndex = realIndex(startTime, endTime); + + //2、根据查询的时间区间大小来确定指标点的聚合区间大小 + String interval = MetricsUtils.getInterval(endTime - startTime); + + //3、构造agg查询条件 + String aggDsl = buildAggsDSL(metricNameList, aggType); + + //4、构造dsl查询条件,开始查询 + try { + String dsl = dslLoaderUtil.getFormatDslByFileName( + DslsConstant.GET_ZOOKEEPER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); + + return esOpClient.performRequestWithRouting( + String.valueOf(clusterPhyId), + realIndex, + dsl, + s -> handleListESQueryResponse(s, metricNameList, aggType), + ESConstant.DEFAULT_RETRY_TIME + ); + } catch (Exception e){ + LOGGER.error("class=ZookeeperMetricESDAO||method=listMetricsByClusterPhyId||clusterPhyId={}||errMsg=exception!", + clusterPhyId, e + ); + } + + return new HashMap<>(); + } + + /**************************************************** private method ****************************************************/ + + private Map> handleListESQueryResponse(ESQueryResponse response, List metrics, String aggType){ + Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); + if(null == esAggrMap) { + return new HashMap<>(); + } + + Map> metricMap = new HashMap<>(); + for(String metric : metrics){ + List metricPoints = new ArrayList<>(); + + esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + try { + if (null != esBucket.getUnusedMap().get(KEY)) { + Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); + String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString(); + + MetricPointVO metricPoint = new MetricPointVO(); + metricPoint.setAggType(aggType); + metricPoint.setTimeStamp(timestamp); + metricPoint.setValue(value); + metricPoint.setName(metric); + + metricPoints.add(metricPoint); + } + }catch (Exception e){ + LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e); + } + } ); + + metricMap.put(metric, optimizeMetricPoints(metricPoints)); + } + + return metricMap; + } +} diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java index 3f158f36..94a8698e 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java @@ -80,4 +80,6 @@ public class DslsConstant { public static final String COUNT_GROUP_NOT_METRIC_VALUE = "GroupMetricESDAO/countGroupNotMetricValue"; + /**************************************************** Zookeeper ****************************************************/ + public static final String GET_ZOOKEEPER_AGG_LIST_METRICS = "ZookeeperMetricESDAO/getAggListZookeeperMetrics"; } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java index a3747c0a..017bcf04 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java @@ -12,5 +12,7 @@ import javax.management.ObjectName; public interface JmxDAO { Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute); - Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute); + Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute); + + Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig); } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java index ec8349cc..77eb3252 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java @@ -19,24 +19,28 @@ public class JmxDAOImpl implements JmxDAO { @Override public Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) { - return this.getJmxValue(null, null, jmxHost, jmxPort, jmxConfig, objectName, attribute); + return this.getJmxValue(null, jmxHost, jmxPort, jmxConfig, objectName, attribute); } @Override - public Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) { + public Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) { JmxConnectorWrap jmxConnectorWrap = null; try { - jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, brokerId, null, jmxHost, jmxPort, jmxConfig); + jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, null, null, jmxHost, jmxPort, jmxConfig); if (!jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) { - log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed", - clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig); + log.error( + "method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed", + clusterPhyId, jmxHost, jmxPort, jmxConfig + ); return null; } return jmxConnectorWrap.getAttribute(objectName, attribute); } catch (Exception e) { - log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg={}", - clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e); + log.error( + "method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg=exception!", + clusterPhyId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e + ); } finally { if (jmxConnectorWrap != null) { jmxConnectorWrap.close(); @@ -45,4 +49,27 @@ public class JmxDAOImpl implements JmxDAO { return null; } + + @Override + public Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig) { + try { + Object object = this.getJmxValue( + clusterPhyId, + jmxHost, + jmxPort, + jmxConfig, + new ObjectName("java.lang:type=Runtime"), + "StartTime" + ); + + return object == null? null: (Long) object; + } catch (Exception e) { + log.error( + "class=JmxDAOImpl||method=getServerStartTime||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMsg=exception!", + clusterPhyId, jmxHost, jmxPort, jmxConfig, e + ); + } + + return null; + } } diff --git a/km-persistence/src/main/resources/dsl/ZookeeperMetricESDAO/getAggListZookeeperMetrics b/km-persistence/src/main/resources/dsl/ZookeeperMetricESDAO/getAggListZookeeperMetrics new file mode 100644 index 00000000..c05c221d --- /dev/null +++ b/km-persistence/src/main/resources/dsl/ZookeeperMetricESDAO/getAggListZookeeperMetrics @@ -0,0 +1,44 @@ +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "clusterPhyId": { + "value": %d + } + } + }, + { + "term": { + "brokerId": { + "value": %d + } + } + }, + { + "range": { + "timestamp": { + "gte": %d, + "lte": %d + } + } + } + ] + } + }, + "aggs": { + "hist": { + "date_histogram": { + "field": "timestamp", + "fixed_interval": "%s", + "time_zone": "Asia/Shanghai", + "min_doc_count": 0 + }, + "aggs": { + %s + } + } + } +} \ No newline at end of file diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ZookeeperMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ZookeeperMetricCollectorTask.java new file mode 100644 index 00000000..f533a30a --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ZookeeperMetricCollectorTask.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.know.streaming.km.task.metrics; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.collector.metric.ZookeeperMetricCollector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author didi + */ +@Task(name = "ZookeeperMetricCollectorTask", + description = "Zookeeper指标采集任务", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class ZookeeperMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { + private static final ILog log = LogFactory.getLog(ZookeeperMetricCollectorTask.class); + + @Autowired + private ZookeeperMetricCollector zookeeperMetricCollector; + + @Override + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + zookeeperMetricCollector.collectMetrics(clusterPhy); + + return TaskResult.SUCCESS; + } +}