[Bugfix]修复Connect-GroupDescription解析失败的问题(#1010)

1、先尝试使用IncrementalCooperativeConnectProtocol协议进行解析;
2、IncrementalCooperativeConnectProtocol协议解析失败后,再维持原先的情况,使用ConnectProtocol协议进行解析;
This commit is contained in:
ZQKC
2023-05-12 14:25:20 +08:00
committed by EricZeng
parent e975932d41
commit 2256e8bbdb

View File

@@ -78,6 +78,8 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
@@ -1342,19 +1344,7 @@ public class KSPartialKafkaAdminClient {
memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>());
}
} else {
ConnectProtocol.Assignment assignment = null;
if (groupMember.memberAssignment().length > 0) {
assignment = ConnectProtocol.
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
}
ConnectProtocol.WorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = ConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
}
memberBaseAssignment = new KSMemberConnectAssignment(assignment, workerState);
memberBaseAssignment = deserializeConnectGroupDataCompatibility(groupMember);
}
memberDescriptions.add(new KSMemberDescription(
@@ -1383,6 +1373,36 @@ public class KSPartialKafkaAdminClient {
};
}
private KSMemberBaseAssignment deserializeConnectGroupDataCompatibility(DescribedGroupMember groupMember) {
try {
// 高版本的反序列化方式
ExtendedWorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = IncrementalCooperativeConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
return new KSMemberConnectAssignment(workerState.assignment(), workerState);
}
} catch (Exception e) {
// ignore
}
// 低版本的反序列化方式
ConnectProtocol.Assignment assignment = null;
if (groupMember.memberAssignment().length > 0) {
assignment = ConnectProtocol.
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
}
ConnectProtocol.WorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = ConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
}
return new KSMemberConnectAssignment(assignment, workerState);
}
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {