bugfix, fix collect consumer metrics task

This commit is contained in:
zengqiao
2020-09-16 21:04:47 +08:00
parent 74b5700573
commit dd2e29dd40
5 changed files with 150 additions and 134 deletions

View File

@@ -1,8 +1,5 @@
package com.xiaojukeji.kafka.manager.common.entity.dto.consumer;
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
import java.util.List;
import java.util.Map;
/**
@@ -11,20 +8,33 @@ import java.util.Map;
* @date 2015/11/12
*/
public class ConsumerDTO {
/**
* 消费group名
*/
private Long clusterId;
private String topicName;
private String consumerGroup;
/**
* 消费类型一般为static
*/
private String location;
/**
* 订阅的每个topic的partition状态列表
*/
private Map<String, List<PartitionState>> topicPartitionMap;
private Map<Integer, Long> partitionOffsetMap;
private Map<Integer, Long> consumerOffsetMap;
/**
 * Returns the cluster id stored on this DTO.
 *
 * @return the cluster id, or {@code null} if it has not been set
 */
public Long getClusterId() {
return clusterId;
}
/**
 * Sets the cluster id stored on this DTO.
 *
 * @param clusterId the cluster id
 */
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
/**
 * Returns the name of the topic this DTO describes.
 *
 * @return the topic name, or {@code null} if it has not been set
 */
public String getTopicName() {
return topicName;
}
/**
 * Sets the name of the topic this DTO describes.
 *
 * @param topicName the topic name
 */
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getConsumerGroup() {
return consumerGroup;
@@ -42,20 +52,31 @@ public class ConsumerDTO {
this.location = location;
}
public Map<String, List<PartitionState>> getTopicPartitionMap() {
return topicPartitionMap;
public Map<Integer, Long> getPartitionOffsetMap() {
return partitionOffsetMap;
}
public void setTopicPartitionMap(Map<String, List<PartitionState>> topicPartitionMap) {
this.topicPartitionMap = topicPartitionMap;
public void setPartitionOffsetMap(Map<Integer, Long> partitionOffsetMap) {
this.partitionOffsetMap = partitionOffsetMap;
}
/**
 * Returns the consumer group's offsets for the topic, keyed by partition id.
 *
 * @return partition id to consumer offset; may be {@code null} when no
 *         consumer offsets were collected
 */
public Map<Integer, Long> getConsumerOffsetMap() {
return consumerOffsetMap;
}
/**
 * Sets the consumer group's offsets for the topic, keyed by partition id.
 *
 * @param consumerOffsetMap partition id to consumer offset
 */
public void setConsumerOffsetMap(Map<Integer, Long> consumerOffsetMap) {
this.consumerOffsetMap = consumerOffsetMap;
}
@Override
public String toString() {
return "Consumer{" +
"consumerGroup='" + consumerGroup + '\'' +
return "ConsumerDTO{" +
"clusterId=" + clusterId +
", topicName='" + topicName + '\'' +
", consumerGroup='" + consumerGroup + '\'' +
", location='" + location + '\'' +
", topicPartitionMap=" + topicPartitionMap +
", partitionOffsetMap=" + partitionOffsetMap +
", consumerOffsetMap=" + consumerOffsetMap +
'}';
}
}

View File

@@ -2,12 +2,12 @@ package com.xiaojukeji.kafka.manager.service.collector;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerDTO;
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +34,8 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
if (clusterDO == null) {
return;
}
Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>();
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, topicNamePartitionStateListMap);
Map<TopicPartition, Long> allPartitionOffsetMap = new HashMap<>();
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, allPartitionOffsetMap);
List<ConsumerMetrics> consumerMetricsList = convert2ConsumerMetrics(consumerDTOList);
KafkaMetricsCache.putConsumerMetricsToCache(clusterId, consumerMetricsList);
@@ -47,23 +47,27 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
/**
 * Converts consumer DTOs into {@link ConsumerMetrics} entries carrying the summed lag.
 *
 * <p>Each entry copies the consumer group, location and topic name and sets
 * {@code sumLag} to the sum of the per-partition differences between the
 * partition offset and the consumer offset, where a negative difference
 * counts as 0.
 *
 * @param consumerDTOList the DTOs to convert
 * @return the metrics built from the DTOs
 */
private List<ConsumerMetrics> convert2ConsumerMetrics(List<ConsumerDTO> consumerDTOList) {
List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
for (ConsumerDTO consumerDTO : consumerDTOList) {
Map<String, List<PartitionState>> topicNamePartitionStateListMap = consumerDTO.getTopicPartitionMap();
for(Map.Entry<String, List<PartitionState>> entry : topicNamePartitionStateListMap.entrySet()){
String topicName = entry.getKey();
List<PartitionState> partitionStateList = entry.getValue();
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
consumerMetrics.setClusterId(clusterId);
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
consumerMetrics.setLocation(consumerDTO.getLocation());
consumerMetrics.setTopicName(topicName);
long sumLag = 0;
for (PartitionState partitionState : partitionStateList) {
Map.Entry<Long, Long> offsetEntry = new AbstractMap.SimpleEntry<>(partitionState.getOffset(), partitionState.getConsumeOffset());
sumLag += (offsetEntry.getKey() - offsetEntry.getValue() > 0 ? offsetEntry.getKey() - offsetEntry.getValue(): 0);
}
consumerMetrics.setSumLag(sumLag);
consumerMetricsList.add(consumerMetrics);
// Both offset maps are needed to compute lag; a DTO missing either one yields no metrics entry.
if (consumerDTO.getPartitionOffsetMap() == null || consumerDTO.getConsumerOffsetMap() == null) {
continue;
}
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
consumerMetrics.setClusterId(consumerDTO.getClusterId());
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
consumerMetrics.setLocation(consumerDTO.getLocation());
consumerMetrics.setTopicName(consumerDTO.getTopicName());
long sumLag = 0;
for(Map.Entry<Integer, Long> entry : consumerDTO.getPartitionOffsetMap().entrySet()){
Long partitionOffset = entry.getValue();
Long consumerOffset = consumerDTO.getConsumerOffsetMap().get(entry.getKey());
// A partition without a known partition offset or consumer offset is left out of the sum.
if (partitionOffset == null || consumerOffset == null) {
continue;
}
// Clamp at 0 so a consumer offset larger than the partition offset does not reduce the total.
sumLag += Math.max(partitionOffset - consumerOffset, 0);
}
consumerMetrics.setSumLag(sumLag);
consumerMetricsList.add(consumerMetrics);
}
return consumerMetricsList;
}

View File

@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerGroupDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.PartitionOffsetDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Map;
@@ -57,7 +58,7 @@ public interface ConsumerService {
* @return
*/
List<ConsumerDTO> getMonitoredConsumerList(ClusterDO clusterDO,
Map<String, List<PartitionState>> topicNamePartitionStateListMap);
Map<TopicPartition, Long> partitionOffsetMap);
/**
* 重置offset

View File

@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import com.xiaojukeji.kafka.manager.common.constant.OffsetStoreLocation;
import com.xiaojukeji.kafka.manager.common.constant.StatusCode;
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
@@ -18,7 +17,6 @@ import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientCache;
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
import com.xiaojukeji.kafka.manager.service.service.TopicService;
import com.xiaojukeji.kafka.manager.service.service.ZookeeperService;
import com.xiaojukeji.kafka.manager.common.utils.zk.ZkPathUtil;
import kafka.admin.AdminClient;
import org.apache.commons.lang.StringUtils;
@@ -49,9 +47,6 @@ public class ConsumerServiceImpl implements ConsumerService {
@Autowired
private TopicService topicService;
@Autowired
private ZookeeperService zkService;
private final ExecutorService consumerListThreadPool = Executors.newFixedThreadPool(50, new DefaultThreadFactory("ConsumerPool"));
@Override
@@ -135,20 +130,20 @@ public class ConsumerServiceImpl implements ConsumerService {
@Override
public List<ConsumerDTO> getMonitoredConsumerList(final ClusterDO clusterDO,
final Map<String, List<PartitionState>> partitionStateListMap) {
final Map<TopicPartition, Long> allPartitionOffsetMap) {
List<ConsumerGroupDTO> consumerGroupDTOList = getConsumerGroupList(clusterDO.getId());
if (consumerGroupDTOList == null || consumerGroupDTOList.isEmpty()) {
return new ArrayList<>();
}
FutureTask<ConsumerDTO>[] taskList = new FutureTask[consumerGroupDTOList.size()];
FutureTask<List<ConsumerDTO>>[] taskList = new FutureTask[consumerGroupDTOList.size()];
for (int i = 0; i < consumerGroupDTOList.size(); i++) {
final ConsumerGroupDTO consumerGroupDTO = consumerGroupDTOList.get(i);
taskList[i] = new FutureTask<>(new Callable<ConsumerDTO>() {
taskList[i] = new FutureTask<>(new Callable<List<ConsumerDTO>>() {
@Override
public ConsumerDTO call() throws Exception {
public List<ConsumerDTO> call() throws Exception {
try {
return getMonitoredConsumer(clusterDO, consumerGroupDTO, partitionStateListMap);
return getMonitoredConsumer(clusterDO, consumerGroupDTO, allPartitionOffsetMap);
} catch (Exception e) {
logger.error("get monitored consumer error, group:{}", consumerGroupDTO.getConsumerGroup(), e);
}
@@ -159,31 +154,70 @@ public class ConsumerServiceImpl implements ConsumerService {
}
List<ConsumerDTO> consumerList = new ArrayList<>();
for (FutureTask<ConsumerDTO> task : taskList) {
ConsumerDTO consumer = null;
for (FutureTask<List<ConsumerDTO>> task : taskList) {
List<ConsumerDTO> dtoList = null;
try {
consumer = task.get();
dtoList = task.get();
} catch (Exception e) {
logger.error("getMonitoredConsumerList@ConsumeServiceImpl, ", e);
}
if (consumer == null) {
if (dtoList == null) {
continue;
}
consumerList.add(consumer);
consumerList.addAll(dtoList);
}
return consumerList;
}
private ConsumerDTO getMonitoredConsumer(ClusterDO cluster, ConsumerGroupDTO consumerGroupDTO, Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
// 获取当前consumerGroup下的所有的topic的partitionState信息
Map<String, List<PartitionState>> topicNamePartitionStateListMap = getConsumerGroupPartitionStateList(cluster, consumerGroupDTO, globalTopicNamePartitionStateListMap);
/**
 * Builds one {@link ConsumerDTO} per topic consumed by the given consumer group.
 *
 * <p>The consumed topics come from {@code ConsumerMetadataCache}. For each topic
 * that has metadata with at least one partition, the DTO receives the partition
 * offsets (partition id to offset) taken from {@code allPartitionOffsetMap} and
 * the group's consumer offsets read from ZooKeeper or the broker, depending on
 * the group's offset store location.
 *
 * <p>NOTE(review): this method both reads and extends {@code allPartitionOffsetMap},
 * and it appears to be invoked from tasks sharing one map instance — confirm the
 * map passed in is safe for concurrent modification.
 *
 * @param clusterDO             the cluster the consumer group belongs to
 * @param consumerGroupDTO      the consumer group to inspect
 * @param allPartitionOffsetMap cache of topic-partition offsets; offsets fetched
 *                              for a topic are added to it
 * @return the DTOs built for the group's topics; empty if none qualified
 */
private List<ConsumerDTO> getMonitoredConsumer(ClusterDO clusterDO,
ConsumerGroupDTO consumerGroupDTO,
Map<TopicPartition, Long> allPartitionOffsetMap) {
List<ConsumerDTO> dtoList = new ArrayList<>();
// Put the partition info that has no corresponding consumer into a single consumer
ConsumerDTO consumerDTO = new ConsumerDTO();
consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().name());
consumerDTO.setTopicPartitionMap(topicNamePartitionStateListMap);
return consumerDTO;
List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(
clusterDO.getId(),
consumerGroupDTO.getOffsetStoreLocation().getLocation(),
consumerGroupDTO.getConsumerGroup()
);
for (String topicName : topicNameList) {
TopicMetadata metadata = ClusterMetadataManager.getTopicMetaData(clusterDO.getId(), topicName);
// Skip topics with no metadata or no partitions.
if (metadata == null || metadata.getPartitionNum() <= 0) {
continue;
}
// The presence of partition 0 is used as the marker that this topic's offsets were already fetched.
if (!allPartitionOffsetMap.containsKey(new TopicPartition(topicName, 0))) {
Map<TopicPartition, Long> offsetMap = topicService.getTopicPartitionOffset(clusterDO, topicName);
if (offsetMap == null) {
offsetMap = new HashMap<>();
}
allPartitionOffsetMap.putAll(offsetMap);
}
// Stays null when the offset store location is neither ZOOKEEPER nor BROKER.
Map<Integer, Long> consumerOffsetMap = null;
if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
consumerOffsetMap = getTopicConsumerOffsetInZK(clusterDO, metadata, consumerGroupDTO);
} else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
consumerOffsetMap = getTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO);
}
// Project the shared topic-partition map down to this topic, keeping only partitions with a known offset.
Map<Integer, Long> partitionOffsetMap = new HashMap<>();
for (int partitionId = 0; partitionId < metadata.getPartitionNum(); ++partitionId) {
Long offset = allPartitionOffsetMap.get(new TopicPartition(topicName, partitionId));
if (offset == null) {
continue;
}
partitionOffsetMap.put(partitionId, offset);
}
ConsumerDTO consumerDTO = new ConsumerDTO();
consumerDTO.setClusterId(clusterDO.getId());
consumerDTO.setTopicName(topicName);
consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().getLocation());
consumerDTO.setPartitionOffsetMap(partitionOffsetMap);
consumerDTO.setConsumerOffsetMap(consumerOffsetMap);
dtoList.add(consumerDTO);
}
return dtoList;
}
@Override
@@ -264,52 +298,15 @@ public class ConsumerServiceImpl implements ConsumerService {
kafkaConsumer.commitSync();
}
/**
* 获取属于该集群和consumerGroup下的所有topic的信息
*/
private Map<String, List<PartitionState>> getConsumerGroupPartitionStateList(ClusterDO clusterDO,
ConsumerGroupDTO consumerGroupDTO,
Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>(2);
private Map<Integer, Long> getTopicConsumerOffsetInZK(ClusterDO clusterDO,
TopicMetadata topicMetadata,
ConsumerGroupDTO consumerGroupDTO) {
Map<Integer, Long> offsetMap = new HashMap<>();
List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(clusterDO.getId(),consumerGroupDTO.getOffsetStoreLocation().getLocation(), consumerGroupDTO.getConsumerGroup());
for (String topicName : topicNameList) {
if (!ClusterMetadataManager.isTopicExist(clusterDO.getId(), topicName)) {
continue;
}
List<PartitionState> partitionStateList = globalTopicNamePartitionStateListMap.get(topicName);
if (partitionStateList == null) {
try {
partitionStateList = zkService.getTopicPartitionState(clusterDO.getId(), topicName);
} catch (Exception e) {
logger.error("get topic partition state failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
}
if (partitionStateList == null) {
continue;
}
globalTopicNamePartitionStateListMap.put(topicName, partitionStateList);
}
List<PartitionState> consumerGroupPartitionStateList = new ArrayList<>();
for (PartitionState partitionState: partitionStateList) {
consumerGroupPartitionStateList.add((PartitionState) partitionState.clone());
}
if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
updateTopicConsumerOffsetInZK(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
} else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
updateTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
}
topicNamePartitionStateListMap.put(topicName, consumerGroupPartitionStateList);
}
return topicNamePartitionStateListMap;
}
private void updateTopicConsumerOffsetInZK(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(cluster.getId());
for (PartitionState partitionState : partitionStateList) {
ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(clusterDO.getId());
for (int partitionId = 0; partitionId < topicMetadata.getPartitionNum(); ++partitionId) {
//offset存储于zk中
String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId());
String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicMetadata.getTopic(), partitionId);
String offset = null;
try {
Stat stat = zkConfig.getNodeStat(consumerGroupOffsetLocation);
@@ -317,39 +314,32 @@ public class ConsumerServiceImpl implements ConsumerService {
continue;
}
offset = zkConfig.get(consumerGroupOffsetLocation);
offsetMap.put(partitionId, Long.valueOf(offset));
} catch (ConfigException e) {
e.printStackTrace();
}
String consumerId = null;
try {
consumerId = zkConfig.get(ZkPathUtil.getConsumerGroupOwnersTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId()));
} catch (ConfigException e) {
// logger.error("get consumerId error in updateTopicConsumerOffsetInZK cluster:{} topic:{} consumerGroup:{}", cluster.getClusterName(), topicName, consumerGroupDTO.getConsumerGroup());
}
partitionState.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
updatePartitionStateOffset(partitionState, offset, consumerId);
}
return offsetMap;
}
private void updateTopicConsumerOffsetInBroker(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(cluster, consumerGroupDTO.getConsumerGroup(), topicName);
/**
 * Reads the consumer group's offsets for a topic from the broker, keyed by partition id.
 *
 * <p>The offsets are obtained as strings from
 * {@code getOffsetByGroupAndTopicFromBroker} and parsed with {@code Long.valueOf};
 * entries that fail to convert are logged and left out of the result.
 *
 * @param clusterDO        the cluster to query
 * @param topicName        the topic whose consumer offsets are wanted
 * @param consumerGroupDTO the consumer group whose offsets are wanted
 * @return partition id to consumer offset; an empty map when the broker lookup
 *         returns {@code null} or nothing
 */
private Map<Integer, Long> getTopicConsumerOffsetInBroker(ClusterDO clusterDO,
String topicName,
ConsumerGroupDTO consumerGroupDTO) {
Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroupDTO.getConsumerGroup(), topicName);
if (offsetsFromBroker == null || offsetsFromBroker.isEmpty()) {
return;
return new HashMap<>(0);
}
for (PartitionState partitionState : partitionStateList) {
int partitionId = partitionState.getPartitionId();
updatePartitionStateOffset(partitionState, offsetsFromBroker.get(partitionId), null);
Map<Integer, Long> offsetMap = new HashMap<>(offsetsFromBroker.size());
for (Map.Entry<Integer, String> entry: offsetsFromBroker.entrySet()) {
try {
offsetMap.put(entry.getKey(), Long.valueOf(entry.getValue()));
} catch (Exception e) {
// NOTE(review): the caught exception is not passed to the logger, so the cause and the
// offending partition/value are lost; consider appending e as the final argument.
logger.error("get topic consumer offset failed, clusterId:{} topicName:{} consumerGroup:{}."
, clusterDO.getId(), topicName, consumerGroupDTO.getConsumerGroup());
}
}
}
private void updatePartitionStateOffset(PartitionState partitionState, String offset, String consumerId) {
partitionState.setConsumeOffset(0);
if (!StringUtils.isEmpty(offset)) {
partitionState.setConsumeOffset(Long.parseLong(offset));
}
partitionState.setConsumerGroup(consumerId);
return offsetMap;
}
private Map<Integer, String> getConsumeIdMap(Long clusterId, String topicName, String consumerGroup) {

View File

@@ -343,9 +343,9 @@ public class TopicServiceImpl implements TopicService {
topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec());
topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec());
} else {
topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true);
topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec());
topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec());
// topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true);
topicOverviewDTO.setBytesInPerSec(0.0);
topicOverviewDTO.setProduceRequestPerSec(0.0);
}
return topicOverviewDTO;
}