diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index 17216793..fd05c218 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -168,9 +168,10 @@ public class GroupManagerImpl implements GroupManager { // 转换存储格式 Map tpMemberMap = new HashMap<>(); - //如果不是connect集群 + // 如果不是connect集群 if (!groupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) { for (KSMemberDescription description : groupDescription.members()) { + // 如果是 Consumer 的 Description ,则 Assignment 的类型为 KSMemberConsumerAssignment 的 KSMemberConsumerAssignment assignment = (KSMemberConsumerAssignment) description.assignment(); for (TopicPartition tp : assignment.topicPartitions()) { tpMemberMap.put(tp, description); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java index f985fb01..db809305 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java @@ -1338,6 +1338,8 @@ public class KSPartialKafkaAdminClient { if (groupMember.memberAssignment().length > 0) { final Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>(assignment.partitions())); + } else { + memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>()); } } else { ConnectProtocol.Assignment assignment = null;