调整ES相关文件位置 & 补充connectESDAO相关类

This commit is contained in:
zengqiao
2022-12-06 17:59:39 +08:00
committed by EricZeng
parent cc2a590b33
commit 249fe7c700
74 changed files with 2640 additions and 967 deletions

View File

@@ -0,0 +1,35 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/6/17
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConnectClusterMetrics extends BaseMetrics {
private Long connectClusterId;
public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId){
super(clusterPhyId);
this.connectClusterId = connectClusterId;
}
public static ConnectClusterMetrics initWithMetric(Long connectClusterId, String metric, Float value) {
ConnectClusterMetrics brokerMetrics = new ConnectClusterMetrics(connectClusterId, connectClusterId);
brokerMetrics.putMetric(metric, value);
return brokerMetrics;
}
@Override
public String unique() {
return "KCC@" + clusterPhyId + "@" + connectClusterId;
}
}

View File

@@ -0,0 +1,35 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author wyb
* @date 2022/11/2
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class ConnectWorkerMetrics extends BaseMetrics {
private Long connectClusterId;
private String workerId;
public static ConnectWorkerMetrics initWithMetric(Long connectClusterId, String workerId, String metric, Float value) {
ConnectWorkerMetrics connectWorkerMetrics = new ConnectWorkerMetrics();
connectWorkerMetrics.setConnectClusterId(connectClusterId);
connectWorkerMetrics.setWorkerId(workerId);
connectWorkerMetrics.putMetric(metric, value);
return connectWorkerMetrics;
}
@Override
public String unique() {
return "KCC@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
}
}

View File

@@ -0,0 +1,39 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/6/17
*/
@Data
@NoArgsConstructor
@ToString
public class ConnectorMetrics extends BaseMetrics {
private Long connectClusterId;
private String connectorName;
private String connectorNameAndClusterId;
public ConnectorMetrics(Long connectClusterId, String connectorName) {
super(null);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}
public static ConnectorMetrics initWithMetric(Long connectClusterId, String connectorName, String metricName, Float value) {
ConnectorMetrics metrics = new ConnectorMetrics(connectClusterId, connectorName);
metrics.putMetric(metricName, value);
return metrics;
}
@Override
public String unique() {
return "KCOR@" + connectClusterId + "@" + connectorName;
}
}

View File

@@ -0,0 +1,39 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author wyb
* @date 2022/11/4
*/
@Data
@NoArgsConstructor
@ToString
public class ConnectorTaskMetrics extends BaseMetrics {
private Long connectClusterId;
private String connectorName;
private Integer taskId;
public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId) {
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.taskId = taskId;
}
public static ConnectorTaskMetrics initWithMetric(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float value) {
ConnectorTaskMetrics metrics = new ConnectorTaskMetrics(connectClusterId, connectorName, taskId);
metrics.putMetric(metricName,value);
return metrics;
}
@Override
public String unique() {
return "KCOR@" + connectClusterId + "@" + connectorName + "@" + taskId;
}
}

View File

@@ -0,0 +1,30 @@
package com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ConnectClusterMetricPO extends BaseMetricESPO {
private Long connectClusterId;
public ConnectClusterMetricPO(Long kafkaClusterPhyId, Long connectClusterId){
super(kafkaClusterPhyId);
this.connectClusterId = connectClusterId;
}
@Override
public String getKey() {
return "KCC@" + clusterPhyId + "@" + connectClusterId + "@" + monitorTimestamp2min(timestamp);
}
@Override
public String getRoutingValue() {
return String.valueOf(connectClusterId);
}
}

View File

@@ -0,0 +1,39 @@
package com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ConnectorMetricPO extends BaseMetricESPO {
private Long connectClusterId;
private String connectorName;
/**
* 用于es内部排序
*/
private String connectorNameAndClusterId;
public ConnectorMetricPO(Long kafkaClusterPhyId, Long connectClusterId, String connectorName){
super(kafkaClusterPhyId);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}
@Override
public String getKey() {
return "KCOR@" + clusterPhyId + "@" + connectClusterId + "@" + connectorName + "@" + monitorTimestamp2min(timestamp);
}
@Override
public String getRoutingValue() {
return String.valueOf(connectClusterId);
}
}

View File

@@ -1,709 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.constant;
public class ESIndexConstant {
public final static String TOPIC_INDEX = "ks_kafka_topic_metric";
public final static String TOPIC_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_topic_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"brokerId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"topic\" : {\n" +
" \"type\" : \"keyword\"\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"BytesIn_min_15\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"Messages\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesRejected\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"PartitionURP\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthCheckTotal\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"ReplicationCount\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"ReplicationBytesOut\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"ReplicationBytesIn\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"FailedFetchRequests\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesIn_min_5\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthScore\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"LogSize\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesOut\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesOut_min_15\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"FailedProduceRequests\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesIn\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesOut_min_5\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"MessagesIn\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"TotalProduceRequests\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthCheckPassed\" : {\n" +
" \"type\" : \"float\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"brokerAgg\" : {\n" +
" \"type\" : \"keyword\"\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"index\" : true,\n" +
" \"type\" : \"date\",\n" +
" \"doc_values\" : true\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
public final static String CLUSTER_INDEX = "ks_kafka_cluster_metric";
public final static String CLUSTER_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_cluster_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"Connections\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"BytesIn_min_15\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PartitionURP\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthScore_Topics\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"EventQueueSize\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"ActiveControllerCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"GroupDeads\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"BytesIn_min_5\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckTotal_Topics\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"Partitions\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"BytesOut\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"Groups\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"BytesOut_min_15\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"TotalRequestQueueSize\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckPassed_Groups\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"TotalProduceRequests\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckPassed\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"TotalLogSize\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"GroupEmptys\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PartitionNoLeader\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthScore_Brokers\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"Messages\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"Topics\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PartitionMinISR_E\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckTotal\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"Brokers\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"Replicas\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckTotal_Groups\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"GroupRebalances\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MessageIn\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthScore\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckPassed_Topics\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckTotal_Brokers\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PartitionMinISR_S\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"BytesIn\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"BytesOut_min_5\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"GroupActives\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MessagesIn\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"GroupReBalances\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckPassed_Brokers\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthScore_Groups\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"TotalResponseQueueSize\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"Zookeepers\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"LeaderMessages\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthScore_Cluster\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckPassed_Cluster\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"HealthCheckTotal_Cluster\" : {\n" +
" \"type\" : \"double\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"type\" : \"date\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
public final static String BROKER_INDEX = "ks_kafka_broker_metric";
public final static String BROKER_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_broker_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"brokerId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"NetworkProcessorAvgIdle\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"UnderReplicatedPartitions\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesIn_min_15\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthCheckTotal\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"RequestHandlerAvgIdle\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"connectionsCount\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesIn_min_5\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthScore\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesOut\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesOut_min_15\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesIn\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"BytesOut_min_5\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"TotalRequestQueueSize\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"MessagesIn\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"TotalProduceRequests\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthCheckPassed\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"TotalResponseQueueSize\" : {\n" +
" \"type\" : \"float\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"index\" : true,\n" +
" \"type\" : \"date\",\n" +
" \"doc_values\" : true\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
public final static String PARTITION_INDEX = "ks_kafka_partition_metric";
public final static String PARTITION_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_partition_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"brokerId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"partitionId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"topic\" : {\n" +
" \"type\" : \"keyword\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"LogStartOffset\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"Messages\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"LogEndOffset\" : {\n" +
" \"type\" : \"float\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"index\" : true,\n" +
" \"type\" : \"date\",\n" +
" \"doc_values\" : true\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
public final static String GROUP_INDEX = "ks_kafka_group_metric";
public final static String GROUP_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_group_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"group\" : {\n" +
" \"type\" : \"keyword\"\n" +
" },\n" +
" \"partitionId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"topic\" : {\n" +
" \"type\" : \"keyword\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"HealthScore\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"Lag\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"OffsetConsumed\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthCheckTotal\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"HealthCheckPassed\" : {\n" +
" \"type\" : \"float\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"groupMetric\" : {\n" +
" \"type\" : \"keyword\"\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"index\" : true,\n" +
" \"type\" : \"date\",\n" +
" \"doc_values\" : true\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
public final static String REPLICATION_INDEX = "ks_kafka_replication_metric";
public final static String REPLICATION_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_replication_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"brokerId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"partitionId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"topic\" : {\n" +
" \"type\" : \"keyword\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"LogStartOffset\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"Messages\" : {\n" +
" \"type\" : \"float\"\n" +
" },\n" +
" \"LogEndOffset\" : {\n" +
" \"type\" : \"float\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"index\" : true,\n" +
" \"type\" : \"date\",\n" +
" \"doc_values\" : true\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
public final static String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric";
public final static String ZOOKEEPER_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_zookeeper_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"AvgRequestLatency\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MinRequestLatency\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MaxRequestLatency\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"OutstandingRequests\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"NodeCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"WatchCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"NumAliveConnections\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PacketsReceived\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PacketsSent\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"EphemeralsCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"ApproximateDataSize\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"OpenFileDescriptorCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MaxFileDescriptorCount\" : {\n" +
" \"type\" : \"double\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"type\" : \"date\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
}

View File

@@ -1,4 +1,3 @@
PUT _template/ks_kafka_broker_metric
{ {
"order" : 10, "order" : 10,
"index_patterns" : [ "index_patterns" : [
@@ -6,7 +5,7 @@ PUT _template/ks_kafka_broker_metric
], ],
"settings" : { "settings" : {
"index" : { "index" : {
"number_of_shards" : "10" "number_of_shards" : "2"
} }
}, },
"mappings" : { "mappings" : {

View File

@@ -1,4 +1,3 @@
PUT _template/ks_kafka_cluster_metric
{ {
"order" : 10, "order" : 10,
"index_patterns" : [ "index_patterns" : [
@@ -6,7 +5,7 @@ PUT _template/ks_kafka_cluster_metric
], ],
"settings" : { "settings" : {
"index" : { "index" : {
"number_of_shards" : "10" "number_of_shards" : "2"
} }
}, },
"mappings" : { "mappings" : {
@@ -184,4 +183,4 @@ PUT _template/ks_kafka_cluster_metric
} }
}, },
"aliases" : { } "aliases" : { }
} }

View File

@@ -0,0 +1,86 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_connect_cluster_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "2"
}
},
"mappings" : {
"properties" : {
"connectClusterId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"ConnectorCount" : {
"type" : "float"
},
"TaskCount" : {
"type" : "float"
},
"ConnectorStartupAttemptsTotal" : {
"type" : "float"
},
"ConnectorStartupFailurePercentage" : {
"type" : "float"
},
"ConnectorStartupFailureTotal" : {
"type" : "float"
},
"ConnectorStartupSuccessPercentage" : {
"type" : "float"
},
"ConnectorStartupSuccessTotal" : {
"type" : "float"
},
"TaskStartupAttemptsTotal" : {
"type" : "float"
},
"TaskStartupFailurePercentage" : {
"type" : "float"
},
"TaskStartupFailureTotal" : {
"type" : "float"
},
"TaskStartupSuccessPercentage" : {
"type" : "float"
},
"TaskStartupSuccessTotal" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,194 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_connect_connector_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "2"
}
},
"mappings" : {
"properties" : {
"connectClusterId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"connectorName" : {
"type" : "keyword"
},
"connectorNameAndClusterId" : {
"type" : "keyword"
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"HealthState" : {
"type" : "float"
},
"ConnectorTotalTaskCount" : {
"type" : "float"
},
"HealthCheckPassed" : {
"type" : "float"
},
"HealthCheckTotal" : {
"type" : "float"
},
"ConnectorRunningTaskCount" : {
"type" : "float"
},
"ConnectorPausedTaskCount" : {
"type" : "float"
},
"ConnectorFailedTaskCount" : {
"type" : "float"
},
"ConnectorUnassignedTaskCount" : {
"type" : "float"
},
"BatchSizeAvg" : {
"type" : "float"
},
"BatchSizeMax" : {
"type" : "float"
},
"OffsetCommitAvgTimeMs" : {
"type" : "float"
},
"OffsetCommitMaxTimeMs" : {
"type" : "float"
},
"OffsetCommitFailurePercentage" : {
"type" : "float"
},
"OffsetCommitSuccessPercentage" : {
"type" : "float"
},
"PollBatchAvgTimeMs" : {
"type" : "float"
},
"PollBatchMaxTimeMs" : {
"type" : "float"
},
"SourceRecordActiveCount" : {
"type" : "float"
},
"SourceRecordActiveCountAvg" : {
"type" : "float"
},
"SourceRecordActiveCountMax" : {
"type" : "float"
},
"SourceRecordPollRate" : {
"type" : "float"
},
"SourceRecordPollTotal" : {
"type" : "float"
},
"SourceRecordWriteRate" : {
"type" : "float"
},
"SourceRecordWriteTotal" : {
"type" : "float"
},
"OffsetCommitCompletionRate" : {
"type" : "float"
},
"OffsetCommitCompletionTotal" : {
"type" : "float"
},
"OffsetCommitSkipRate" : {
"type" : "float"
},
"OffsetCommitSkipTotal" : {
"type" : "float"
},
"PartitionCount" : {
"type" : "float"
},
"PutBatchAvgTimeMs" : {
"type" : "float"
},
"PutBatchMaxTimeMs" : {
"type" : "float"
},
"SinkRecordActiveCount" : {
"type" : "float"
},
"SinkRecordActiveCountAvg" : {
"type" : "float"
},
"SinkRecordActiveCountMax" : {
"type" : "float"
},
"SinkRecordLagMax" : {
"type" : "float"
},
"SinkRecordReadRate" : {
"type" : "float"
},
"SinkRecordReadTotal" : {
"type" : "float"
},
"SinkRecordSendRate" : {
"type" : "float"
},
"SinkRecordSendTotal" : {
"type" : "float"
},
"DeadletterqueueProduceFailures" : {
"type" : "float"
},
"DeadletterqueueProduceRequests" : {
"type" : "float"
},
"LastErrorTimestamp" : {
"type" : "float"
},
"TotalErrorsLogged" : {
"type" : "float"
},
"TotalRecordErrors" : {
"type" : "float"
},
"TotalRecordFailures" : {
"type" : "float"
},
"TotalRecordsSkipped" : {
"type" : "float"
},
"TotalRetries" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -1,4 +1,3 @@
PUT _template/ks_kafka_group_metric
{ {
"order" : 10, "order" : 10,
"index_patterns" : [ "index_patterns" : [

View File

@@ -1,4 +1,3 @@
PUT _template/ks_kafka_partition_metric
{ {
"order" : 10, "order" : 10,
"index_patterns" : [ "index_patterns" : [

View File

@@ -1,4 +1,3 @@
PUT _template/ks_kafka_replication_metric
{ {
"order" : 10, "order" : 10,
"index_patterns" : [ "index_patterns" : [

View File

@@ -1,4 +1,3 @@
PUT _template/ks_kafka_topic_metric
{ {
"order" : 10, "order" : 10,
"index_patterns" : [ "index_patterns" : [

View File

@@ -1,4 +1,3 @@
PUT _template/ks_kafka_zookeeper_metric
{ {
"order" : 10, "order" : 10,
"index_patterns" : [ "index_patterns" : [

View File

@@ -0,0 +1,155 @@
package com.xiaojukeji.know.streaming.km.persistence.es;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.DefaultJSONParser;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.didiglobal.logi.log.ILog;
import com.google.common.collect.Lists;
import com.xiaojukeji.know.streaming.km.common.utils.LoggerUtil;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author didi
*/
public class ESFileLoader {
private static final ILog LOGGER = LoggerUtil.getESLogger();
public Map<String, String> loaderFileContext(String filePath, Field[] fields) {
LOGGER.info("method=loaderFileContext||DslLoaderUtil init start.");
List<String> dslFileNames = Lists.newLinkedList();
Map<String, String> fileContextMap = new HashMap<>();
if(null == fields || 0 == fields.length){
return fileContextMap;
}
// 反射获取接口中定义的变量中的值
for (int i = 0; i < fields.length; ++i) {
fields[i].setAccessible(true);
try {
dslFileNames.add(fields[i].get(null).toString());
} catch (IllegalAccessException e) {
LOGGER.error("method=loaderFileContext||errMsg=fail to read {} error. ", fields[i].getName(),
e);
}
}
// 加载dsl文件及内容
for (String fileName : dslFileNames) {
fileContextMap.put(fileName, readEsFileInJarFile(filePath, fileName));
}
// 输出加载的查询语句
LOGGER.info("method=loaderFileContext||msg=dsl files count {}", fileContextMap.size());
for (Map.Entry<String/*fileRelativePath*/, String/*dslContent*/> entry : fileContextMap.entrySet()) {
LOGGER.info("method=loaderFileContext||msg=file name {}, dsl content {}", entry.getKey(),
entry.getValue());
}
LOGGER.info("method=loaderFileContext||DslLoaderUtil init finished.");
return fileContextMap;
}
/**
* 去除json中的空格
*
* @param sourceDsl
* @return
*/
public String trimJsonBank(String sourceDsl) {
List<String> dslList = Lists.newArrayList();
DefaultJSONParser parser = null;
Object obj = null;
String dsl = sourceDsl;
// 解析多个json直到pos为0
for (;;) {
try {
// 这里需要Feature.OrderedField.getMask()保持有序
parser = new DefaultJSONParser(dsl, ParserConfig.getGlobalInstance(),
JSON.DEFAULT_PARSER_FEATURE | Feature.OrderedField.getMask());
obj = parser.parse();
} catch (Exception t) {
LOGGER.error("method=trimJsonBank||errMsg=parse json {} error. ", dsl, t);
}
if (obj == null) {
break;
}
if (obj instanceof JSONObject) {
dslList.add( JSON.toJSONString(obj, SerializerFeature.WriteMapNullValue));
int pos = parser.getLexer().pos();
if (pos <= 0) {
break;
}
dsl = dsl.substring(pos);
parser.getLexer().close();
} else {
parser.getLexer().close();
break;
}
}
// 格式化异常或者有多个查询语句,返回原来的查询语句
if (dslList.isEmpty() || dslList.size() > 1) {
return sourceDsl;
}
return dslList.get(0);
}
/**
* 从jar包中读取es相关的语句文件
*
* @param fileName
* @return
*/
private String readEsFileInJarFile(String filePath, String fileName) {
InputStream inputStream = this.getClass().getClassLoader()
.getResourceAsStream( filePath + fileName);
if (inputStream != null) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
List<String> lines = Lists.newLinkedList();
try {
while ((line = bufferedReader.readLine()) != null) {
lines.add(line);
}
return StringUtils.join(lines, "");
} catch (IOException e) {
LOGGER.error("method=readDslFileInJarFile||errMsg=read file {} error. ", fileName,
e);
return "";
} finally {
try {
inputStream.close();
} catch (IOException e) {
LOGGER.error(
"method=readDslFileInJarFile||errMsg=fail to close file {} error. ",
fileName, e);
}
}
} else {
LOGGER.error("method=readDslFileInJarFile||errMsg=fail to read file {} content",
fileName);
return "";
}
}
}

View File

@@ -13,8 +13,10 @@ import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils; import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateLoaderUtil;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@@ -29,8 +31,7 @@ public class BaseMetricESDAO extends BaseESDAO {
/** /**
* 操作的索引名称 * 操作的索引名称
*/ */
protected String indexName; protected String indexName;
protected String indexTemplate;
protected static final Long ONE_MIN = 60 * 1000L; protected static final Long ONE_MIN = 60 * 1000L;
protected static final Long FIVE_MIN = 5 * ONE_MIN; protected static final Long FIVE_MIN = 5 * ONE_MIN;
@@ -44,6 +45,9 @@ public class BaseMetricESDAO extends BaseESDAO {
*/ */
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps.newConcurrentMap(); private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps.newConcurrentMap();
@Autowired
private TemplateLoaderUtil templateLoaderUtil;
/** /**
* es 地址 * es 地址
*/ */
@@ -56,6 +60,7 @@ public class BaseMetricESDAO extends BaseESDAO {
@Scheduled(cron = "0 3/5 * * * ?") @Scheduled(cron = "0 3/5 * * * ?")
public void checkCurrentDayIndexExist(){ public void checkCurrentDayIndexExist(){
try { try {
String indexTemplate = templateLoaderUtil.getContextByFileName(indexName);
esOpClient.createIndexTemplateIfNotExist(indexName, indexTemplate); esOpClient.createIndexTemplateIfNotExist(indexName, indexTemplate);
//检查最近7天索引存在不存 //检查最近7天索引存在不存
@@ -103,6 +108,15 @@ public class BaseMetricESDAO extends BaseESDAO {
ariusStatsEsDaoMap.put(statsType, baseAriusStatsEsDao); ariusStatsEsDaoMap.put(statsType, baseAriusStatsEsDao);
} }
/**
* 注册不同维度数据对应操作的es类
*
* @param baseAriusStatsEsDao
*/
public void register(BaseMetricESDAO baseAriusStatsEsDao) {
BaseMetricESDAO.register(indexName, baseAriusStatsEsDao);
}
/** /**
* 批量插入索引统计信息 * 批量插入索引统计信息
@@ -416,7 +430,7 @@ public class BaseMetricESDAO extends BaseESDAO {
Long endTime = System.currentTimeMillis(); Long endTime = System.currentTimeMillis();
Long startTime = endTime - 12 * ONE_HOUR; Long startTime = endTime - 12 * ONE_HOUR;
String dsl = dslLoaderUtil.getFormatDslByFileName(DslsConstant.GET_LATEST_METRIC_TIME, startTime, endTime, appendQueryDsl); String dsl = dslLoaderUtil.getFormatDslByFileName( DslConstant.GET_LATEST_METRIC_TIME, startTime, endTime, appendQueryDsl);
String realIndexName = IndexNameUtils.genDailyIndexName(indexName, startTime, endTime); String realIndexName = IndexNameUtils.genDailyIndexName(indexName, startTime, endTime);
return esOpClient.performRequest( return esOpClient.performRequest(

View File

@@ -9,7 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPoint
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@@ -18,16 +18,15 @@ import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.BROKER_INDEX;
@Component @Component
public class BrokerMetricESDAO extends BaseMetricESDAO { public class BrokerMetricESDAO extends BaseMetricESDAO {
@PostConstruct @PostConstruct
public void init() { public void init() {
super.indexName = BROKER_INDEX; super.indexName = BROKER_INDEX;
super.indexTemplate = BROKER_TEMPLATE;
checkCurrentDayIndexExist(); checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this); register( this);
} }
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500); protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500);
@@ -40,7 +39,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
Long startTime = endTime - FIVE_MIN; Long startTime = endTime - FIVE_MIN;
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_BROKER_LATEST_METRICS, clusterId, brokerId, startTime, endTime); DslConstant.GET_BROKER_LATEST_METRICS, clusterId, brokerId, startTime, endTime);
BrokerMetricPO brokerMetricPO = esOpClient.performRequestAndTakeFirst( BrokerMetricPO brokerMetricPO = esOpClient.performRequestAndTakeFirst(
brokerId.toString(), brokerId.toString(),
@@ -68,7 +67,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
String aggDsl = buildAggsDSL(metrics, aggType); String aggDsl = buildAggsDSL(metrics, aggType);
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_BROKER_AGG_SINGLE_METRICS, clusterPhyId, brokerId, startTime, endTime, aggDsl); DslConstant.GET_BROKER_AGG_SINGLE_METRICS, clusterPhyId, brokerId, startTime, endTime, aggDsl);
return esOpClient.performRequestWithRouting( return esOpClient.performRequestWithRouting(
String.valueOf(brokerId), String.valueOf(brokerId),
@@ -132,7 +131,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
for(Long brokerId : brokerIds){ for(Long brokerId : brokerIds){
try { try {
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_BROKER_AGG_LIST_METRICS, DslConstant.GET_BROKER_AGG_LIST_METRICS,
clusterPhyId, clusterPhyId,
brokerId, brokerId,
startTime, startTime,
@@ -154,8 +153,8 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
); );
synchronized (table) { synchronized (table) {
for(String metric : metricMap.keySet()){ for(Map.Entry<String, List<MetricPointVO>> entry: metricMap.entrySet()){
table.put(metric, brokerId, metricMap.get(metric)); table.put(entry.getKey(), brokerId, entry.getValue());
} }
} }
}); });
@@ -187,7 +186,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
//4、查询es //4、查询es
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_BROKER_AGG_TOP_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); DslConstant.GET_BROKER_AGG_TOP_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
return esOpClient.performRequest(realIndex, dsl, return esOpClient.performRequest(realIndex, dsl,
s -> handleTopBrokerESQueryResponse(s, metrics, topN), 3); s -> handleTopBrokerESQueryResponse(s, metrics, topN), 3);

View File

@@ -12,7 +12,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@@ -23,17 +23,16 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CLUSTER_INDEX;
@Component @Component
public class ClusterMetricESDAO extends BaseMetricESDAO { public class ClusterMetricESDAO extends BaseMetricESDAO {
@PostConstruct @PostConstruct
public void init() { public void init() {
super.indexName = CLUSTER_INDEX; super.indexName = CLUSTER_INDEX;
super.indexTemplate = CLUSTER_TEMPLATE;
checkCurrentDayIndexExist(); checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this); register(this);
} }
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500); protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500);
@@ -46,7 +45,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
Long startTime = endTime - FIVE_MIN; Long startTime = endTime - FIVE_MIN;
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_CLUSTER_LATEST_METRICS, clusterId, startTime, endTime); DslConstant.GET_CLUSTER_LATEST_METRICS, clusterId, startTime, endTime);
ClusterMetricPO clusterMetricPO = esOpClient.performRequestAndTakeFirst( ClusterMetricPO clusterMetricPO = esOpClient.performRequestAndTakeFirst(
clusterId.toString(), realIndex(startTime, endTime), dsl, ClusterMetricPO.class); clusterId.toString(), realIndex(startTime, endTime), dsl, ClusterMetricPO.class);
@@ -67,7 +66,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
String aggDsl = buildAggsDSL(metrics, aggType); String aggDsl = buildAggsDSL(metrics, aggType);
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_CLUSTER_AGG_SINGLE_METRICS, clusterPhyId, startTime, endTime, aggDsl); DslConstant.GET_CLUSTER_AGG_SINGLE_METRICS, clusterPhyId, startTime, endTime, aggDsl);
return esOpClient.performRequestWithRouting(String.valueOf(clusterPhyId), realIndex, dsl, return esOpClient.performRequestWithRouting(String.valueOf(clusterPhyId), realIndex, dsl,
s -> handleSingleESQueryResponse(s, metrics, aggType), 3); s -> handleSingleESQueryResponse(s, metrics, aggType), 3);
@@ -103,7 +102,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
} }
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.LIST_CLUSTER_WITH_LATEST_METRICS, latestMetricTime, appendQueryDsl.toString(), sortDsl); DslConstant.LIST_CLUSTER_WITH_LATEST_METRICS, latestMetricTime, appendQueryDsl.toString(), sortDsl);
return esOpClient.performRequest(realIndex, dsl, ClusterMetricPO.class); return esOpClient.performRequest(realIndex, dsl, ClusterMetricPO.class);
} }
@@ -133,7 +132,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
5000, 5000,
() -> { () -> {
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_CLUSTER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); DslConstant.GET_CLUSTER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting( Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(
String.valueOf(clusterPhyId), realIndex, dsl, String.valueOf(clusterPhyId), realIndex, dsl,

View File

@@ -12,7 +12,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPoint
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@@ -23,17 +23,16 @@ import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.constant.Constant.ZERO; import static com.xiaojukeji.know.streaming.km.common.constant.Constant.ZERO;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.GROUP_INDEX;
@Component @Component
public class GroupMetricESDAO extends BaseMetricESDAO { public class GroupMetricESDAO extends BaseMetricESDAO {
@PostConstruct @PostConstruct
public void init() { public void init() {
super.indexName = GROUP_INDEX; super.indexName = GROUP_INDEX;
super.indexTemplate = GROUP_TEMPLATE;
checkCurrentDayIndexExist(); checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this); register(this);
} }
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500); protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500);
@@ -59,7 +58,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
String topic = groupTopic.getTopicName(); String topic = groupTopic.getTopicName();
try { try {
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.LIST_GROUP_LATEST_METRICS_BY_GROUP_TOPIC, clusterPhyId, group, topic, DslConstant.LIST_GROUP_LATEST_METRICS_BY_GROUP_TOPIC, clusterPhyId, group, topic,
startTime, latestTime, aggDsl); startTime, latestTime, aggDsl);
String routing = routing(clusterPhyId, group); String routing = routing(clusterPhyId, group);
@@ -86,7 +85,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
String realIndex = realIndex(startTime, latestTime); String realIndex = realIndex(startTime, latestTime);
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.LIST_GROUP_LATEST_METRICS_OF_PARTITION, clusterPhyId, group, topic, latestTime); DslConstant.LIST_GROUP_LATEST_METRICS_OF_PARTITION, clusterPhyId, group, topic, latestTime);
List<GroupMetricPO> groupMetricPOS = esOpClient.performRequest(realIndex, dsl, GroupMetricPO.class); List<GroupMetricPO> groupMetricPOS = esOpClient.performRequest(realIndex, dsl, GroupMetricPO.class);
return filterMetrics(groupMetricPOS, metrics); return filterMetrics(groupMetricPOS, metrics);
@@ -101,8 +100,8 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
String matchDsl = buildTermsDsl(Arrays.asList(match)); String matchDsl = buildTermsDsl(Arrays.asList(match));
String dsl = match.isEqual() String dsl = match.isEqual()
? dslLoaderUtil.getFormatDslByFileName(DslsConstant.COUNT_GROUP_METRIC_VALUE, clusterPhyId, groupName, startTime, endTime, matchDsl) ? dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_GROUP_METRIC_VALUE, clusterPhyId, groupName, startTime, endTime, matchDsl)
: dslLoaderUtil.getFormatDslByFileName(DslsConstant.COUNT_GROUP_NOT_METRIC_VALUE, clusterPhyId, groupName, startTime, endTime, matchDsl); : dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_GROUP_NOT_METRIC_VALUE, clusterPhyId, groupName, startTime, endTime, matchDsl);
return esOpClient.performRequestWithRouting(clusterPhyId.toString() + "@" + groupName, realIndex, dsl, return esOpClient.performRequestWithRouting(clusterPhyId.toString() + "@" + groupName, realIndex, dsl,
s -> handleESQueryResponseCount(s), 3); s -> handleESQueryResponseCount(s), 3);
@@ -127,7 +126,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
Integer partition = tp.getPartition(); Integer partition = tp.getPartition();
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.LIST_GROUP_METRICS, clusterId, groupName, topic, partition, startTime, endTime, interval, aggDsl); DslConstant.LIST_GROUP_METRICS, clusterId, groupName, topic, partition, startTime, endTime, interval, aggDsl);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequest(realIndex, dsl, Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequest(realIndex, dsl,
s -> handleGroupMetrics(s, aggType, metrics), 3); s -> handleGroupMetrics(s, aggType, metrics), 3);
@@ -148,7 +147,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
String realIndex = realIndex(startTime, endTime); String realIndex = realIndex(startTime, endTime);
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_GROUP_TOPIC_PARTITION, clusterPhyId, groupName, startTime, endTime); DslConstant.GET_GROUP_TOPIC_PARTITION, clusterPhyId, groupName, startTime, endTime);
List<GroupMetricPO> groupMetricPOS = esOpClient.performRequestWithRouting(routing(clusterPhyId, groupName), realIndex, dsl, GroupMetricPO.class); List<GroupMetricPO> groupMetricPOS = esOpClient.performRequestWithRouting(routing(clusterPhyId, groupName), realIndex, dsl, GroupMetricPO.class);
return groupMetricPOS.stream().map(g -> new TopicPartitionKS(g.getTopic(), g.getPartitionId().intValue())).collect( Collectors.toSet()); return groupMetricPOS.stream().map(g -> new TopicPartitionKS(g.getTopic(), g.getPartitionId().intValue())).collect( Collectors.toSet());

View File

@@ -1,14 +1,14 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao; package com.xiaojukeji.know.streaming.km.persistence.es.dao;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.PartitionMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.PartitionMetricPO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.List; import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.PARTITION_INDEX;
/** /**
* @author didi * @author didi
@@ -18,10 +18,9 @@ public class PartitionMetricESDAO extends BaseMetricESDAO {
@PostConstruct @PostConstruct
public void init() { public void init() {
super.indexName = PARTITION_INDEX; super.indexName = PARTITION_INDEX;
super.indexTemplate = PARTITION_TEMPLATE;
checkCurrentDayIndexExist(); checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this); register(this);
} }
public PartitionMetricPO getPartitionLatestMetrics(Long clusterPhyId, String topic, public PartitionMetricPO getPartitionLatestMetrics(Long clusterPhyId, String topic,
@@ -31,7 +30,7 @@ public class PartitionMetricESDAO extends BaseMetricESDAO {
Long startTime = endTime - FIVE_MIN; Long startTime = endTime - FIVE_MIN;
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_PARTITION_LATEST_METRICS, clusterPhyId, topic, brokerId, partitionId, startTime, endTime); DslConstant.GET_PARTITION_LATEST_METRICS, clusterPhyId, topic, brokerId, partitionId, startTime, endTime);
PartitionMetricPO partitionMetricPO = esOpClient.performRequestAndTakeFirst( PartitionMetricPO partitionMetricPO = esOpClient.performRequestAndTakeFirst(
partitionId.toString(), realIndex(startTime, endTime), dsl, PartitionMetricPO.class); partitionId.toString(), realIndex(startTime, endTime), dsl, PartitionMetricPO.class);
@@ -45,7 +44,7 @@ public class PartitionMetricESDAO extends BaseMetricESDAO {
Long startTime = endTime - FIVE_MIN; Long startTime = endTime - FIVE_MIN;
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.LIST_PARTITION_LATEST_METRICS_BY_TOPIC, clusterPhyId, topic, startTime, endTime); DslConstant.LIST_PARTITION_LATEST_METRICS_BY_TOPIC, clusterPhyId, topic, startTime, endTime);
List<PartitionMetricPO> partitionMetricPOS = esOpClient.performRequest( List<PartitionMetricPO> partitionMetricPOS = esOpClient.performRequest(
realIndex(startTime, endTime), dsl, PartitionMetricPO.class); realIndex(startTime, endTime), dsl, PartitionMetricPO.class);

View File

@@ -4,7 +4,7 @@ import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResp
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr; import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@@ -14,7 +14,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.REPLICATION_INDEX;
/** /**
* @author didi * @author didi
@@ -24,10 +24,9 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO {
@PostConstruct @PostConstruct
public void init() { public void init() {
super.indexName = REPLICATION_INDEX; super.indexName = REPLICATION_INDEX;
super.indexTemplate = REPLICATION_TEMPLATE;
checkCurrentDayIndexExist(); checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this); register(this);
} }
/** /**
@@ -39,7 +38,7 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO {
Long startTime = endTime - FIVE_MIN; Long startTime = endTime - FIVE_MIN;
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_REPLICATION_LATEST_METRICS, clusterPhyId, brokerId, topic, partitionId, startTime, endTime); DslConstant.GET_REPLICATION_LATEST_METRICS, clusterPhyId, brokerId, topic, partitionId, startTime, endTime);
ReplicationMetricPO replicationMetricPO = esOpClient.performRequestAndTakeFirst( ReplicationMetricPO replicationMetricPO = esOpClient.performRequestAndTakeFirst(
realIndex(startTime, endTime), dsl, ReplicationMetricPO.class); realIndex(startTime, endTime), dsl, ReplicationMetricPO.class);
@@ -61,7 +60,7 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO {
String aggDsl = buildAggsDSL(metrics, aggType); String aggDsl = buildAggsDSL(metrics, aggType);
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_REPLICATION_AGG_SINGLE_METRICS, clusterPhyId, brokerId,topic, partitionId, startTime, endTime, aggDsl); DslConstant.GET_REPLICATION_AGG_SINGLE_METRICS, clusterPhyId, brokerId,topic, partitionId, startTime, endTime, aggDsl);
return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl, return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl,
s -> handleSingleESQueryResponse(s, metrics, aggType), 3); s -> handleSingleESQueryResponse(s, metrics, aggType), 3);

View File

@@ -13,7 +13,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPoint
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@@ -22,17 +22,16 @@ import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.TOPIC_INDEX;
@Component @Component
public class TopicMetricESDAO extends BaseMetricESDAO { public class TopicMetricESDAO extends BaseMetricESDAO {
@PostConstruct @PostConstruct
public void init() { public void init() {
super.indexName = TOPIC_INDEX; super.indexName = TOPIC_INDEX;
super.indexTemplate = TOPIC_TEMPLATE;
checkCurrentDayIndexExist(); checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this); register(this);
} }
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500); protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500);
@@ -47,7 +46,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
String sortDsl = buildSortDsl(sort, SearchSort.DEFAULT); String sortDsl = buildSortDsl(sort, SearchSort.DEFAULT);
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_TOPIC_MAX_OR_MIN_SINGLE_METRIC, clusterPhyId, startTime, endTime, topic, sortDsl); DslConstant.GET_TOPIC_MAX_OR_MIN_SINGLE_METRIC, clusterPhyId, startTime, endTime, topic, sortDsl);
TopicMetricPO topicMetricPO = esOpClient.performRequestAndTakeFirst(topic, realIndex, dsl, TopicMetricPO.class); TopicMetricPO topicMetricPO = esOpClient.performRequestAndTakeFirst(topic, realIndex, dsl, TopicMetricPO.class);
ret.add(topicMetricPO); ret.add(topicMetricPO);
} }
@@ -74,7 +73,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
} }
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_TOPIC_AGG_SINGLE_METRICS, clusterPhyId, startTime, endTime, appendQueryDsl.toString(), aggDsl); DslConstant.GET_TOPIC_AGG_SINGLE_METRICS, clusterPhyId, startTime, endTime, appendQueryDsl.toString(), aggDsl);
return esOpClient.performRequest(realIndex, dsl, return esOpClient.performRequest(realIndex, dsl,
s -> handleSingleESQueryResponse(s, metrics, aggType), 3); s -> handleSingleESQueryResponse(s, metrics, aggType), 3);
@@ -112,7 +111,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
String realIndex = realIndex(startTime, latestMetricTime); String realIndex = realIndex(startTime, latestMetricTime);
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.LIST_TOPIC_WITH_LATEST_METRICS, clusterId, latestMetricTime, appendQueryDsl.toString(), sortDsl); DslConstant.LIST_TOPIC_WITH_LATEST_METRICS, clusterId, latestMetricTime, appendQueryDsl.toString(), sortDsl);
return esOpClient.performRequest(realIndex, dsl, TopicMetricPO.class); return esOpClient.performRequest(realIndex, dsl, TopicMetricPO.class);
} }
@@ -126,8 +125,8 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
String termDsl = buildTermsDsl(Arrays.asList(term)); String termDsl = buildTermsDsl(Arrays.asList(term));
String dsl = term.isEqual() String dsl = term.isEqual()
? dslLoaderUtil.getFormatDslByFileName(DslsConstant.COUNT_TOPIC_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl) ? dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_TOPIC_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl)
: dslLoaderUtil.getFormatDslByFileName(DslsConstant.COUNT_TOPIC_NOT_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl); : dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_TOPIC_NOT_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl);
return esOpClient.performRequestWithRouting(topic, realIndex, dsl, return esOpClient.performRequestWithRouting(topic, realIndex, dsl,
s -> handleESQueryResponseCount(s), 3); s -> handleESQueryResponseCount(s), 3);
@@ -141,7 +140,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
Long startTime = endTime - FIVE_MIN; Long startTime = endTime - FIVE_MIN;
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_TOPIC_BROKER_LATEST_METRICS, clusterPhyId, topic, brokerId, startTime, endTime); DslConstant.GET_TOPIC_BROKER_LATEST_METRICS, clusterPhyId, topic, brokerId, startTime, endTime);
TopicMetricPO topicMetricPO = esOpClient.performRequestAndTakeFirst(topic, realIndex(startTime, endTime), dsl, TopicMetricPO.class); TopicMetricPO topicMetricPO = esOpClient.performRequestAndTakeFirst(topic, realIndex(startTime, endTime), dsl, TopicMetricPO.class);
@@ -165,7 +164,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
} }
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_TOPIC_LATEST_METRICS, clusterPhyId, startTime, endTime, appendQueryDsl.toString()); DslConstant.GET_TOPIC_LATEST_METRICS, clusterPhyId, startTime, endTime, appendQueryDsl.toString());
//topicMetricPOS 已经按照 timeStamp 倒序排好序了 //topicMetricPOS 已经按照 timeStamp 倒序排好序了
List<TopicMetricPO> topicMetricPOS = esOpClient.performRequest(realIndex(startTime, endTime), dsl, TopicMetricPO.class); List<TopicMetricPO> topicMetricPOS = esOpClient.performRequest(realIndex(startTime, endTime), dsl, TopicMetricPO.class);
@@ -197,7 +196,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
} }
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_TOPIC_LATEST_METRICS, clusterPhyId, startTime, endTime, appendQueryDsl.toString()); DslConstant.GET_TOPIC_LATEST_METRICS, clusterPhyId, startTime, endTime, appendQueryDsl.toString());
TopicMetricPO topicMetricPO = esOpClient.performRequestAndTakeFirst(topic, realIndex(startTime, endTime), dsl, TopicMetricPO.class); TopicMetricPO topicMetricPO = esOpClient.performRequestAndTakeFirst(topic, realIndex(startTime, endTime), dsl, TopicMetricPO.class);
@@ -262,7 +261,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
3000, 3000,
() -> { () -> {
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_TOPIC_AGG_LIST_METRICS, clusterPhyId, topic, startTime, endTime, interval, aggDsl); DslConstant.GET_TOPIC_AGG_LIST_METRICS, clusterPhyId, topic, startTime, endTime, interval, aggDsl);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(topic, realIndex, dsl, Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(topic, realIndex, dsl,
s -> handleListESQueryResponse(s, metrics, aggType), 3); s -> handleListESQueryResponse(s, metrics, aggType), 3);
@@ -299,7 +298,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
//4、查询es //4、查询es
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_TOPIC_AGG_TOP_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); DslConstant.GET_TOPIC_AGG_TOP_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
return esOpClient.performRequest(realIndex, dsl, return esOpClient.performRequest(realIndex, dsl,
s -> handleTopTopicESQueryResponse(s, metrics, topN), 3); s -> handleTopTopicESQueryResponse(s, metrics, topN), 3);

View File

@@ -5,7 +5,7 @@ import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.constant.ESConstant; import com.xiaojukeji.know.streaming.km.common.constant.ESConstant;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@@ -15,18 +15,16 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX; import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.ZOOKEEPER_INDEX;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_TEMPLATE;
@Component @Component
public class ZookeeperMetricESDAO extends BaseMetricESDAO { public class ZookeeperMetricESDAO extends BaseMetricESDAO {
@PostConstruct @PostConstruct
public void init() { public void init() {
super.indexName = ZOOKEEPER_INDEX; super.indexName = ZOOKEEPER_INDEX;
super.indexTemplate = ZOOKEEPER_TEMPLATE;
checkCurrentDayIndexExist(); checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this); register(this);
} }
/** /**
@@ -49,7 +47,7 @@ public class ZookeeperMetricESDAO extends BaseMetricESDAO {
//4、构造dsl查询条件开始查询 //4、构造dsl查询条件开始查询
try { try {
String dsl = dslLoaderUtil.getFormatDslByFileName( String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_ZOOKEEPER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); DslConstant.GET_ZOOKEEPER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
return esOpClient.performRequestWithRouting( return esOpClient.performRequestWithRouting(
String.valueOf(clusterPhyId), String.valueOf(clusterPhyId),

View File

@@ -0,0 +1,275 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect;
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_CLUSTER_INDEX;
@Component
public class ConnectClusterMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public void init() {
super.indexName = CONNECT_CLUSTER_INDEX;
checkCurrentDayIndexExist();
register( this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ConnectClusterMetricESDAO", 4,8, 500);
/**
* 获取集群 clusterPhyId 中每个 metric 的 topN 的 connectCluster 在指定时间[startTime、endTime]区间内所有的指标
* topN 按照[startTime, endTime] 时间段内最后一个值来排序
*/
public Table<String/*metric*/, Long/*connectClusterId*/, List<MetricPointVO>> listMetricsByTop(Long clusterPhyId,
List<Long> connectClusterIdList,
List<String> metricNameList,
String aggType,
int topN,
Long startTime,
Long endTime){
// 1、获取TopN
Map<String, List<Long>> topNConnectClusterIds = getTopNConnectClusterIds(clusterPhyId, metricNameList, aggType, topN, startTime, endTime);
Table<String, Long, List<MetricPointVO>> table = HashBasedTable.create();
// 2、查询指标
for(String metric : metricNameList) {
table.putAll(
this.listMetricsByConnectClusterIdList(
clusterPhyId,
Arrays.asList(metric),
aggType,
topNConnectClusterIds.getOrDefault(metric, connectClusterIdList),
startTime,
endTime
)
);
}
return table;
}
/**
* 获取集群 clusterPhyId 中每个 metric 的指定 connectClusters 在指定时间[startTime、endTime]区间内所有的指标
*/
public Table<String/*metric*/, Long/*connectClusterId*/, List<MetricPointVO>> listMetricsByConnectClusterIdList(Long clusterPhyId,
List<String> metricNameList,
String aggType,
List<Long> connectClusterIdList,
Long startTime,
Long endTime){
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metricNameList, aggType);
final Table<String, Long, List<MetricPointVO>> table = HashBasedTable.create();
//4、构造dsl查询条件
for(Long connectClusterId : connectClusterIdList){
try {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_CONNECT_CLUSTER_AGG_LIST_METRICS,
clusterPhyId,
connectClusterId,
startTime,
endTime,
interval,
aggDsl
);
queryFuture.runnableTask(
String.format("class=ConnectClusterMetricESDAO||method=listMetricsByConnectClusterIdList||ClusterPhyId=%d", clusterPhyId),
5000,
() -> {
Map<String, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(
String.valueOf(connectClusterId),
realIndex,
dsl,
s -> handleListESQueryResponse(s, metricNameList, aggType),
3
);
synchronized (table) {
for(String metric : metricMap.keySet()){
table.put(metric, connectClusterId, metricMap.get(metric));
}
}
});
} catch (Exception e) {
LOGGER.error(
"class=ConnectClusterMetricESDAO||method=listMetricsByConnectClusterIdList||clusterPhyId={}||connectClusterId{}||errMsg=exception!",
clusterPhyId, connectClusterId, e
);
}
}
queryFuture.waitExecute();
return table;
}
/**
* 获取集群 clusterPhyId 中每个 metric 的 topN 的 broker
*/
//public for test
public Map<String, List<Long>> getTopNConnectClusterIds(Long clusterPhyId,
List<String> metricNameList,
String aggType,
int topN,
Long startTime,
Long endTime){
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metricNameList, aggType);
//4、查询es
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_CONNECT_CLUSTER_AGG_TOP_METRICS,
clusterPhyId,
startTime,
endTime,
interval,
aggDsl
);
return esOpClient.performRequest(
realIndex,
dsl,
s -> handleTopConnectClusterESQueryResponse(s, metricNameList, topN),
3
);
}
/**************************************************** private method ****************************************************/
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
if(null == response || null == response.getAggs()){
return metricMap;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap || null == esAggrMap.get(HIST)) {
return metricMap;
}
if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){
return metricMap;
}
for(String metric : metrics){
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(null == value){return;}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}else {
LOGGER.info("");
}
}catch (Exception e){
LOGGER.error("metric={}||errMsg=exception!", metric, e);
}
} );
metricMap.put(metric, optimizeMetricPoints(metricPoints));
}
return metricMap;
}
private Map<String, List<Long>> handleTopConnectClusterESQueryResponse(ESQueryResponse response, List<String> metrics, int topN){
Map<String, List<Long>> ret = new HashMap<>();
if(null == response || null == response.getAggs()){
return ret;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap || null == esAggrMap.get(HIST)) {
return ret;
}
if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){
return ret;
}
Map<String, List<Tuple<Long, Double>>> metricBrokerValueMap = new HashMap<>();
//1、先获取每个指标对应的所有brokerIds以及指标的值
for(String metric : metrics) {
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long connectorClusterId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(HIST).getBucketList()
.get(0).getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(null == value){return;}
List<Tuple<Long, Double>> connectorClusterValue = (null == metricBrokerValueMap.get(metric)) ?
new ArrayList<>() : metricBrokerValueMap.get(metric);
connectorClusterValue.add(new Tuple<>(connectorClusterId, Double.valueOf(value.toString())));
metricBrokerValueMap.put(metric, connectorClusterValue);
}
}catch (Exception e){
LOGGER.error("metric={}||errMsg=exception!", metric, e);
}
} );
}
//2、对每个指标的broker按照指标值排序并截取前topN个brokerIds
for(String metric : metricBrokerValueMap.keySet()){
List<Tuple<Long, Double>> connectorClusterValue = metricBrokerValueMap.get(metric);
connectorClusterValue.sort((o1, o2) -> {
if(null == o1 || null == o2){return 0;}
return o2.getV2().compareTo(o1.getV2());
} );
List<Tuple<Long, Double>> temp = (connectorClusterValue.size() > topN) ? connectorClusterValue.subList(0, topN) : connectorClusterValue;
List<Long> connectorClusterIds = temp.stream().map(t -> t.getV1()).collect(Collectors.toList());
ret.put(metric, connectorClusterIds);
}
return ret;
}
}

View File

@@ -0,0 +1,365 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect;
import com.alibaba.druid.util.StringUtils;
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.ConnectorMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_CONNECTOR_INDEX;
@Component
public class ConnectorMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public void init() {
super.indexName = CONNECT_CONNECTOR_INDEX;
checkCurrentDayIndexExist();
register( this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ConnectorMetricESDAO", 4,8, 500);
/**
* 获取每个 metric 的 topN 个 connector 的指标,如果获取不到 topN 的topics, 则默认返回 defaultTopics 的指标
*/
public Table<String/*metric*/, Tuple<Long, String>, List<MetricPointVO>> listMetricsByTopN(Long clusterPhyId,
List<Tuple<Long, String>> defaultConnectorList,
List<String> metricNameList,
String aggType,
int topN,
Long startTime,
Long endTime){
//1、获取topN要查询的topic每一个指标的topN的topic可能不一样
Map<String, List<Tuple<Long, String>>> metricsMap = this.getTopNConnectors(clusterPhyId, metricNameList, aggType, topN, startTime, endTime);
Table<String, Tuple<Long, String>, List<MetricPointVO>> table = HashBasedTable.create();
for(String metricName : metricNameList){
table.putAll(this.listMetricsByConnectors(
clusterPhyId,
Arrays.asList(metricName),
aggType,
metricsMap.getOrDefault(metricName, defaultConnectorList),
startTime,
endTime)
);
}
return table;
}
public List<ConnectorMetricPO> getConnectorLatestMetric(Long clusterPhyId, List<Tuple<Long, String>> connectClusterIdAndConnectorNameList, List<String> metricsNames){
List<ConnectorMetricPO> connectorMetricPOS = new CopyOnWriteArrayList<>();
for(Tuple<Long, String> connectClusterIdAndConnectorName : connectClusterIdAndConnectorNameList){
queryFuture.runnableTask(
"getConnectorLatestMetric",
30000,
() -> {
ConnectorMetricPO connectorMetricPO = this.getConnectorLatestMetric(clusterPhyId, connectClusterIdAndConnectorName.getV1(), connectClusterIdAndConnectorName.getV2(), metricsNames);
connectorMetricPOS.add(connectorMetricPO);
});
}
queryFuture.waitExecute();
return connectorMetricPOS;
}
public ConnectorMetricPO getConnectorLatestMetric(Long clusterPhyId, Long connectClusterId, String connectorName, List<String> metricsNames){
Long endTime = getLatestMetricTime();
Long startTime = endTime - FIVE_MIN;
SearchTerm searchClusterIdTerm = new SearchTerm("connectClusterId", connectClusterId.toString());
searchClusterIdTerm.setField(true);
SearchTerm searchClusterNameTerm = new SearchTerm("connectorName", connectorName);
searchClusterNameTerm.setField(true);
String termDsl = buildTermsDsl(Arrays.asList(searchClusterIdTerm, searchClusterNameTerm));
StringBuilder appendQueryDsl = new StringBuilder();
if(!StringUtils.isEmpty(termDsl)){
appendQueryDsl.append(",").append(termDsl);
}
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_CONNECTOR_LATEST_METRICS, connectClusterId, connectorName, startTime, endTime, appendQueryDsl.toString());
ConnectorMetricPO connectorMetricPO = esOpClient.performRequestAndTakeFirst(
connectClusterId.toString(), realIndex(startTime, endTime), dsl, ConnectorMetricPO.class);
return (null == connectorMetricPO) ? new ConnectorMetricPO(clusterPhyId, connectClusterId, connectorName)
: filterMetrics(connectorMetricPO, metricsNames);
}
/**
* 获取每个 metric 指定个 topic 的指标
*/
public Table<String/*metric*/, Tuple<Long, String>, List<MetricPointVO>> listMetricsByConnectors(Long clusterPhyId,
List<String> metrics,
String aggType,
List<Tuple<Long, String>> connectorList,
Long startTime,
Long endTime) {
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metrics, aggType);
final Table<String, Tuple<Long, String>, List<MetricPointVO>> table = HashBasedTable.create();
//4、构造dsl查询条件
for(Tuple<Long, String> connector : connectorList) {
try {
queryFuture.runnableTask(
String.format(
"method=listConnectorMetricsByConnectors||ClusterPhyId=%d||connectorName=%s",
clusterPhyId, connector.getV2() ),
3000,
() -> {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_CONNECTOR_AGG_LIST_METRICS,
clusterPhyId,
connector.getV1(),
connector.getV2(),
startTime,
endTime,
interval,
aggDsl
);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(
connector.getV1().toString(),
realIndex,
dsl,
s -> handleListESQueryResponse(s, metrics, aggType),
3
);
synchronized (table){
for(String metric : metricMap.keySet()){
table.put(metric, connector, metricMap.get(metric));
}
}
});
} catch (Exception e) {
LOGGER.error(
"method=listConnectorMetricsByConnectors||clusterPhyId={}||connectorName{}||errMsg=exception!",
clusterPhyId, connector.getV2(), e
);
}
}
queryFuture.waitExecute();
return table;
}
//public for test
public Map<String, List<Tuple<Long, String>>> getTopNConnectors(Long clusterPhyId,
List<String> metricNameList,
String aggType,
int topN,
Long startTime,
Long endTime){
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metricNameList, aggType);
//4、查询es
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_CONNECTOR_AGG_TOP_METRICS,
clusterPhyId,
startTime,
endTime,
interval,
aggDsl
);
return esOpClient.performRequest(
realIndex,
dsl,
s -> handleTopConnectorESQueryResponse(s, metricNameList, topN),
3
);
}
/**************************************************** private method ****************************************************/
private Table<String/*topic*/, String/*metric*/, MetricPointVO> handleSingleESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Table<String, String, MetricPointVO> table = HashBasedTable.create();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap){return table;}
for(String metric : metrics){
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
String topic = esBucket.getUnusedMap().get(KEY).toString();
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setValue(value);
metricPoint.setName(metric);
table.put(topic, metric, metricPoint);
}else {
LOGGER.debug("method=handleListESQueryResponse||metric={}||errMsg=get topic is null!", metric);
}
}catch (Exception e){
LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e);
}
});
}
return table;
}
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap){return metricMap;}
for(String metric : metrics){
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(value == null){return;}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}else {
LOGGER.info("");
}
}catch (Exception e){
LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e);
}
} );
metricMap.put(metric, optimizeMetricPoints(metricPoints));
}
return metricMap;
}
private Map<String, List<Tuple<Long, String>>> handleTopConnectorESQueryResponse(ESQueryResponse response, List<String> metricNameList, int topN){
Map<String, List<Tuple<Long, String>>> ret = new HashMap<>();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return ret;
}
Map<String, List<Triple<Long, String, Double>>> metricValueMap = new HashMap<>();
// 1、先获取每个指标对应的所有 connector 以及指标的值
for(String metricName : metricNameList) {
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
String connectorNameAndClusterId = esBucket.getUnusedMap().get(KEY).toString();
Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap().get(metricName).getUnusedMap().get(VALUE);
if (value == null) {
return;
}
Double metricValue = Double.valueOf(value.toString());
Tuple<String, Long> tuple = splitConnectorNameAndClusterId(connectorNameAndClusterId);
if (null == tuple) {
return;
}
metricValueMap.putIfAbsent(metricName, new ArrayList<>());
metricValueMap.get(metricName).add(new Triple<>(tuple.getV2(), tuple.getV1(), metricValue));
}
} catch (Exception e) {
LOGGER.error("method=handleTopConnectorESQueryResponse||metricName={}||errMsg=exception!", metricName, e);
}
} );
}
//2、对每个指标的connector按照指标值排序并截取前topN个connectors
for(Map.Entry<String, List<Triple<Long, String, Double>>> entry : metricValueMap.entrySet()){
entry.getValue().sort((o1, o2) -> {
if(null == o1 || null == o2) {
return 0;
}
return o2.v3().compareTo(o1.v3());
} );
List<Triple<Long, String, Double>> temp = (entry.getValue().size() > topN) ? entry.getValue().subList(0, topN) : entry.getValue();
List<Tuple<Long, String>> connectorList = new ArrayList<>();
for (Triple<Long, String, Double> triple: temp) {
connectorList.add(new Tuple<>(triple.v1(), triple.v2()));
}
ret.put(entry.getKey(), connectorList);
}
return ret;
}
private Map<String/*metric*/, Map<String/*topic*/, List<MetricPointVO>>> topicMetricMap2MetricTopicMap(
Map<String/*topic*/, Map<String/*metric*/, List<MetricPointVO>>> topicMetricMap){
Map<String/*metric*/, Map<String/*topic*/, List<MetricPointVO>>> ret = new HashMap<>();
for(String topic : topicMetricMap.keySet()){
Map<String/*metric*/, List<MetricPointVO>> metricMap = topicMetricMap.get(topic);
for(String metric : metricMap.keySet()){
Map<String/*topic*/, List<MetricPointVO>> brokerMap = (null == ret.get(metric)) ? new HashMap<>() : ret.get(metric);
brokerMap.put(topic, metricMap.get(metric));
ret.put(metric, brokerMap);
}
}
return ret;
}
private Tuple<String, Long> splitConnectorNameAndClusterId(String connectorNameAndClusterId){
String[] ss = connectorNameAndClusterId.split("#");
if(null == ss || ss.length != 2){return null;}
return new Tuple<>(ss[0], Long.valueOf(ss[1]));
}
}

View File

@@ -13,9 +13,9 @@ package com.xiaojukeji.know.streaming.km.persistence.es.dsls;
* 在dslFiles目录下新建以类名为名称的文件夹以方法名为名称的文件名 * 在dslFiles目录下新建以类名为名称的文件夹以方法名为名称的文件名
* *
*/ */
public class DslsConstant { public class DslConstant {
private DslsConstant() {} private DslConstant() {}
/**************************************************** Base ****************************************************/ /**************************************************** Base ****************************************************/
public static final String GET_LATEST_METRIC_TIME = "BaseMetricESDAO/getLatestMetricTime"; public static final String GET_LATEST_METRIC_TIME = "BaseMetricESDAO/getLatestMetricTime";
@@ -82,4 +82,18 @@ public class DslsConstant {
/**************************************************** Zookeeper ****************************************************/ /**************************************************** Zookeeper ****************************************************/
public static final String GET_ZOOKEEPER_AGG_LIST_METRICS = "ZookeeperMetricESDAO/getAggListZookeeperMetrics"; public static final String GET_ZOOKEEPER_AGG_LIST_METRICS = "ZookeeperMetricESDAO/getAggListZookeeperMetrics";
/**************************************************** Connect-Cluster ****************************************************/
public static final String GET_CONNECT_CLUSTER_AGG_LIST_METRICS = "ConnectClusterMetricESDAO/getAggListConnectClusterMetrics";
public static final String GET_CONNECT_CLUSTER_AGG_TOP_METRICS = "ConnectClusterMetricESDAO/getAggTopMetricsConnectClusters";
/**************************************************** Connect-Connector ****************************************************/
public static final String GET_CONNECTOR_LATEST_METRICS = "ConnectorMetricESDAO/getConnectorLatestMetric";
public static final String GET_CONNECTOR_AGG_LIST_METRICS = "ConnectorMetricESDAO/getConnectorAggListMetric";
public static final String GET_CONNECTOR_AGG_TOP_METRICS = "ConnectorMetricESDAO/getConnectorAggTopMetric";
} }

View File

@@ -1,26 +1,14 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dsls; package com.xiaojukeji.know.streaming.km.persistence.es.dsls;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.DefaultJSONParser;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.LoggerUtil; import com.xiaojukeji.know.streaming.km.common.utils.LoggerUtil;
import com.xiaojukeji.know.streaming.km.persistence.es.ESFileLoader;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@@ -33,50 +21,25 @@ import java.util.Map;
* *
*/ */
@Component @Component
public class DslLoaderUtil { public class DslLoaderUtil extends ESFileLoader {
private static final ILog LOGGER = LoggerUtil.getESLogger(); private static final ILog LOGGER = LoggerUtil.getESLogger();
private static final String FILE_PATH = "es/dsl/";
/** /**
* 查询语句容器 * 查询语句容器
* key : fileRelativePath
* value : dslContent
*/ */
private Map<String/*fileRelativePath*/, String/*dslContent*/> dslsMap = Maps.newHashMap(); private Map<String, String> dslsMap = Maps.newHashMap();
@PostConstruct @PostConstruct
public void init() { public void init() {
LOGGER.info("method=init||DslLoaderUtil init start."); dslsMap.putAll(loaderFileContext(FILE_PATH, DslConstant.class.getDeclaredFields()));
List<String> dslFileNames = Lists.newLinkedList();
// 反射获取接口中定义的变量中的值
Field[] fields = DslsConstant.class.getDeclaredFields();
for (int i = 0; i < fields.length; ++i) {
fields[i].setAccessible(true);
try {
dslFileNames.add(fields[i].get(null).toString());
} catch (IllegalAccessException e) {
LOGGER.error("method=init||errMsg=fail to read {} error. ", fields[i].getName(),
e);
}
}
// 加载dsl文件及内容
for (String fileName : dslFileNames) {
dslsMap.put(fileName, readDslFileInJarFile(fileName));
}
// 输出加载的查询语句
LOGGER.info("method=init||msg=dsl files count {}", dslsMap.size());
for (Map.Entry<String/*fileRelativePath*/, String/*dslContent*/> entry : dslsMap.entrySet()) {
LOGGER.info("method=init||msg=file name {}, dsl content {}", entry.getKey(),
entry.getValue());
}
LOGGER.info("method=init||DslLoaderUtil init finished.");
} }
/** /**
* 获取查询语句 * 获取查询语句
*
* @param fileName
* @return
*/ */
public String getDslByFileName(String fileName) { public String getDslByFileName(String fileName) {
return dslsMap.get(fileName); return dslsMap.get(fileName);
@@ -84,10 +47,6 @@ public class DslLoaderUtil {
/** /**
* 获取格式化的查询语句 * 获取格式化的查询语句
*
* @param fileName
* @param args
* @return
*/ */
public String getFormatDslByFileName(String fileName, Object... args) { public String getFormatDslByFileName(String fileName, Object... args) {
String loadDslContent = getDslByFileName(fileName); String loadDslContent = getDslByFileName(fileName);
@@ -107,128 +66,4 @@ public class DslLoaderUtil {
return dsl; return dsl;
} }
public String getFormatDslForCatIndexByCondition(String fileName, String boolMustDsl, Object... args) {
String formatDslByFileName = getFormatDslByFileName(fileName, args);
return formatDslByFileName.replace("\"boolMustDsl\"", boolMustDsl);
}
public String getFormatDslByFileNameByAggParam(String fileName, String clusterPhyMetrics, String interval,
String aggType, Object... args) {
String formatDslByFileName = getFormatDslByFileName(fileName, args);
return formatDslByFileName
.replace("{interval}", interval)
.replace("{clusterPhyMetrics}", clusterPhyMetrics)
.replace("{aggType}", aggType);
}
public String getFormatDslByFileNameAndOtherParam(String fileName, String interval, String aggsDsl,
Object... args) {
String formatDslByFileName = getFormatDslByFileName(fileName, args);
return formatDslByFileName
.replace("{interval}", interval)
.replace("\"aggsDsl\":1", aggsDsl);
}
public String getDslByTopNNameInfo(String fileName, String interval, String topNameStr, String aggsDsl,
Object... args) {
String formatDslByFileName = getFormatDslByFileName(fileName, args);
return formatDslByFileName
.replace("{interval}", interval)
.replace("\"aggsDsl\":1", aggsDsl)
.replace("\"topNameListStr\"", topNameStr);
}
/**************************************************** private method ****************************************************/
/**
* 去除json中的空格
*
* @param sourceDsl
* @return
*/
private String trimJsonBank(String sourceDsl) {
List<String> dslList = Lists.newArrayList();
DefaultJSONParser parser = null;
Object obj = null;
String dsl = sourceDsl;
// 解析多个json直到pos为0
for (;;) {
try {
// 这里需要Feature.OrderedField.getMask()保持有序
parser = new DefaultJSONParser(dsl, ParserConfig.getGlobalInstance(),
JSON.DEFAULT_PARSER_FEATURE | Feature.OrderedField.getMask());
obj = parser.parse();
} catch (Exception t) {
LOGGER.error("method=trimJsonBank||errMsg=parse json {} error. ", dsl, t);
}
if (obj == null) {
break;
}
if (obj instanceof JSONObject) {
dslList.add( JSON.toJSONString(obj, SerializerFeature.WriteMapNullValue));
int pos = parser.getLexer().pos();
if (pos <= 0) {
break;
}
dsl = dsl.substring(pos);
parser.getLexer().close();
} else {
parser.getLexer().close();
break;
}
}
// 格式化异常或者有多个查询语句,返回原来的查询语句
if (dslList.isEmpty() || dslList.size() > 1) {
return sourceDsl;
}
return dslList.get(0);
}
/**
* 从jar包中读取dsl语句文件
*
* @param fileName
* @return
*/
private String readDslFileInJarFile(String fileName) {
InputStream inputStream = this.getClass().getClassLoader()
.getResourceAsStream( String.format("dsl/%s", fileName));
if (inputStream != null) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
List<String> lines = Lists.newLinkedList();
try {
while ((line = bufferedReader.readLine()) != null) {
lines.add(line);
}
return StringUtils.join(lines, "");
} catch (IOException e) {
LOGGER.error("method=readDslFileInJarFile||errMsg=read file {} error. ", fileName,
e);
return "";
} finally {
try {
inputStream.close();
} catch (IOException e) {
LOGGER.error(
"method=readDslFileInJarFile||errMsg=fail to close file {} error. ",
fileName, e);
}
}
} else {
LOGGER.error("method=readDslFileInJarFile||errMsg=fail to read file {} content",
fileName);
return "";
}
}
} }

View File

@@ -0,0 +1,19 @@
package com.xiaojukeji.know.streaming.km.persistence.es.template;
/**
* @author didi
*/
public class TemplateConstant {
public static final String TOPIC_INDEX = "ks_kafka_topic_metric";
public static final String CLUSTER_INDEX = "ks_kafka_cluster_metric";
public static final String BROKER_INDEX = "ks_kafka_broker_metric";
public static final String PARTITION_INDEX = "ks_kafka_partition_metric";
public static final String GROUP_INDEX = "ks_kafka_group_metric";
public static final String REPLICATION_INDEX = "ks_kafka_replication_metric";
public static final String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric";
public static final String CONNECT_CLUSTER_INDEX = "ks_kafka_connect_cluster_metric";
public static final String CONNECT_CONNECTOR_INDEX = "ks_kafka_connect_connector_metric";
private TemplateConstant() {
}
}

View File

@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.persistence.es.template;
import com.google.common.collect.Maps;
import com.xiaojukeji.know.streaming.km.persistence.es.ESFileLoader;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
@Component
public class TemplateLoaderUtil extends ESFileLoader {
private static final String FILE_PATH = "es/template/";
/**
* 查询语句容器
*/
private Map<String, String> templateMap = Maps.newHashMap();
@PostConstruct
public void init() {
templateMap.putAll(loaderFileContext(FILE_PATH, TemplateConstant.class.getDeclaredFields()));
}
public String getContextByFileName(String fileName) {
return templateMap.get(fileName);
}
}

View File

@@ -0,0 +1,44 @@
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"clusterPhyId": {
"value": %d
}
}
},
{
"term": {
"connectClusterId": {
"value": %d
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
]
}
},
"aggs": {
"hist": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "%s",
"time_zone": "Asia/Shanghai",
"min_doc_count": 0
},
"aggs": {
%s
}
}
}
}

View File

@@ -0,0 +1,45 @@
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"clusterPhyId": {
"value": %d
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
]
}
},
"aggs": {
"hist": {
"terms": {
"field": "connectClusterId",
"collect_mode": "breadth_first"
},
"aggs": {
"hist": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "%s",
"time_zone": "Asia/Shanghai",
"min_doc_count": 0
},
"aggs": {
%s
}
}
}
}
}
}

View File

@@ -0,0 +1,51 @@
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"clusterPhyId": {
"value": %d
}
}
},
{
"term": {
"connectClusterId": {
"value": %d
}
}
},
{
"term": {
"connectorName": {
"value": "%s"
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
]
}
},
"aggs": {
"hist": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "%s",
"time_zone": "Asia/Shanghai",
"min_doc_count": 0
},
"aggs": {
%s
}
}
}
}

View File

@@ -0,0 +1,45 @@
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"clusterPhyId": {
"value": %d
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
]
}
},
"aggs": {
"hist": {
"terms": {
"field": "connectorNameAndClusterId",
"collect_mode": "breadth_first"
},
"aggs": {
"hist": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "%s",
"time_zone": "Asia/Shanghai",
"min_doc_count": 0
},
"aggs": {
%s
}
}
}
}
}
}

View File

@@ -0,0 +1,39 @@
{
"size":1000,
"query": {
"bool": {
"must": [
{
"term": {
"connectClusterId": {
"value": %d
}
}
},
{
"term": {
"connectorName": {
"value": "%s"
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
%s
]
}
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}

View File

@@ -0,0 +1,101 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_broker_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "2"
}
},
"mappings" : {
"properties" : {
"brokerId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"NetworkProcessorAvgIdle" : {
"type" : "float"
},
"UnderReplicatedPartitions" : {
"type" : "float"
},
"BytesIn_min_15" : {
"type" : "float"
},
"HealthCheckTotal" : {
"type" : "float"
},
"RequestHandlerAvgIdle" : {
"type" : "float"
},
"connectionsCount" : {
"type" : "float"
},
"BytesIn_min_5" : {
"type" : "float"
},
"HealthScore" : {
"type" : "float"
},
"BytesOut" : {
"type" : "float"
},
"BytesOut_min_15" : {
"type" : "float"
},
"BytesIn" : {
"type" : "float"
},
"BytesOut_min_5" : {
"type" : "float"
},
"TotalRequestQueueSize" : {
"type" : "float"
},
"MessagesIn" : {
"type" : "float"
},
"TotalProduceRequests" : {
"type" : "float"
},
"HealthCheckPassed" : {
"type" : "float"
},
"TotalResponseQueueSize" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,186 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_cluster_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "2"
}
},
"mappings" : {
"properties" : {
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"Connections" : {
"type" : "double"
},
"BytesIn_min_15" : {
"type" : "double"
},
"PartitionURP" : {
"type" : "double"
},
"HealthScore_Topics" : {
"type" : "double"
},
"EventQueueSize" : {
"type" : "double"
},
"ActiveControllerCount" : {
"type" : "double"
},
"GroupDeads" : {
"type" : "double"
},
"BytesIn_min_5" : {
"type" : "double"
},
"HealthCheckTotal_Topics" : {
"type" : "double"
},
"Partitions" : {
"type" : "double"
},
"BytesOut" : {
"type" : "double"
},
"Groups" : {
"type" : "double"
},
"BytesOut_min_15" : {
"type" : "double"
},
"TotalRequestQueueSize" : {
"type" : "double"
},
"HealthCheckPassed_Groups" : {
"type" : "double"
},
"TotalProduceRequests" : {
"type" : "double"
},
"HealthCheckPassed" : {
"type" : "double"
},
"TotalLogSize" : {
"type" : "double"
},
"GroupEmptys" : {
"type" : "double"
},
"PartitionNoLeader" : {
"type" : "double"
},
"HealthScore_Brokers" : {
"type" : "double"
},
"Messages" : {
"type" : "double"
},
"Topics" : {
"type" : "double"
},
"PartitionMinISR_E" : {
"type" : "double"
},
"HealthCheckTotal" : {
"type" : "double"
},
"Brokers" : {
"type" : "double"
},
"Replicas" : {
"type" : "double"
},
"HealthCheckTotal_Groups" : {
"type" : "double"
},
"GroupRebalances" : {
"type" : "double"
},
"MessageIn" : {
"type" : "double"
},
"HealthScore" : {
"type" : "double"
},
"HealthCheckPassed_Topics" : {
"type" : "double"
},
"HealthCheckTotal_Brokers" : {
"type" : "double"
},
"PartitionMinISR_S" : {
"type" : "double"
},
"BytesIn" : {
"type" : "double"
},
"BytesOut_min_5" : {
"type" : "double"
},
"GroupActives" : {
"type" : "double"
},
"MessagesIn" : {
"type" : "double"
},
"GroupReBalances" : {
"type" : "double"
},
"HealthCheckPassed_Brokers" : {
"type" : "double"
},
"HealthScore_Groups" : {
"type" : "double"
},
"TotalResponseQueueSize" : {
"type" : "double"
},
"Zookeepers" : {
"type" : "double"
},
"LeaderMessages" : {
"type" : "double"
},
"HealthScore_Cluster" : {
"type" : "double"
},
"HealthCheckPassed_Cluster" : {
"type" : "double"
},
"HealthCheckTotal_Cluster" : {
"type" : "double"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"type" : "date"
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,86 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_connect_cluster_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "2"
}
},
"mappings" : {
"properties" : {
"connectClusterId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"ConnectorCount" : {
"type" : "float"
},
"TaskCount" : {
"type" : "float"
},
"ConnectorStartupAttemptsTotal" : {
"type" : "float"
},
"ConnectorStartupFailurePercentage" : {
"type" : "float"
},
"ConnectorStartupFailureTotal" : {
"type" : "float"
},
"ConnectorStartupSuccessPercentage" : {
"type" : "float"
},
"ConnectorStartupSuccessTotal" : {
"type" : "float"
},
"TaskStartupAttemptsTotal" : {
"type" : "float"
},
"TaskStartupFailurePercentage" : {
"type" : "float"
},
"TaskStartupFailureTotal" : {
"type" : "float"
},
"TaskStartupSuccessPercentage" : {
"type" : "float"
},
"TaskStartupSuccessTotal" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,194 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_connect_connector_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "2"
}
},
"mappings" : {
"properties" : {
"connectClusterId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"connectorName" : {
"type" : "keyword"
},
"connectorNameAndClusterId" : {
"type" : "keyword"
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"HealthState" : {
"type" : "float"
},
"ConnectorTotalTaskCount" : {
"type" : "float"
},
"HealthCheckPassed" : {
"type" : "float"
},
"HealthCheckTotal" : {
"type" : "float"
},
"ConnectorRunningTaskCount" : {
"type" : "float"
},
"ConnectorPausedTaskCount" : {
"type" : "float"
},
"ConnectorFailedTaskCount" : {
"type" : "float"
},
"ConnectorUnassignedTaskCount" : {
"type" : "float"
},
"BatchSizeAvg" : {
"type" : "float"
},
"BatchSizeMax" : {
"type" : "float"
},
"OffsetCommitAvgTimeMs" : {
"type" : "float"
},
"OffsetCommitMaxTimeMs" : {
"type" : "float"
},
"OffsetCommitFailurePercentage" : {
"type" : "float"
},
"OffsetCommitSuccessPercentage" : {
"type" : "float"
},
"PollBatchAvgTimeMs" : {
"type" : "float"
},
"PollBatchMaxTimeMs" : {
"type" : "float"
},
"SourceRecordActiveCount" : {
"type" : "float"
},
"SourceRecordActiveCountAvg" : {
"type" : "float"
},
"SourceRecordActiveCountMax" : {
"type" : "float"
},
"SourceRecordPollRate" : {
"type" : "float"
},
"SourceRecordPollTotal" : {
"type" : "float"
},
"SourceRecordWriteRate" : {
"type" : "float"
},
"SourceRecordWriteTotal" : {
"type" : "float"
},
"OffsetCommitCompletionRate" : {
"type" : "float"
},
"OffsetCommitCompletionTotal" : {
"type" : "float"
},
"OffsetCommitSkipRate" : {
"type" : "float"
},
"OffsetCommitSkipTotal" : {
"type" : "float"
},
"PartitionCount" : {
"type" : "float"
},
"PutBatchAvgTimeMs" : {
"type" : "float"
},
"PutBatchMaxTimeMs" : {
"type" : "float"
},
"SinkRecordActiveCount" : {
"type" : "float"
},
"SinkRecordActiveCountAvg" : {
"type" : "float"
},
"SinkRecordActiveCountMax" : {
"type" : "float"
},
"SinkRecordLagMax" : {
"type" : "float"
},
"SinkRecordReadRate" : {
"type" : "float"
},
"SinkRecordReadTotal" : {
"type" : "float"
},
"SinkRecordSendRate" : {
"type" : "float"
},
"SinkRecordSendTotal" : {
"type" : "float"
},
"DeadletterqueueProduceFailures" : {
"type" : "float"
},
"DeadletterqueueProduceRequests" : {
"type" : "float"
},
"LastErrorTimestamp" : {
"type" : "float"
},
"TotalErrorsLogged" : {
"type" : "float"
},
"TotalRecordErrors" : {
"type" : "float"
},
"TotalRecordFailures" : {
"type" : "float"
},
"TotalRecordsSkipped" : {
"type" : "float"
},
"TotalRetries" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,74 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_group_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"group" : {
"type" : "keyword"
},
"partitionId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"topic" : {
"type" : "keyword"
},
"metrics" : {
"properties" : {
"HealthScore" : {
"type" : "float"
},
"Lag" : {
"type" : "float"
},
"OffsetConsumed" : {
"type" : "float"
},
"HealthCheckTotal" : {
"type" : "float"
},
"HealthCheckPassed" : {
"type" : "float"
}
}
},
"groupMetric" : {
"type" : "keyword"
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,65 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_partition_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"brokerId" : {
"type" : "long"
},
"partitionId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"topic" : {
"type" : "keyword"
},
"metrics" : {
"properties" : {
"LogStartOffset" : {
"type" : "float"
},
"Messages" : {
"type" : "float"
},
"LogEndOffset" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,65 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_replication_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"brokerId" : {
"type" : "long"
},
"partitionId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"topic" : {
"type" : "keyword"
},
"metrics" : {
"properties" : {
"LogStartOffset" : {
"type" : "float"
},
"Messages" : {
"type" : "float"
},
"LogEndOffset" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,116 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_topic_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"brokerId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"topic" : {
"type" : "keyword"
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"BytesIn_min_15" : {
"type" : "float"
},
"Messages" : {
"type" : "float"
},
"BytesRejected" : {
"type" : "float"
},
"PartitionURP" : {
"type" : "float"
},
"HealthCheckTotal" : {
"type" : "float"
},
"ReplicationCount" : {
"type" : "float"
},
"ReplicationBytesOut" : {
"type" : "float"
},
"ReplicationBytesIn" : {
"type" : "float"
},
"FailedFetchRequests" : {
"type" : "float"
},
"BytesIn_min_5" : {
"type" : "float"
},
"HealthScore" : {
"type" : "float"
},
"LogSize" : {
"type" : "float"
},
"BytesOut" : {
"type" : "float"
},
"BytesOut_min_15" : {
"type" : "float"
},
"FailedProduceRequests" : {
"type" : "float"
},
"BytesIn" : {
"type" : "float"
},
"BytesOut_min_5" : {
"type" : "float"
},
"MessagesIn" : {
"type" : "float"
},
"TotalProduceRequests" : {
"type" : "float"
},
"HealthCheckPassed" : {
"type" : "float"
}
}
},
"brokerAgg" : {
"type" : "keyword"
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}

View File

@@ -0,0 +1,84 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_zookeeper_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"AvgRequestLatency" : {
"type" : "double"
},
"MinRequestLatency" : {
"type" : "double"
},
"MaxRequestLatency" : {
"type" : "double"
},
"OutstandingRequests" : {
"type" : "double"
},
"NodeCount" : {
"type" : "double"
},
"WatchCount" : {
"type" : "double"
},
"NumAliveConnections" : {
"type" : "double"
},
"PacketsReceived" : {
"type" : "double"
},
"PacketsSent" : {
"type" : "double"
},
"EphemeralsCount" : {
"type" : "double"
},
"ApproximateDataSize" : {
"type" : "double"
},
"OpenFileDescriptorCount" : {
"type" : "double"
},
"MaxFileDescriptorCount" : {
"type" : "double"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"type" : "date"
}
}
},
"aliases" : { }
}