From b4c60eb9109aed72a6f1b9365512dba5dd8d22af Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 28 Feb 2022 12:07:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=BF=87=E6=BB=A4=E6=8E=89Br?= =?UTF-8?q?oker=E7=9A=84=E8=BF=9E=E6=8E=A5=E4=BF=A1=E6=81=AF=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0=E8=AF=B7=E6=B1=82=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=E7=9A=84=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/gateway/impl/TopicConnectionServiceImpl.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java index 0eea8623..4c9a5528 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; @@ -167,8 +168,10 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { TopicConnection dto = convert2TopicConnectionDTO(connectionDO); // 过滤掉broker的机器 - if (brokerHostnameSet.contains(dto.getHostname()) || brokerHostnameSet.contains(dto.getIp())) { - // 发现消费的机器是broker, 则直接跳过. brokerHostnameSet有的集群存储的是IP + if (KafkaClientEnum.FETCH_CLIENT.getName().toLowerCase().equals(connectionDO.getType()) + && (brokerHostnameSet.contains(dto.getHostname()) || brokerHostnameSet.contains(dto.getIp()))) { + // 如果是fetch请求,并且是Broker的机器,则将数据进行过滤。 + // bad-case:如果broker上部署了消费客户端,则这个消费客户端也会一并被过滤掉。 continue; }