diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectClusterMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectClusterMetrics.java index fe710391..f7c50818 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectClusterMetrics.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectClusterMetrics.java @@ -1,7 +1,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; @@ -12,20 +11,18 @@ import lombok.ToString; */ @Data @NoArgsConstructor -@AllArgsConstructor @ToString public class ConnectClusterMetrics extends BaseMetrics { - private Long connectClusterId; + protected Long connectClusterId; - public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId){ + public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId ){ super(clusterPhyId); this.connectClusterId = connectClusterId; } - public static ConnectClusterMetrics initWithMetric(Long connectClusterId, String metric, Float value) { - ConnectClusterMetrics brokerMetrics = new ConnectClusterMetrics(connectClusterId, connectClusterId); - brokerMetrics.putMetric(metric, value); - return brokerMetrics; + public ConnectClusterMetrics(Long connectClusterId, String metricName, Float metricValue) { + this(null, connectClusterId); + this.putMetric(metricName, metricValue); } @Override diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectWorkerMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectWorkerMetrics.java index 78d9fe06..de4936e5 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectWorkerMetrics.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectWorkerMetrics.java @@ -1,7 +1,5 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect; -import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; @@ -11,25 +9,19 @@ import lombok.ToString; * @date 2022/11/2 */ @Data -@AllArgsConstructor @NoArgsConstructor @ToString -public class ConnectWorkerMetrics extends BaseMetrics { - - private Long connectClusterId; - +public class ConnectWorkerMetrics extends ConnectClusterMetrics { private String workerId; - public static ConnectWorkerMetrics initWithMetric(Long connectClusterId, String workerId, String metric, Float value) { - ConnectWorkerMetrics connectWorkerMetrics = new ConnectWorkerMetrics(); - connectWorkerMetrics.setConnectClusterId(connectClusterId); - connectWorkerMetrics.setWorkerId(workerId); - connectWorkerMetrics.putMetric(metric, value); - return connectWorkerMetrics; + public ConnectWorkerMetrics(Long connectClusterId, String workerId, String metricName, Float metricValue) { + super(null, connectClusterId); + this.workerId = workerId; + this.putMetric(metricName, metricValue); } @Override public String unique() { - return "KCC@" + clusterPhyId + "@" + connectClusterId + "@" + workerId; + return "KCW@" + clusterPhyId + "@" + connectClusterId + "@" + workerId; } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorMetrics.java index 08540ed5..b497efb8 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorMetrics.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorMetrics.java @@ -1,6 +1,5 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect; -import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; @@ -12,24 +11,21 @@ import lombok.ToString; @Data @NoArgsConstructor @ToString -public class ConnectorMetrics extends BaseMetrics { - private Long connectClusterId; +public class ConnectorMetrics extends ConnectClusterMetrics { + protected String connectorName; - private String connectorName; - - private String connectorNameAndClusterId; + protected String connectorNameAndClusterId; public ConnectorMetrics(Long connectClusterId, String connectorName) { - super(null); + super(null, connectClusterId); this.connectClusterId = connectClusterId; this.connectorName = connectorName; this.connectorNameAndClusterId = connectorName + "#" + connectClusterId; } - public static ConnectorMetrics initWithMetric(Long connectClusterId, String connectorName, String metricName, Float value) { - ConnectorMetrics metrics = new ConnectorMetrics(connectClusterId, connectorName); - metrics.putMetric(metricName, value); - return metrics; + public ConnectorMetrics(Long connectClusterId, String connectorName, String metricName, Float metricValue) { + this(connectClusterId, connectorName); + this.putMetric(metricName, metricValue); } @Override diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java index eb0dc42d..fc28c97e 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java @@ -1,6 +1,5 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect; -import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; @@ -12,11 +11,7 @@ import lombok.ToString; @Data @NoArgsConstructor @ToString -public class ConnectorTaskMetrics extends BaseMetrics { - private Long connectClusterId; - - private String connectorName; - +public class ConnectorTaskMetrics extends ConnectorMetrics { private Integer taskId; public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId) { @@ -25,14 +20,13 @@ public class ConnectorTaskMetrics extends BaseMetrics { this.taskId = taskId; } - public static ConnectorTaskMetrics initWithMetric(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float value) { - ConnectorTaskMetrics metrics = new ConnectorTaskMetrics(connectClusterId, connectorName, taskId); - metrics.putMetric(metricName,value); - return metrics; + public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float metricValue) { + this(connectClusterId, connectorName, taskId); + this.putMetric(metricName, metricValue); } @Override public String unique() { - return "KCOR@" + connectClusterId + "@" + connectorName + "@" + taskId; + return "KCORT@" + connectClusterId + "@" + connectorName + "@" + taskId; } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/connect/ConnectStatusEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/connect/ConnectStatusEnum.java new file mode 100644 index 00000000..235cead6 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/connect/ConnectStatusEnum.java @@ -0,0 +1,50 @@ +package com.xiaojukeji.know.streaming.km.common.enums.connect; + +import org.apache.kafka.connect.runtime.AbstractStatus; + +/** + * connector运行状态 + * @see AbstractStatus + */ +public enum ConnectStatusEnum { + UNASSIGNED(0, "UNASSIGNED"), + + RUNNING(1,"RUNNING"), + + PAUSED(2,"PAUSED"), + + FAILED(3, "FAILED"), + + DESTROYED(4, "DESTROYED"), + + UNKNOWN(-1, "UNKNOWN") + + ; + + ConnectStatusEnum(int status, String value) { + this.status = status; + this.value = value; + } + + private final int status; + + private final String value; + + public static ConnectStatusEnum getByValue(String value) { + for (ConnectStatusEnum statusEnum: ConnectStatusEnum.values()) { + if (statusEnum.value.equals(value)) { + return statusEnum; + } + } + + return ConnectStatusEnum.UNKNOWN; + } + + public int getStatus() { + return status; + } + + public String getValue() { + return value; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterMetricServiceImpl.java index 5ed5af64..1444d3ac 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterMetricServiceImpl.java @@ -24,9 +24,9 @@ import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache; import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; -import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService; import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient; -import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.ConnectClusterMetricESDAO; +import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.cluster.ConnectClusterMetricESDAO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -43,7 +43,7 @@ import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultS * @author didi */ @Service -public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService implements ConnectClusterMetricService { +public class ConnectClusterMetricServiceImpl extends BaseConnectMetricService implements ConnectClusterMetricService { protected static final ILog LOGGER = LogFactory.getLog(ConnectClusterMetricServiceImpl.class); public static final String CONNECT_CLUSTER_METHOD_GET_WORKER_METRIC_AVG = "getWorkerMetricAvg"; @@ -86,8 +86,7 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService String connectClusterMetricKey = CollectedMetricsLocalCache.genConnectClusterMetricCacheKey(connectClusterPhyId, metric); Float keyValue = CollectedMetricsLocalCache.getConnectClusterMetrics(connectClusterMetricKey); if (keyValue != null) { - ConnectClusterMetrics connectClusterMetrics = ConnectClusterMetrics.initWithMetric(connectClusterPhyId,metric,keyValue); - return Result.buildSuc(connectClusterMetrics); + return Result.buildSuc(new ConnectClusterMetrics(connectClusterPhyId, metric, keyValue)); } Result ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, metric); @@ -209,8 +208,7 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService try { //2、获取jmx指标 String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString(); - ConnectWorkerMetrics connectWorkerMetrics = ConnectWorkerMetrics.initWithMetric(connectClusterId, workerId, metric, Float.valueOf(value)); - return Result.buildSuc(connectWorkerMetrics); + return Result.buildSuc(new ConnectWorkerMetrics(connectClusterId, workerId, metric, Float.valueOf(value))); } catch (Exception e) { LOGGER.error("method=getConnectWorkerMetricsByJMX||connectClusterId={}||workerId={}||metrics={}||jmx={}||msg={}", connectClusterId, workerId, metric, jmxInfo.getJmxObjectName(), e.getClass().getName()); @@ -231,8 +229,8 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService .collect(Collectors.toList()); } - protected List metricMap2VO(Long connectClusterId, - Map>> map){ + private List metricMap2VO(Long connectClusterId, + Map>> map){ List multiLinesVOS = new ArrayList<>(); if (map == null || map.isEmpty()) { // 如果为空,则直接返回 diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java index 8792875d..ffcc16ab 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java @@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.Connector import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectStatusEnum; import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; @@ -32,7 +33,7 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.connector.Connector import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; -import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService; import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient; import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector.ConnectorMetricESDAO; import org.springframework.beans.factory.annotation.Autowired; @@ -52,7 +53,7 @@ import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultS * @author didi */ @Service -public class ConnectorMetricServiceImpl extends BaseConnectorMetricService implements ConnectorMetricService { +public class ConnectorMetricServiceImpl extends BaseConnectMetricService implements ConnectorMetricService { protected static final ILog LOGGER = LogFactory.getLog(ConnectorMetricServiceImpl.class); public static final String CONNECTOR_METHOD_DO_NOTHING = "doNothing"; @@ -67,6 +68,8 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple public static final String CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE = "getMetricHealthScore"; + public static final String CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS = "getMetricRunningStatus"; + @Autowired private ConnectorMetricESDAO connectorMetricESDAO; @@ -98,11 +101,12 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple @Override protected void initRegisterVCHandler() { registerVCHandler(CONNECTOR_METHOD_DO_NOTHING, this::doNothing); - registerVCHandler(CONNECTOR_METHOD_GET_CONNECT_WORKER_METRIC_SUM, this::getConnectWorkerMetricSum); - registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_AVG, this::getConnectorTaskMetricsAvg); - registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_MAX, this::getConnectorTaskMetricsMax); - registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_SUM, this::getConnectorTaskMetricsSum); - registerVCHandler(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE, this::getMetricHealthScore); + registerVCHandler(CONNECTOR_METHOD_GET_CONNECT_WORKER_METRIC_SUM, this::getConnectWorkerMetricSum); + registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_AVG, this::getConnectorTaskMetricsAvg); + registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_MAX, this::getConnectorTaskMetricsMax); + registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_SUM, this::getConnectorTaskMetricsSum); + registerVCHandler(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE, this::getMetricHealthScore); + registerVCHandler(CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS, this::getMetricRunningStatus); } @Override @@ -111,8 +115,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple Float keyValue = CollectedMetricsLocalCache.getConnectorMetrics(connectorMetricKey); if (null != keyValue) { - ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterPhyId, connectorName, metric, keyValue); - return Result.buildSuc(connectorMetrics); + return Result.buildSuc(new ConnectorMetrics(connectClusterPhyId, connectorName, metric, keyValue)); } Result ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, connectorName, metric); @@ -216,6 +219,20 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple return Result.buildSuc(metrics); } + private Result getMetricRunningStatus(VersionItemParam metricParam) { + ConnectorMetricParam param = (ConnectorMetricParam) metricParam; + Long connectClusterId = param.getConnectClusterId(); + String connectorName = param.getConnectorName(); + String metricName = param.getMetricName(); + + ConnectorPO connector = connectorService.getConnectorFromDB(connectClusterId, connectorName); + if (connector == null) { + return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metricName, (float)ConnectStatusEnum.UNKNOWN.getStatus())); + } + + return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metricName, (float)ConnectStatusEnum.getByValue(connector.getState()).getStatus())); + } + private Result getConnectWorkerMetricSum(VersionItemParam metricParam) { ConnectorMetricParam param = (ConnectorMetricParam) metricParam; Long connectClusterId = param.getConnectClusterId(); @@ -240,12 +257,16 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple if (!isCollected) { return Result.buildFailure(NOT_EXIST); } - return Result.buildSuc(ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum)); + + return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum)); } //kafka.connect:type=connect-worker-metrics,connector="{connector}" 指标 private Result getConnectorMetric(Long connectClusterId, String workerId, String connectorName, String metric, ConnectorTypeEnum connectorType) { VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric); + if (null == jmxInfo) { + return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST); + } if (jmxInfo.getType() != null) { if (connectorType == null) { @@ -257,9 +278,6 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple } } - if (null == jmxInfo) { - return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST); - } String jmxObjectName = String.format(jmxInfo.getJmxObjectName(), connectorName); JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId); @@ -270,8 +288,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple try { //2、获取jmx指标 String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString(); - ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, Float.valueOf(value)); - return Result.buildSuc(connectorMetrics); + return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, Float.valueOf(value))); } catch (InstanceNotFoundException e) { // 忽略该错误,该错误出现的原因是该指标在JMX中不存在 return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName)); @@ -296,8 +313,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple } Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get(); - ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum / ret.getData().size()); - return Result.buildSuc(connectorMetrics); + return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum / ret.getData().size())); } private Result getConnectorTaskMetricsMax(VersionItemParam metricParam){ @@ -313,8 +329,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple } Float max = ret.getData().stream().max((a, b) -> a.getMetric(metric).compareTo(b.getMetric(metric))).get().getMetric(metric); - ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, max); - return Result.buildSuc(connectorMetrics); + return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, max)); } private Result getConnectorTaskMetricsSum(VersionItemParam metricParam){ @@ -330,8 +345,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple } Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get(); - ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum); - return Result.buildSuc(connectorMetrics); + return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum)); } @@ -358,6 +372,9 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple private Result getConnectorTaskMetric(Long connectClusterId, String workerId, String connectorName, Integer taskId, String metric, ConnectorTypeEnum connectorType) { VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric); + if (null == jmxInfo) { + return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST); + } if (jmxInfo.getType() != null) { if (connectorType == null) { @@ -369,9 +386,6 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple } } - if (null == jmxInfo) { - return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST); - } String jmxObjectName=String.format(jmxInfo.getJmxObjectName(), connectorName, taskId); JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId); @@ -382,8 +396,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple try { //2、获取jmx指标 String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString(); - ConnectorTaskMetrics connectorTaskMetrics = ConnectorTaskMetrics.initWithMetric(connectClusterId, connectorName, taskId, metric, Float.valueOf(value)); - return Result.buildSuc(connectorTaskMetrics); + return Result.buildSuc(new ConnectorTaskMetrics(connectClusterId, connectorName, taskId, metric, Float.valueOf(value))); } catch (Exception e) { LOGGER.error("method=getConnectorTaskMetric||connectClusterId={}||workerId={}||connectorName={}||taskId={}||metrics={}||jmx={}||msg={}", connectClusterId, workerId, connectorName, taskId, metric, jmxObjectName, e.getClass().getName()); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java index 83242841..2361be96 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java @@ -27,7 +27,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; -import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService; import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient; import org.springframework.beans.factory.annotation.Autowired; import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.mm2.MirrorMakerMetricESDAO; @@ -49,7 +49,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT * @date 2022/12/15 */ @Service -public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService implements MirrorMakerMetricService { +public class MirrorMakerMetricServiceImpl extends BaseConnectMetricService implements MirrorMakerMetricService { protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricServiceImpl.class); public static final String MIRROR_MAKER_METHOD_DO_NOTHING = "doNothing"; @@ -190,7 +190,7 @@ public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService imp multiLinesVO.setMetricLines(metricLines); multiLinesVOS.add(multiLinesVO); - }catch (Exception e){ + } catch (Exception e){ LOGGER.error("method=metricMap2VO||connectClusterId={}||msg=exception!", connectClusterId, e); } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectMetricService.java similarity index 90% rename from km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorMetricService.java rename to km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectMetricService.java index febfdcf4..da424ebc 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorMetricService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectMetricService.java @@ -9,7 +9,7 @@ import java.util.stream.Collectors; * @author wyb * @date 2022/11/9 */ -public abstract class BaseConnectorMetricService extends BaseConnectorVersionControlService{ +public abstract class BaseConnectMetricService extends BaseConnectVersionControlService { private List metricNames = new ArrayList<>(); @PostConstruct diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorVersionControlService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectVersionControlService.java similarity index 95% rename from km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorVersionControlService.java rename to km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectVersionControlService.java index ced858ff..8f426061 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorVersionControlService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectVersionControlService.java @@ -14,7 +14,7 @@ import javax.annotation.Nullable; * @author wyb * @date 2022/11/8 */ -public abstract class BaseConnectorVersionControlService extends BaseVersionControlService { +public abstract class BaseConnectVersionControlService extends BaseVersionControlService { @Autowired ConnectClusterService connectClusterService; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/ConnectorMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/ConnectorMetricVersionItems.java index 2d4aeac2..bcad6e3d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/ConnectorMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/ConnectorMetricVersionItems.java @@ -24,6 +24,8 @@ public class ConnectorMetricVersionItems extends BaseMetricVersionMetric { public static final String CONNECTOR_METRIC_HEALTH_STATE = "HealthState"; + public static final String CONNECTOR_METRIC_RUNNING_STATUS = "RunningStatus"; + public static final String CONNECTOR_METRIC_CONNECTOR_TOTAL_TASK_COUNT = "ConnectorTotalTaskCount"; public static final String CONNECTOR_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed"; @@ -128,6 +130,9 @@ public class ConnectorMetricVersionItems extends BaseMetricVersionMetric { items.add(buildAllVersionsItem() .name(CONNECTOR_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH) .extendMethod(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE)); + items.add(buildAllVersionsItem() + .name(CONNECTOR_METRIC_RUNNING_STATUS).unit("0:UNASSIGNED 1:RUNNING 2:PAUSED 3:FAILED 4:DESTROYED -1:UNKNOWN").desc("运行状态(0:UNASSIGNED 1:RUNNING 2:PAUSED 3:FAILED 4:DESTROYED -1:UNKNOWN)").category(CATEGORY_PERFORMANCE) + .extendMethod(CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS)); items.add(buildAllVersionsItem() .name(CONNECTOR_METRIC_HEALTH_CHECK_PASSED).unit("个").desc("健康项检查通过数").category(CATEGORY_HEALTH) .extendMethod(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE)); diff --git a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/common/MonitorSinkTagEnum.java b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/common/MonitorSinkTagEnum.java index f78c547a..3d8b5c25 100644 --- a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/common/MonitorSinkTagEnum.java +++ b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/common/MonitorSinkTagEnum.java @@ -16,6 +16,11 @@ public enum MonitorSinkTagEnum { CONSUMER_GROUP("consumerGroup"), REPLICATION("replication"), + + CONNECT_CLUSTER_ID("connectClusterId"), + + CONNECT_CONNECTOR("connectConnector"), + ; private final String name; diff --git a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java index bbb31647..6d6b9774 100644 --- a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java +++ b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java @@ -3,7 +3,9 @@ package com.xiaojukeji.know.streaming.km.monitor.component; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.connect.ConnectorMetricEvent; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.monitor.common.MetricSinkPoint; import org.springframework.context.ApplicationListener; @@ -59,6 +61,10 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< } else if(event instanceof ZookeeperMetricEvent) { ZookeeperMetricEvent zookeeperMetricEvent = (ZookeeperMetricEvent)event; sinkMetrics(zookeeperMetric2SinkPoint(zookeeperMetricEvent.getZookeeperMetrics())); + + } else if (event instanceof ConnectorMetricEvent) { + ConnectorMetricEvent connectorMetricEvent = (ConnectorMetricEvent)event; + sinkMetrics(connectConnectorMetric2SinkPoint(connectorMetricEvent.getConnectorMetricsList())); } } ); } @@ -170,6 +176,21 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< return pointList; } + private List connectConnectorMetric2SinkPoint(List connectorMetricsList){ + List pointList = new ArrayList<>(); + + for(ConnectorMetrics metrics : connectorMetricsList){ + Map tagsMap = new HashMap<>(); + tagsMap.put(CLUSTER_ID.getName(), metrics.getClusterPhyId()); + tagsMap.put(CONNECT_CLUSTER_ID.getName(), metrics.getConnectClusterId()); + tagsMap.put(CONNECT_CONNECTOR.getName(), metrics.getConnectorName()); + + pointList.addAll(genSinkPoint("ConnectConnector", metrics.getMetrics(), metrics.getTimestamp(), tagsMap)); + } + + return pointList; + } + private List genSinkPoint(String metricPre, Map metrics, long timeStamp, diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectClusterMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/cluster/ConnectClusterMetricESDAO.java similarity index 99% rename from km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectClusterMetricESDAO.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/cluster/ConnectClusterMetricESDAO.java index 31256efe..8d9626a5 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectClusterMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/cluster/ConnectClusterMetricESDAO.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect; +package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.cluster; import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse; import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;