mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-08 15:52:15 +08:00
增加过滤掉Broker的连接信息时,增加请求类型的判断
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
|
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
|
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
|
||||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
||||||
@@ -167,8 +168,10 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
|
|||||||
TopicConnection dto = convert2TopicConnectionDTO(connectionDO);
|
TopicConnection dto = convert2TopicConnectionDTO(connectionDO);
|
||||||
|
|
||||||
// 过滤掉broker的机器
|
// 过滤掉broker的机器
|
||||||
if (brokerHostnameSet.contains(dto.getHostname()) || brokerHostnameSet.contains(dto.getIp())) {
|
if (KafkaClientEnum.FETCH_CLIENT.getName().toLowerCase().equals(connectionDO.getType())
|
||||||
// 发现消费的机器是broker, 则直接跳过. brokerHostnameSet有的集群存储的是IP
|
&& (brokerHostnameSet.contains(dto.getHostname()) || brokerHostnameSet.contains(dto.getIp()))) {
|
||||||
|
// 如果是fetch请求,并且是Broker的机器,则将数据进行过滤。
|
||||||
|
// bad-case:如果broker上部署了消费客户端,则这个消费客户端也会一并被过滤掉。
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user