[Bugfix]修复ConsumerAssignment类型转换错误的问题

1、问题
KSGroupDescription 的 KSMemberBaseAssignment 对象,转 KSMemberConsumerAssignment 时,会出现转换失败的错误。

2、原因
KSPartialKafkaAdminClient 在返回 KSMemberDescription 时,当 ConsumerGroup 的 memberAssignment.length() <= 0 时,遗漏对 memberBaseAssignment 对象进行初始化。

3、解决
发现 memberAssignment.length() <= 0 时,主动将 KSMemberDescription 中的 memberBaseAssignment 赋值为 KSMemberConsumerAssignment 对象。
This commit is contained in:
ZQKC
2023-03-17 20:34:50 +08:00
parent c71865f623
commit 769c2c0fbc
2 changed files with 4 additions and 1 deletions

View File

@@ -168,9 +168,10 @@ public class GroupManagerImpl implements GroupManager {
// 转换存储格式 // 转换存储格式
Map<TopicPartition, KSMemberDescription> tpMemberMap = new HashMap<>(); Map<TopicPartition, KSMemberDescription> tpMemberMap = new HashMap<>();
//如果不是connect集群 // 如果不是connect集群
if (!groupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) { if (!groupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {
for (KSMemberDescription description : groupDescription.members()) { for (KSMemberDescription description : groupDescription.members()) {
// 如果是 Consumer 的 Description ,则 Assignment 的类型为 KSMemberConsumerAssignment 的
KSMemberConsumerAssignment assignment = (KSMemberConsumerAssignment) description.assignment(); KSMemberConsumerAssignment assignment = (KSMemberConsumerAssignment) description.assignment();
for (TopicPartition tp : assignment.topicPartitions()) { for (TopicPartition tp : assignment.topicPartitions()) {
tpMemberMap.put(tp, description); tpMemberMap.put(tp, description);

View File

@@ -1338,6 +1338,8 @@ public class KSPartialKafkaAdminClient {
if (groupMember.memberAssignment().length > 0) { if (groupMember.memberAssignment().length > 0) {
final Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); final Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>(assignment.partitions())); memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>(assignment.partitions()));
} else {
memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>());
} }
} else { } else {
ConnectProtocol.Assignment assignment = null; ConnectProtocol.Assignment assignment = null;