新增自定义的KSPartialKafkaAdminClient

由于原生的KafkaAdminClient在解析Group时,会将Connect集群的Group过滤掉,因此自定义KSPartialKafkaAdminClient,使其具备获取Connect Group的能力
This commit is contained in:
zengqiao
2022-12-06 17:34:15 +08:00
committed by EricZeng
parent 5b3f3e5575
commit cc2a590b33
8 changed files with 1959 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
import org.apache.kafka.common.KafkaFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class KSDescribeGroupsResult {
private final Map<String, KafkaFuture<KSGroupDescription>> futures;
public KSDescribeGroupsResult(final Map<String, KafkaFuture<KSGroupDescription>> futures) {
this.futures = futures;
}
/**
* Return a map from group id to futures which yield group descriptions.
*/
public Map<String, KafkaFuture<KSGroupDescription>> describedGroups() {
return futures;
}
/**
* Return a future which yields all ConsumerGroupDescription objects, if all the describes succeed.
*/
public KafkaFuture<Map<String, KSGroupDescription>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
new KafkaFuture.BaseFunction<Void, Map<String, KSGroupDescription>>() {
@Override
public Map<String, KSGroupDescription> apply(Void v) {
try {
Map<String, KSGroupDescription> descriptions = new HashMap<>(futures.size());
for (Map.Entry<String, KafkaFuture<KSGroupDescription>> entry : futures.entrySet()) {
descriptions.put(entry.getKey(), entry.getValue().get());
}
return descriptions;
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, since the KafkaFuture#allOf already ensured
// that all of the futures completed successfully.
throw new RuntimeException(e);
}
}
});
}
}

View File

@@ -0,0 +1,124 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.utils.Utils;
import java.util.*;
public class KSGroupDescription {
private final String groupId;
private final String protocolType;
private final Collection<KSMemberDescription> members;
private final String partitionAssignor;
private final ConsumerGroupState state;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
public KSGroupDescription(String groupId,
String protocolType,
Collection<KSMemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
Node coordinator) {
this(groupId, protocolType, members, partitionAssignor, state, coordinator, Collections.emptySet());
}
public KSGroupDescription(String groupId,
String protocolType,
Collection<KSMemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.protocolType = protocolType;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.state = state;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final KSGroupDescription that = (KSGroupDescription) o;
return protocolType == that.protocolType &&
Objects.equals(groupId, that.groupId) &&
Objects.equals(members, that.members) &&
Objects.equals(partitionAssignor, that.partitionAssignor) &&
state == that.state &&
Objects.equals(coordinator, that.coordinator) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
return Objects.hash(groupId, protocolType, members, partitionAssignor, state, coordinator, authorizedOperations);
}
/**
* The id of the consumer group.
*/
public String groupId() {
return groupId;
}
/**
* If consumer group is simple or not.
*/
public String protocolType() {
return protocolType;
}
/**
* A list of the members of the consumer group.
*/
public Collection<KSMemberDescription> members() {
return members;
}
/**
* The consumer group partition assignor.
*/
public String partitionAssignor() {
return partitionAssignor;
}
/**
* The consumer group state, or UNKNOWN if the state is too new for us to parse.
*/
public ConsumerGroupState state() {
return state;
}
/**
* The consumer group coordinator, or null if the coordinator is not known.
*/
public Node coordinator() {
return coordinator;
}
/**
* authorizedOperations for this group, or null if that information is not known.
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}
@Override
public String toString() {
return "(groupId=" + groupId +
", protocolType=" + protocolType +
", members=" + Utils.join(members, ",") +
", partitionAssignor=" + partitionAssignor +
", state=" + state +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
")";
}
}

View File

@@ -0,0 +1,79 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.ArrayList;
import java.util.Collection;
public class KSListGroupsResult {
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
private final KafkaFutureImpl<Collection<Throwable>> errors;
public KSListGroupsResult(KafkaFutureImpl<Collection<Object>> future) {
this.all = new KafkaFutureImpl<>();
this.valid = new KafkaFutureImpl<>();
this.errors = new KafkaFutureImpl<>();
future.thenApply(new KafkaFuture.BaseFunction<Collection<Object>, Void>() {
@Override
public Void apply(Collection<Object> results) {
ArrayList<Throwable> curErrors = new ArrayList<>();
ArrayList<ConsumerGroupListing> curValid = new ArrayList<>();
for (Object resultObject : results) {
if (resultObject instanceof Throwable) {
curErrors.add((Throwable) resultObject);
} else {
curValid.add((ConsumerGroupListing) resultObject);
}
}
if (!curErrors.isEmpty()) {
all.completeExceptionally(curErrors.get(0));
} else {
all.complete(curValid);
}
valid.complete(curValid);
errors.complete(curErrors);
return null;
}
});
}
/**
* Returns a future that yields either an exception, or the full set of consumer group
* listings.
*
* In the event of a failure, the future yields nothing but the first exception which
* occurred.
*/
public KafkaFuture<Collection<ConsumerGroupListing>> all() {
return all;
}
/**
* Returns a future which yields just the valid listings.
*
* This future never fails with an error, no matter what happens. Errors are completely
* ignored. If nothing can be fetched, an empty collection is yielded.
* If there is an error, but some results can be returned, this future will yield
* those partial results. When using this future, it is a good idea to also check
* the errors future so that errors can be displayed and handled.
*/
public KafkaFuture<Collection<ConsumerGroupListing>> valid() {
return valid;
}
/**
* Returns a future which yields just the errors which occurred.
*
* If this future yields a non-empty collection, it is very likely that elements are
* missing from the valid() set.
*
* This future itself never fails with an error. In the event of an error, this future
* will successfully yield a collection containing at least one exception.
*/
public KafkaFuture<Collection<Throwable>> errors() {
return errors;
}
}

View File

@@ -0,0 +1,4 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
public class KSMemberBaseAssignment {
}

View File

@@ -0,0 +1,25 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
import lombok.Getter;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
@Getter
public class KSMemberConnectAssignment extends KSMemberBaseAssignment {
private final ConnectProtocol.Assignment assignment;
private final ConnectProtocol.WorkerState workerState;
public KSMemberConnectAssignment(ConnectProtocol.Assignment assignment, ConnectProtocol.WorkerState workerState) {
this.assignment = assignment;
this.workerState = workerState;
}
@Override
public String toString() {
return "KSMemberConnectAssignment{" +
"assignment=" + assignment +
", workerState=" + workerState +
'}';
}
}

View File

@@ -0,0 +1,50 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class KSMemberConsumerAssignment extends KSMemberBaseAssignment {
private final Set<TopicPartition> topicPartitions;
/**
* Creates an instance with the specified parameters.
*
* @param topicPartitions List of topic partitions
*/
public KSMemberConsumerAssignment(Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions == null ? Collections.<TopicPartition>emptySet() :
Collections.unmodifiableSet(new HashSet<>(topicPartitions));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
KSMemberConsumerAssignment that = (KSMemberConsumerAssignment) o;
return Objects.equals(topicPartitions, that.topicPartitions);
}
@Override
public int hashCode() {
return topicPartitions != null ? topicPartitions.hashCode() : 0;
}
/**
* The topic partitions assigned to a group member.
*/
public Set<TopicPartition> topicPartitions() {
return topicPartitions;
}
@Override
public String toString() {
return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")";
}
}

View File

@@ -0,0 +1,93 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
import java.util.Objects;
import java.util.Optional;
public class KSMemberDescription {
private final String memberId;
private final Optional<String> groupInstanceId;
private final String clientId;
private final String host;
private final KSMemberBaseAssignment assignment;
public KSMemberDescription(String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
KSMemberBaseAssignment assignment) {
this.memberId = memberId == null ? "" : memberId;
this.groupInstanceId = groupInstanceId;
this.clientId = clientId == null ? "" : clientId;
this.host = host == null ? "" : host;
this.assignment = assignment == null ?
new KSMemberBaseAssignment() : assignment;
}
public KSMemberDescription(String memberId,
String clientId,
String host,
KSMemberBaseAssignment assignment) {
this(memberId, Optional.empty(), clientId, host, assignment);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
KSMemberDescription that = (KSMemberDescription) o;
return memberId.equals(that.memberId) &&
groupInstanceId.equals(that.groupInstanceId) &&
clientId.equals(that.clientId) &&
host.equals(that.host) &&
assignment.equals(that.assignment);
}
@Override
public int hashCode() {
return Objects.hash(memberId, groupInstanceId, clientId, host, assignment);
}
/**
* The consumer id of the group member.
*/
public String consumerId() {
return memberId;
}
/**
* The instance id of the group member.
*/
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
/**
* The client id of the group member.
*/
public String clientId() {
return clientId;
}
/**
* The host where the group member is running.
*/
public String host() {
return host;
}
/**
* The assignment of the group member.
*/
public KSMemberBaseAssignment assignment() {
return assignment;
}
@Override
public String toString() {
return "(memberId=" + memberId +
", groupInstanceId=" + groupInstanceId.orElse("null") +
", clientId=" + clientId +
", host=" + host +
", assignment=" + assignment + ")";
}
}