From 2256e8bbdb122ff50634a97a3e5387f9a574bd3d Mon Sep 17 00:00:00 2001 From: ZQKC Date: Fri, 12 May 2023 14:25:20 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8DConnect-GroupDescript?= =?UTF-8?q?ion=E8=A7=A3=E6=9E=90=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98(#1010)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、先尝试使用IncrementalCooperativeConnectProtocol协议进行解析; 2、IncrementalCooperativeConnectProtocol协议解析失败后,再维持原先的情况,使用ConnectProtocol协议进行解析; --- .../kafka/KSPartialKafkaAdminClient.java | 46 +++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java index db809305..8f53d998 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java @@ -78,6 +78,8 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.distributed.ConnectProtocol; +import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState; +import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol; import org.slf4j.Logger; import java.net.InetSocketAddress; @@ -1342,19 +1344,7 @@ public class KSPartialKafkaAdminClient { memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>()); } } else { - ConnectProtocol.Assignment assignment = null; - if (groupMember.memberAssignment().length > 0) { - assignment = ConnectProtocol. - deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); - } - - ConnectProtocol.WorkerState workerState = null; - if (groupMember.memberMetadata().length > 0) { - workerState = ConnectProtocol. - deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata())); - } - - memberBaseAssignment = new KSMemberConnectAssignment(assignment, workerState); + memberBaseAssignment = deserializeConnectGroupDataCompatibility(groupMember); } memberDescriptions.add(new KSMemberDescription( @@ -1383,6 +1373,36 @@ public class KSPartialKafkaAdminClient { }; } + private KSMemberBaseAssignment deserializeConnectGroupDataCompatibility(DescribedGroupMember groupMember) { + try { + // 高版本的反序列化方式 + ExtendedWorkerState workerState = null; + if (groupMember.memberMetadata().length > 0) { + workerState = IncrementalCooperativeConnectProtocol. + deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata())); + + return new KSMemberConnectAssignment(workerState.assignment(), workerState); + } + } catch (Exception e) { + // ignore + } + + // 低版本的反序列化方式 + ConnectProtocol.Assignment assignment = null; + if (groupMember.memberAssignment().length > 0) { + assignment = ConnectProtocol. + deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); + } + + ConnectProtocol.WorkerState workerState = null; + if (groupMember.memberMetadata().length > 0) { + workerState = ConnectProtocol. + deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata())); + } + + return new KSMemberConnectAssignment(assignment, workerState); + } + private Set validAclOperations(final int authorizedOperations) { if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {