diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java index 0eea8623..4c9a5528 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; @@ -167,8 +168,10 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { TopicConnection dto = convert2TopicConnectionDTO(connectionDO); // 过滤掉broker的机器 - if (brokerHostnameSet.contains(dto.getHostname()) || brokerHostnameSet.contains(dto.getIp())) { - // 发现消费的机器是broker, 则直接跳过. brokerHostnameSet有的集群存储的是IP + if (KafkaClientEnum.FETCH_CLIENT.getName().toLowerCase().equals(connectionDO.getType()) + && (brokerHostnameSet.contains(dto.getHostname()) || brokerHostnameSet.contains(dto.getIp()))) { + // 如果是fetch请求,并且是Broker的机器,则将数据进行过滤。 + // bad-case:如果broker上部署了消费客户端,则这个消费客户端也会一并被过滤掉。 continue; }