[Optimize]增加Connector运行状态指标 (#1110)

1、增加Connector运行状态指标;
2、将Connector指标上报普罗米修斯;
3、调整代码继承关系;
This commit is contained in:
EricZeng
2023-08-02 21:07:45 +08:00
committed by GitHub
parent bdffc10ca6
commit 55161e439a
14 changed files with 156 additions and 85 deletions

View File

@@ -24,9 +24,9 @@ import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.ConnectClusterMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.cluster.ConnectClusterMetricESDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@@ -43,7 +43,7 @@ import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultS
* @author didi
*/
@Service
public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService implements ConnectClusterMetricService {
public class ConnectClusterMetricServiceImpl extends BaseConnectMetricService implements ConnectClusterMetricService {
protected static final ILog LOGGER = LogFactory.getLog(ConnectClusterMetricServiceImpl.class);
public static final String CONNECT_CLUSTER_METHOD_GET_WORKER_METRIC_AVG = "getWorkerMetricAvg";
@@ -86,8 +86,7 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService
String connectClusterMetricKey = CollectedMetricsLocalCache.genConnectClusterMetricCacheKey(connectClusterPhyId, metric);
Float keyValue = CollectedMetricsLocalCache.getConnectClusterMetrics(connectClusterMetricKey);
if (keyValue != null) {
ConnectClusterMetrics connectClusterMetrics = ConnectClusterMetrics.initWithMetric(connectClusterPhyId,metric,keyValue);
return Result.buildSuc(connectClusterMetrics);
return Result.buildSuc(new ConnectClusterMetrics(connectClusterPhyId, metric, keyValue));
}
Result<ConnectClusterMetrics> ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, metric);
@@ -209,8 +208,7 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService
try {
//2、获取jmx指标
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString();
ConnectWorkerMetrics connectWorkerMetrics = ConnectWorkerMetrics.initWithMetric(connectClusterId, workerId, metric, Float.valueOf(value));
return Result.buildSuc(connectWorkerMetrics);
return Result.buildSuc(new ConnectWorkerMetrics(connectClusterId, workerId, metric, Float.valueOf(value)));
} catch (Exception e) {
LOGGER.error("method=getConnectWorkerMetricsByJMX||connectClusterId={}||workerId={}||metrics={}||jmx={}||msg={}",
connectClusterId, workerId, metric, jmxInfo.getJmxObjectName(), e.getClass().getName());
@@ -231,8 +229,8 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService
.collect(Collectors.toList());
}
protected List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
private List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
List<MetricMultiLinesVO> multiLinesVOS = new ArrayList<>();
if (map == null || map.isEmpty()) {
// 如果为空,则直接返回

View File

@@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.Connector
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectStatusEnum;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
@@ -32,7 +33,7 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.connector.Connector
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector.ConnectorMetricESDAO;
import org.springframework.beans.factory.annotation.Autowired;
@@ -52,7 +53,7 @@ import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultS
* @author didi
*/
@Service
public class ConnectorMetricServiceImpl extends BaseConnectorMetricService implements ConnectorMetricService {
public class ConnectorMetricServiceImpl extends BaseConnectMetricService implements ConnectorMetricService {
protected static final ILog LOGGER = LogFactory.getLog(ConnectorMetricServiceImpl.class);
public static final String CONNECTOR_METHOD_DO_NOTHING = "doNothing";
@@ -67,6 +68,8 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
public static final String CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE = "getMetricHealthScore";
public static final String CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS = "getMetricRunningStatus";
@Autowired
private ConnectorMetricESDAO connectorMetricESDAO;
@@ -98,11 +101,12 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
@Override
protected void initRegisterVCHandler() {
registerVCHandler(CONNECTOR_METHOD_DO_NOTHING, this::doNothing);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECT_WORKER_METRIC_SUM, this::getConnectWorkerMetricSum);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_AVG, this::getConnectorTaskMetricsAvg);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_MAX, this::getConnectorTaskMetricsMax);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_SUM, this::getConnectorTaskMetricsSum);
registerVCHandler(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE, this::getMetricHealthScore);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECT_WORKER_METRIC_SUM, this::getConnectWorkerMetricSum);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_AVG, this::getConnectorTaskMetricsAvg);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_MAX, this::getConnectorTaskMetricsMax);
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_SUM, this::getConnectorTaskMetricsSum);
registerVCHandler(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE, this::getMetricHealthScore);
registerVCHandler(CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS, this::getMetricRunningStatus);
}
@Override
@@ -111,8 +115,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
Float keyValue = CollectedMetricsLocalCache.getConnectorMetrics(connectorMetricKey);
if (null != keyValue) {
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterPhyId, connectorName, metric, keyValue);
return Result.buildSuc(connectorMetrics);
return Result.buildSuc(new ConnectorMetrics(connectClusterPhyId, connectorName, metric, keyValue));
}
Result<ConnectorMetrics> ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, connectorName, metric);
@@ -216,6 +219,20 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
return Result.buildSuc(metrics);
}
private Result<ConnectorMetrics> getMetricRunningStatus(VersionItemParam metricParam) {
ConnectorMetricParam param = (ConnectorMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
String connectorName = param.getConnectorName();
String metricName = param.getMetricName();
ConnectorPO connector = connectorService.getConnectorFromDB(connectClusterId, connectorName);
if (connector == null) {
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metricName, (float)ConnectStatusEnum.UNKNOWN.getStatus()));
}
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metricName, (float)ConnectStatusEnum.getByValue(connector.getState()).getStatus()));
}
private Result<ConnectorMetrics> getConnectWorkerMetricSum(VersionItemParam metricParam) {
ConnectorMetricParam param = (ConnectorMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
@@ -240,12 +257,16 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
if (!isCollected) {
return Result.buildFailure(NOT_EXIST);
}
return Result.buildSuc(ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum));
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum));
}
//kafka.connect:type=connect-worker-metrics,connector="{connector}" 指标
private Result<ConnectorMetrics> getConnectorMetric(Long connectClusterId, String workerId, String connectorName, String metric, ConnectorTypeEnum connectorType) {
VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric);
if (null == jmxInfo) {
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
}
if (jmxInfo.getType() != null) {
if (connectorType == null) {
@@ -257,9 +278,6 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
}
}
if (null == jmxInfo) {
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
}
String jmxObjectName = String.format(jmxInfo.getJmxObjectName(), connectorName);
JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId);
@@ -270,8 +288,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
try {
//2、获取jmx指标
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString();
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, Float.valueOf(value));
return Result.buildSuc(connectorMetrics);
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, Float.valueOf(value)));
} catch (InstanceNotFoundException e) {
// 忽略该错误该错误出现的原因是该指标在JMX中不存在
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName));
@@ -296,8 +313,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
}
Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get();
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum / ret.getData().size());
return Result.buildSuc(connectorMetrics);
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum / ret.getData().size()));
}
private Result<ConnectorMetrics> getConnectorTaskMetricsMax(VersionItemParam metricParam){
@@ -313,8 +329,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
}
Float max = ret.getData().stream().max((a, b) -> a.getMetric(metric).compareTo(b.getMetric(metric))).get().getMetric(metric);
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, max);
return Result.buildSuc(connectorMetrics);
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, max));
}
private Result<ConnectorMetrics> getConnectorTaskMetricsSum(VersionItemParam metricParam){
@@ -330,8 +345,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
}
Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get();
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum);
return Result.buildSuc(connectorMetrics);
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum));
}
@@ -358,6 +372,9 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
private Result<ConnectorTaskMetrics> getConnectorTaskMetric(Long connectClusterId, String workerId, String connectorName, Integer taskId, String metric, ConnectorTypeEnum connectorType) {
VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric);
if (null == jmxInfo) {
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
}
if (jmxInfo.getType() != null) {
if (connectorType == null) {
@@ -369,9 +386,6 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
}
}
if (null == jmxInfo) {
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
}
String jmxObjectName=String.format(jmxInfo.getJmxObjectName(), connectorName, taskId);
JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId);
@@ -382,8 +396,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
try {
//2、获取jmx指标
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString();
ConnectorTaskMetrics connectorTaskMetrics = ConnectorTaskMetrics.initWithMetric(connectClusterId, connectorName, taskId, metric, Float.valueOf(value));
return Result.buildSuc(connectorTaskMetrics);
return Result.buildSuc(new ConnectorTaskMetrics(connectClusterId, connectorName, taskId, metric, Float.valueOf(value)));
} catch (Exception e) {
LOGGER.error("method=getConnectorTaskMetric||connectClusterId={}||workerId={}||connectorName={}||taskId={}||metrics={}||jmx={}||msg={}",
connectClusterId, workerId, connectorName, taskId, metric, jmxObjectName, e.getClass().getName());

View File

@@ -27,7 +27,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import org.springframework.beans.factory.annotation.Autowired;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.mm2.MirrorMakerMetricESDAO;
@@ -49,7 +49,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
* @date 2022/12/15
*/
@Service
public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService implements MirrorMakerMetricService {
public class MirrorMakerMetricServiceImpl extends BaseConnectMetricService implements MirrorMakerMetricService {
protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricServiceImpl.class);
public static final String MIRROR_MAKER_METHOD_DO_NOTHING = "doNothing";
@@ -190,7 +190,7 @@ public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService imp
multiLinesVO.setMetricLines(metricLines);
multiLinesVOS.add(multiLinesVO);
}catch (Exception e){
} catch (Exception e){
LOGGER.error("method=metricMap2VO||connectClusterId={}||msg=exception!", connectClusterId, e);
}
}

View File

@@ -9,7 +9,7 @@ import java.util.stream.Collectors;
* @author wyb
* @date 2022/11/9
*/
public abstract class BaseConnectorMetricService extends BaseConnectorVersionControlService{
public abstract class BaseConnectMetricService extends BaseConnectVersionControlService {
private List<String> metricNames = new ArrayList<>();
@PostConstruct

View File

@@ -14,7 +14,7 @@ import javax.annotation.Nullable;
* @author wyb
* @date 2022/11/8
*/
public abstract class BaseConnectorVersionControlService extends BaseVersionControlService {
public abstract class BaseConnectVersionControlService extends BaseVersionControlService {
@Autowired
ConnectClusterService connectClusterService;

View File

@@ -24,6 +24,8 @@ public class ConnectorMetricVersionItems extends BaseMetricVersionMetric {
public static final String CONNECTOR_METRIC_HEALTH_STATE = "HealthState";
public static final String CONNECTOR_METRIC_RUNNING_STATUS = "RunningStatus";
public static final String CONNECTOR_METRIC_CONNECTOR_TOTAL_TASK_COUNT = "ConnectorTotalTaskCount";
public static final String CONNECTOR_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
@@ -128,6 +130,9 @@ public class ConnectorMetricVersionItems extends BaseMetricVersionMetric {
items.add(buildAllVersionsItem()
.name(CONNECTOR_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH)
.extendMethod(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE));
items.add(buildAllVersionsItem()
.name(CONNECTOR_METRIC_RUNNING_STATUS).unit("0:UNASSIGNED 1:RUNNING 2:PAUSED 3:FAILED 4:DESTROYED -1:UNKNOWN").desc("运行状态(0:UNASSIGNED 1:RUNNING 2:PAUSED 3:FAILED 4:DESTROYED -1:UNKNOWN)").category(CATEGORY_PERFORMANCE)
.extendMethod(CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS));
items.add(buildAllVersionsItem()
.name(CONNECTOR_METRIC_HEALTH_CHECK_PASSED).unit("").desc("健康项检查通过数").category(CATEGORY_HEALTH)
.extendMethod(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE));