diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSDescribeGroupsResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSDescribeGroupsResult.java
new file mode 100644
index 00000000..9630703c
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSDescribeGroupsResult.java
@@ -0,0 +1,45 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
+
+import org.apache.kafka.common.KafkaFuture;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class KSDescribeGroupsResult {
+    private final Map<String, KafkaFuture<KSGroupDescription>> futures;
+
+    public KSDescribeGroupsResult(final Map<String, KafkaFuture<KSGroupDescription>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from group id to futures which yield group descriptions.
+     */
+    public Map<String, KafkaFuture<KSGroupDescription>> describedGroups() {
+        return futures;
+    }
+
+    /**
+     * Return a future which yields all KSGroupDescription objects, if all the describes succeed.
+     */
+    public KafkaFuture<Map<String, KSGroupDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
+            new KafkaFuture.BaseFunction<Void, Map<String, KSGroupDescription>>() {
+                @Override
+                public Map<String, KSGroupDescription> apply(Void v) {
+                    try {
+                        Map<String, KSGroupDescription> descriptions = new HashMap<>(futures.size());
+                        for (Map.Entry<String, KafkaFuture<KSGroupDescription>> entry : futures.entrySet()) {
+                            descriptions.put(entry.getKey(), entry.getValue().get());
+                        }
+                        return descriptions;
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, since KafkaFuture#allOf already ensured
+                        // that all of the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+    }
+}
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSGroupDescription.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSGroupDescription.java
new file mode 100644
index 00000000..c58f89d2
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSGroupDescription.java
@@ -0,0 +1,124 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
+
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.*;
+
+public class KSGroupDescription {
+    private final String groupId;
+    private final String protocolType;
+    private final Collection<KSMemberDescription> members;
+    private final String partitionAssignor;
+    private final ConsumerGroupState state;
+    private final Node coordinator;
+    private final Set<AclOperation> authorizedOperations;
+
+    public KSGroupDescription(String groupId,
+                              String protocolType,
+                              Collection<KSMemberDescription> members,
+                              String partitionAssignor,
+                              ConsumerGroupState state,
+                              Node coordinator) {
+        this(groupId, protocolType, members, partitionAssignor, state, coordinator, Collections.emptySet());
+    }
+
+    public KSGroupDescription(String groupId,
+                              String protocolType,
+                              Collection<KSMemberDescription> members,
+                              String partitionAssignor,
+                              ConsumerGroupState state,
+                              Node coordinator,
+                              Set<AclOperation> authorizedOperations) {
+        this.groupId = groupId == null ? "" : groupId;
+        this.protocolType = protocolType;
+        this.members = members == null ? Collections.emptyList() :
+            Collections.unmodifiableList(new ArrayList<>(members));
+        this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
+        this.state = state;
+        this.coordinator = coordinator;
+        this.authorizedOperations = authorizedOperations;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final KSGroupDescription that = (KSGroupDescription) o;
+        // protocolType is a String, so it must be compared with equals(), not ==.
+        return Objects.equals(protocolType, that.protocolType) &&
+            Objects.equals(groupId, that.groupId) &&
+            Objects.equals(members, that.members) &&
+            Objects.equals(partitionAssignor, that.partitionAssignor) &&
+            state == that.state &&
+            Objects.equals(coordinator, that.coordinator) &&
+            Objects.equals(authorizedOperations, that.authorizedOperations);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(groupId, protocolType, members, partitionAssignor, state, coordinator, authorizedOperations);
+    }
+
+    /**
+     * The id of the consumer group.
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * The protocol type of the group: "consumer" for classic consumer groups,
+     * "connect" for Kafka Connect worker groups, or empty if the group has no
+     * active members.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * A list of the members of the consumer group.
+     */
+    public Collection<KSMemberDescription> members() {
+        return members;
+    }
+
+    /**
+     * The consumer group partition assignor.
+     */
+    public String partitionAssignor() {
+        return partitionAssignor;
+    }
+
+    /**
+     * The consumer group state, or UNKNOWN if the state is too new for us to parse.
+     */
+    public ConsumerGroupState state() {
+        return state;
+    }
+
+    /**
+     * The consumer group coordinator, or null if the coordinator is not known.
+     */
+    public Node coordinator() {
+        return coordinator;
+    }
+
+    /**
+     * authorizedOperations for this group, or null if that information is not known.
+     */
+    public Set<AclOperation> authorizedOperations() {
+        return authorizedOperations;
+    }
+
+    @Override
+    public String toString() {
+        return "(groupId=" + groupId +
+            ", protocolType=" + protocolType +
+            ", members=" + Utils.join(members, ",") +
+            ", partitionAssignor=" + partitionAssignor +
+            ", state=" + state +
+            ", coordinator=" + coordinator +
+            ", authorizedOperations=" + authorizedOperations +
+            ")";
+    }
+}
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSListGroupsResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSListGroupsResult.java
new file mode 100644
index 00000000..9f07138b
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSListGroupsResult.java
@@ -0,0 +1,79 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
+
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class KSListGroupsResult {
+    private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
+    private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
+    private final KafkaFutureImpl<Collection<Throwable>> errors;
+
+    public KSListGroupsResult(KafkaFutureImpl<Collection<Object>> future) {
+        this.all = new KafkaFutureImpl<>();
+        this.valid = new KafkaFutureImpl<>();
+        this.errors = new KafkaFutureImpl<>();
+        future.thenApply(new KafkaFuture.BaseFunction<Collection<Object>, Void>() {
+            @Override
+            public Void apply(Collection<Object> results) {
+                ArrayList<Throwable> curErrors = new ArrayList<>();
+                ArrayList<ConsumerGroupListing> curValid = new ArrayList<>();
+                for (Object resultObject : results) {
+                    if (resultObject instanceof Throwable) {
+                        curErrors.add((Throwable) resultObject);
+                    } else {
+                        curValid.add((ConsumerGroupListing) resultObject);
+                    }
+                }
+                if (!curErrors.isEmpty()) {
+                    all.completeExceptionally(curErrors.get(0));
+                } else {
+                    all.complete(curValid);
+                }
+                valid.complete(curValid);
+                errors.complete(curErrors);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Returns a future that yields either an exception, or the full set of consumer group
+     * listings.
+     *
+     * In the event of a failure, the future yields nothing but the first exception which
+     * occurred.
+     */
+    public KafkaFuture<Collection<ConsumerGroupListing>> all() {
+        return all;
+    }
+
+    /**
+     * Returns a future which yields just the valid listings.
+     *
+     * This future never fails with an error, no matter what happens. Errors are completely
+     * ignored. If nothing can be fetched, an empty collection is yielded.
+     * If there is an error, but some results can be returned, this future will yield
+     * those partial results. When using this future, it is a good idea to also check
+     * the errors future so that errors can be displayed and handled.
+     */
+    public KafkaFuture<Collection<ConsumerGroupListing>> valid() {
+        return valid;
+    }
+
+    /**
+     * Returns a future which yields just the errors which occurred.
+     *
+     * If this future yields a non-empty collection, it is very likely that elements are
+     * missing from the valid() set.
+     *
+     * This future itself never fails with an error. In the event of an error, this future
+     * will successfully yield a collection containing at least one exception.
+     */
+    public KafkaFuture<Collection<Throwable>> errors() {
+        return errors;
+    }
+}
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberBaseAssignment.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberBaseAssignment.java
new file mode 100644
index 00000000..3c38e4e5
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberBaseAssignment.java
@@ -0,0 +1,4 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
+
+public class KSMemberBaseAssignment {
+}
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberConnectAssignment.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberConnectAssignment.java
new file mode 100644
index 00000000..ba6bf638
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberConnectAssignment.java
@@ -0,0 +1,25 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
+
+
+@Getter
+public class KSMemberConnectAssignment extends KSMemberBaseAssignment {
+    private final ConnectProtocol.Assignment assignment;
+
+    private final ConnectProtocol.WorkerState workerState;
+
+    public KSMemberConnectAssignment(ConnectProtocol.Assignment assignment, ConnectProtocol.WorkerState workerState) {
+        this.assignment = assignment;
+        this.workerState = workerState;
+    }
+
+    @Override
+    public String toString() {
+        return "KSMemberConnectAssignment{" +
+            "assignment=" + assignment +
+            ", workerState=" + workerState +
+            '}';
+    }
+}
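The result types above mirror Kafka's own admin result classes, and the property worth calling out is the split between the fail-fast and the per-group views. A minimal caller-side sketch of the difference; the class name, bootstrap address, and group ids are illustrative assumptions, not part of this patch:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.common.KafkaFuture;

import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSDescribeGroupsResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KSPartialKafkaAdminClient;

public class DescribeGroupsSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        KSPartialKafkaAdminClient client = KSPartialKafkaAdminClient.create(props);
        try {
            KSDescribeGroupsResult result = client.describeConsumerGroups(
                    Arrays.asList("group-a", "group-b"), new DescribeConsumerGroupsOptions());

            // describedGroups() keeps one future per group id, so one bad group id
            // does not hide the descriptions of the healthy groups.
            for (Map.Entry<String, KafkaFuture<KSGroupDescription>> entry : result.describedGroups().entrySet()) {
                try {
                    System.out.println(entry.getKey() + " -> " + entry.getValue().get().state());
                } catch (ExecutionException e) {
                    System.err.println(entry.getKey() + " failed: " + e.getCause());
                }
            }
            // all() is the fail-fast alternative: any failed describe fails the whole map.
        } finally {
            client.close(Duration.ofSeconds(10));
        }
    }
}
```

The per-group view suits UIs that should render whatever succeeded; all() suits callers that treat any failure as fatal.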
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberConsumerAssignment.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberConsumerAssignment.java
new file mode 100644
index 00000000..0a9950c3
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberConsumerAssignment.java
@@ -0,0 +1,50 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+public class KSMemberConsumerAssignment extends KSMemberBaseAssignment {
+    private final Set<TopicPartition> topicPartitions;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param topicPartitions Set of topic partitions
+     */
+    public KSMemberConsumerAssignment(Set<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions == null ? Collections.emptySet() :
+            Collections.unmodifiableSet(new HashSet<>(topicPartitions));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        KSMemberConsumerAssignment that = (KSMemberConsumerAssignment) o;
+
+        return Objects.equals(topicPartitions, that.topicPartitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return topicPartitions != null ? topicPartitions.hashCode() : 0;
+    }
+
+    /**
+     * The topic partitions assigned to a group member.
+     */
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public String toString() {
+        return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")";
+    }
+}
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberDescription.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberDescription.java
new file mode 100644
index 00000000..2690ba77
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/kafka/KSMemberDescription.java
@@ -0,0 +1,93 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.kafka;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class KSMemberDescription {
+    private final String memberId;
+    private final Optional<String> groupInstanceId;
+    private final String clientId;
+    private final String host;
+    private final KSMemberBaseAssignment assignment;
+
+    public KSMemberDescription(String memberId,
+                               Optional<String> groupInstanceId,
+                               String clientId,
+                               String host,
+                               KSMemberBaseAssignment assignment) {
+        this.memberId = memberId == null ? "" : memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId == null ? "" : clientId;
+        this.host = host == null ? "" : host;
+        this.assignment = assignment == null ?
+            new KSMemberBaseAssignment() : assignment;
+    }
+
+    public KSMemberDescription(String memberId,
+                               String clientId,
+                               String host,
+                               KSMemberBaseAssignment assignment) {
+        this(memberId, Optional.empty(), clientId, host, assignment);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        KSMemberDescription that = (KSMemberDescription) o;
+        return memberId.equals(that.memberId) &&
+            groupInstanceId.equals(that.groupInstanceId) &&
+            clientId.equals(that.clientId) &&
+            host.equals(that.host) &&
+            assignment.equals(that.assignment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(memberId, groupInstanceId, clientId, host, assignment);
+    }
+
+    /**
+     * The consumer id of the group member.
+     */
+    public String consumerId() {
+        return memberId;
+    }
+
+    /**
+     * The instance id of the group member.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * The client id of the group member.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * The host where the group member is running.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * The assignment of the group member.
+     */
+    public KSMemberBaseAssignment assignment() {
+        return assignment;
+    }
+
+    @Override
+    public String toString() {
+        return "(memberId=" + memberId +
+            ", groupInstanceId=" + groupInstanceId.orElse("null") +
+            ", clientId=" + clientId +
+            ", host=" + host +
+            ", assignment=" + assignment + ")";
+    }
+}
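KSMemberDescription hands the assignment back only as the KSMemberBaseAssignment marker type, so callers have to branch on whichever concrete subtype the describe call filled in from the group's protocol type. A sketch of that consumption, using only accessors defined above (the helper class itself is hypothetical; the Lombok @Getter on KSMemberConnectAssignment provides getWorkerState()):

```java
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;

import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberBaseAssignment;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConnectAssignment;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConsumerAssignment;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;

public final class MemberAssignments {
    private MemberAssignments() {
    }

    /** Renders a member's assignment, whichever group protocol produced it. */
    public static String describe(KSMemberDescription member) {
        KSMemberBaseAssignment assignment = member.assignment();
        if (assignment instanceof KSMemberConsumerAssignment) {
            // Classic consumer group: the assignment is a set of topic-partitions.
            Set<TopicPartition> partitions = ((KSMemberConsumerAssignment) assignment).topicPartitions();
            return member.consumerId() + " owns " + partitions;
        }
        if (assignment instanceof KSMemberConnectAssignment) {
            // Kafka Connect worker group: the worker state carries the worker's REST URL.
            ConnectProtocol.WorkerState state = ((KSMemberConnectAssignment) assignment).getWorkerState();
            return member.consumerId() + " is a Connect worker at " + (state == null ? "unknown" : state.url());
        }
        // A plain base instance means the member had no parseable assignment bytes.
        return member.consumerId() + " has no parsed assignment";
    }
}
```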
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java
new file mode 100644
index 00000000..f985fb01
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/kafka/KSPartialKafkaAdminClient.java
@@ -0,0 +1,1539 @@
+package com.xiaojukeji.know.streaming.km.common.utils.kafka;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.xiaojukeji.know.streaming.km.common.annotations.KafkaSource;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.*;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
+import org.apache.kafka.clients.admin.internals.ConsumerGroupOperationContext;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.*;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
+import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
+import org.slf4j.Logger;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+
+/**
+ * A partial, modified copy of Kafka's {@code KafkaAdminClient} (the default
+ * {@link Admin} implementation), trimmed to the group-related calls and returning
+ * the KS* beans defined above. An instance is created by invoking the
+ * {@code create(Properties)} method below. Users should not refer to this class directly.
+ *
+ * <p>
+ * This class is thread-safe.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details. + */ +@KafkaSource(modified = 1) +public class KSPartialKafkaAdminClient { + + /** + * The next integer to use to name a KafkaAdminClient which the user hasn't specified an explicit name for. + */ + private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + + /** + * The prefix to use for the JMX metrics for this class + */ + private static final String JMX_PREFIX = "ks-kafka.admin.client"; + + /** + * An invalid shutdown time which indicates that a shutdown has not yet been performed. + */ + private static final long INVALID_SHUTDOWN_TIME = -1; + + /** + * Thread name prefix for admin client network thread + */ + static final String NETWORK_THREAD_PREFIX = "ks-kafka-admin-client-thread"; + + private final Logger log; + + /** + * The default timeout to use for an operation. + */ + private final int defaultApiTimeoutMs; + + /** + * The timeout to use for a single request. + */ + private final int requestTimeoutMs; + + /** + * The name of this AdminClient instance. + */ + private final String clientId; + + /** + * Provides the time. + */ + private final Time time; + + /** + * The cluster metadata manager used by the KafkaClient. + */ + private final AdminMetadataManager metadataManager; + + /** + * The metrics for this KafkaAdminClient. + */ + private final Metrics metrics; + + /** + * The network client to use. + */ + private final KafkaClient client; + + /** + * The runnable used in the service thread for this admin client. + */ + private final AdminClientRunnable runnable; + + /** + * The network service thread for this admin client. + */ + private final Thread thread; + + /** + * During a close operation, this is the time at which we will time out all pending operations + * and force the RPC thread to exit. If the admin client is not closing, this will be 0. + */ + private final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME); + + /** + * A factory which creates TimeoutProcessors for the RPC thread. + */ + private final TimeoutProcessorFactory timeoutProcessorFactory; + + private final int maxRetries; + + private final long retryBackoffMs; + + /** + * Get or create a list value from a map. + * + * @param map The map to get or create the element from. + * @param key The key. + * @param The key type. + * @param The value type. + * @return The list value. + */ + static List getOrCreateListValue(Map> map, K key) { + return map.computeIfAbsent(key, k -> new LinkedList<>()); + } + + /** + * Get the current time remaining before a deadline as an integer. + * + * @param now The current time in milliseconds. + * @param deadlineMs The deadline time in milliseconds. + * @return The time delta in milliseconds. + */ + static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) { + long deltaMs = deadlineMs - now; + if (deltaMs > Integer.MAX_VALUE) + deltaMs = Integer.MAX_VALUE; + else if (deltaMs < Integer.MIN_VALUE) + deltaMs = Integer.MIN_VALUE; + return (int) deltaMs; + } + + /** + * Generate the client id based on the configuration. + * + * @param config The configuration + * + * @return The client id + */ + static String generateClientId(AdminClientConfig config) { + String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG); + if (!clientId.isEmpty()) + return clientId; + return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement(); + } + + /** + * Get the deadline for a particular call. + * + * @param now The current time in milliseconds. 
+ * @param optionTimeoutMs The timeout option given by the user. + * + * @return The deadline in milliseconds. + */ + private long calcDeadlineMs(long now, Integer optionTimeoutMs) { + if (optionTimeoutMs != null) + return now + Math.max(0, optionTimeoutMs); + return now + defaultApiTimeoutMs; + } + + /** + * Pretty-print an exception. + * + * @param throwable The exception. + * + * @return A compact human-readable string. + */ + static String prettyPrintException(Throwable throwable) { + if (throwable == null) + return "Null exception."; + if (throwable.getMessage() != null) { + return throwable.getClass().getSimpleName() + ": " + throwable.getMessage(); + } + return throwable.getClass().getSimpleName(); + } + + public static KSPartialKafkaAdminClient create(Properties props) { + return KSPartialKafkaAdminClient.createInternal(new AdminClientConfig(props), null); + } + + static KSPartialKafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) { + Metrics metrics = null; + NetworkClient networkClient = null; + Time time = Time.SYSTEM; + String clientId = generateClientId(config); + ChannelBuilder channelBuilder = null; + Selector selector = null; + ApiVersions apiVersions = new ApiVersions(); + LogContext logContext = createLogContext(clientId); + + try { + // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it + // simplifies communication with older brokers) + AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, + config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), + config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); + List addresses = ClientUtils.parseAndValidateAddresses( + config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)); + metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds()); + List reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class, + Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId)); + Map metricTags = Collections.singletonMap("client-id", clientId); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .tags(metricTags); + JmxReporter jmxReporter = new JmxReporter(); + jmxReporter.configure(config.originals()); + reporters.add(jmxReporter); + MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, + config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + metrics = new Metrics(metricConfig, reporters, time, metricsContext); + String metricGrpPrefix = "admin-client"; + channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); + selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), + metrics, time, metricGrpPrefix, channelBuilder, logContext); + networkClient = new NetworkClient( + selector, + metadataManager.updater(), + clientId, + 1, + config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), + config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG), + config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG), + (int) 
TimeUnit.HOURS.toMillis(1), + config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), + config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG), + ClientDnsLookup.forConfig(config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)), + time, + true, + apiVersions, + logContext); + return new KSPartialKafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient, + timeoutProcessorFactory, logContext); + } catch (Throwable exc) { + closeQuietly(metrics, "Metrics"); + closeQuietly(networkClient, "NetworkClient"); + closeQuietly(selector, "Selector"); + closeQuietly(channelBuilder, "ChannelBuilder"); + throw new KafkaException("Failed to create new KafkaAdminClient", exc); + } + } + + static LogContext createLogContext(String clientId) { + return new LogContext("[AdminClient clientId=" + clientId + "] "); + } + + private KSPartialKafkaAdminClient(AdminClientConfig config, + String clientId, + Time time, + AdminMetadataManager metadataManager, + Metrics metrics, + KafkaClient client, + TimeoutProcessorFactory timeoutProcessorFactory, + LogContext logContext) { + this.clientId = clientId; + this.log = logContext.logger(KafkaAdminClient.class); + this.requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.defaultApiTimeoutMs = configureDefaultApiTimeoutMs(config); + this.time = time; + this.metadataManager = metadataManager; + this.metrics = metrics; + this.client = client; + this.runnable = new AdminClientRunnable(); + String threadName = NETWORK_THREAD_PREFIX + " | " + clientId; + this.thread = new KafkaThread(threadName, runnable, true); + this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ? + new TimeoutProcessorFactory() : timeoutProcessorFactory; + this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG); + this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG); + config.logUnused(); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); + log.debug("Kafka admin client initialized"); + thread.start(); + } + + /** + * If a default.api.timeout.ms has been explicitly specified, raise an error if it conflicts with request.timeout.ms. + * If no default.api.timeout.ms has been configured, then set its value as the max of the default and request.timeout.ms. Also we should probably log a warning. + * Otherwise, use the provided values for both configurations. 
+ * + * @param config The configuration + */ + private int configureDefaultApiTimeoutMs(AdminClientConfig config) { + int requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG); + int defaultApiTimeoutMs = config.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); + + if (defaultApiTimeoutMs < requestTimeoutMs) { + if (config.originals().containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) { + throw new ConfigException("The specified value of " + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + + " must be no smaller than the value of " + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + "."); + } else { + log.warn("Overriding the default value for {} ({}) with the explicitly configured request timeout {}", + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, this.defaultApiTimeoutMs, + requestTimeoutMs); + return requestTimeoutMs; + } + } + return defaultApiTimeoutMs; + } + + public void close(Duration timeout) { + long waitTimeMs = timeout.toMillis(); + if (waitTimeMs < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); + waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year. + long now = time.milliseconds(); + long newHardShutdownTimeMs = now + waitTimeMs; + long prev = INVALID_SHUTDOWN_TIME; + while (true) { + if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) { + if (prev == INVALID_SHUTDOWN_TIME) { + log.debug("Initiating close operation."); + } else { + log.debug("Moving hard shutdown time forward."); + } + client.wakeup(); // Wake the thread, if it is blocked inside poll(). + break; + } + prev = hardShutdownTimeMs.get(); + if (prev < newHardShutdownTimeMs) { + log.debug("Hard shutdown time is already earlier than requested."); + newHardShutdownTimeMs = prev; + break; + } + } + if (log.isDebugEnabled()) { + long deltaMs = Math.max(0, newHardShutdownTimeMs - time.milliseconds()); + log.debug("Waiting for the I/O thread to exit. Hard shutdown in {} ms.", deltaMs); + } + try { + // close() can be called by AdminClient thread when it invokes callback. That will + // cause deadlock, so check for that condition. + if (Thread.currentThread() != thread) { + // Wait for the thread to be joined. + thread.join(waitTimeMs); + } + log.debug("Kafka admin client closed."); + } catch (InterruptedException e) { + log.debug("Interrupted while joining I/O thread", e); + Thread.currentThread().interrupt(); + } + } + + /** + * An interface for providing a node for a call. + */ + private interface NodeProvider { + Node provide(); + } + + private class MetadataUpdateNodeIdProvider implements NodeProvider { + @Override + public Node provide() { + return client.leastLoadedNode(time.milliseconds()); + } + } + + private class ConstantNodeIdProvider implements NodeProvider { + private final int nodeId; + + ConstantNodeIdProvider(int nodeId) { + this.nodeId = nodeId; + } + + @Override + public Node provide() { + if (metadataManager.isReady() && + (metadataManager.nodeById(nodeId) != null)) { + return metadataManager.nodeById(nodeId); + } + // If we can't find the node with the given constant ID, we schedule a + // metadata update and hope it appears. This behavior is useful for avoiding + // flaky behavior in tests when the cluster is starting up and not all nodes + // have appeared. + metadataManager.requestUpdate(); + return null; + } + } + + /** + * Provides the least loaded node. 
+ */ + private class LeastLoadedNodeProvider implements NodeProvider { + @Override + public Node provide() { + if (metadataManager.isReady()) { + // This may return null if all nodes are busy. + // In that case, we will postpone node assignment. + return client.leastLoadedNode(time.milliseconds()); + } + metadataManager.requestUpdate(); + return null; + } + } + + abstract class Call { + private final boolean internal; + private final String callName; + private final long deadlineMs; + private final NodeProvider nodeProvider; + private int tries = 0; + private boolean aborted = false; + private Node curNode = null; + private long nextAllowedTryMs = 0; + + Call(boolean internal, String callName, long deadlineMs, NodeProvider nodeProvider) { + this.internal = internal; + this.callName = callName; + this.deadlineMs = deadlineMs; + this.nodeProvider = nodeProvider; + } + + Call(String callName, long deadlineMs, NodeProvider nodeProvider) { + this(false, callName, deadlineMs, nodeProvider); + } + + protected Node curNode() { + return curNode; + } + + /** + * Handle a failure. + * + * Depending on what the exception is and how many times we have already tried, we may choose to + * fail the Call, or retry it. It is important to print the stack traces here in some cases, + * since they are not necessarily preserved in ApiVersionException objects. + * + * @param now The current time in milliseconds. + * @param throwable The failure exception. + */ + final void fail(long now, Throwable throwable) { + if (aborted) { + // If the call was aborted while in flight due to a timeout, deliver a + // TimeoutException. In this case, we do not get any more retries - the call has + // failed. We increment tries anyway in order to display an accurate log message. + tries++; + failWithTimeout(now, throwable); + return; + } + // If this is an UnsupportedVersionException that we can retry, do so. Note that a + // protocol downgrade will not count against the total number of retries we get for + // this RPC. That is why 'tries' is not incremented. + if ((throwable instanceof UnsupportedVersionException) && + handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { + log.debug("{} attempting protocol downgrade and then retry.", this); + runnable.enqueue(this, now); + return; + } + tries++; + nextAllowedTryMs = now + retryBackoffMs; + + // If the call has timed out, fail. + if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) { + failWithTimeout(now, throwable); + return; + } + // If the exception is not retriable, fail. + if (!(throwable instanceof RetriableException)) { + if (log.isDebugEnabled()) { + log.debug("{} failed with non-retriable exception after {} attempt(s)", this, tries, + new Exception(prettyPrintException(throwable))); + } + handleFailure(throwable); + return; + } + // If we are out of retries, fail. + if (tries > maxRetries) { + failWithTimeout(now, throwable); + return; + } + if (log.isDebugEnabled()) { + log.debug("{} failed: {}. Beginning retry #{}", + this, prettyPrintException(throwable), tries); + } + runnable.enqueue(this, now); + } + + private void failWithTimeout(long now, Throwable cause) { + if (log.isDebugEnabled()) { + log.debug("{} timed out at {} after {} attempt(s)", this, now, tries, + new Exception(prettyPrintException(cause))); + } + handleFailure(new TimeoutException(this + " timed out at " + now + + " after " + tries + " attempt(s)", cause)); + } + + /** + * Create an AbstractRequest.Builder for this Call. 
+ * + * @param timeoutMs The timeout in milliseconds. + * + * @return The AbstractRequest builder. + */ + @SuppressWarnings("rawtypes") + abstract AbstractRequest.Builder createRequest(int timeoutMs); + + /** + * Process the call response. + * + * @param abstractResponse The AbstractResponse. + * + */ + abstract void handleResponse(AbstractResponse abstractResponse); + + /** + * Handle a failure. This will only be called if the failure exception was not + * retriable, or if we hit a timeout. + * + * @param throwable The exception. + */ + abstract void handleFailure(Throwable throwable); + + /** + * Handle an UnsupportedVersionException. + * + * @param exception The exception. + * + * @return True if the exception can be handled; false otherwise. + */ + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + return false; + } + + @Override + public String toString() { + return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs + + ", tries=" + tries + ", nextAllowedTryMs=" + nextAllowedTryMs + ")"; + } + + public boolean isInternal() { + return internal; + } + } + + static class TimeoutProcessorFactory { + TimeoutProcessor create(long now) { + return new TimeoutProcessor(now); + } + } + + static class TimeoutProcessor { + /** + * The current time in milliseconds. + */ + private final long now; + + /** + * The number of milliseconds until the next timeout. + */ + private int nextTimeoutMs; + + /** + * Create a new timeout processor. + * + * @param now The current time in milliseconds since the epoch. + */ + TimeoutProcessor(long now) { + this.now = now; + this.nextTimeoutMs = Integer.MAX_VALUE; + } + + /** + * Check for calls which have timed out. + * Timed out calls will be removed and failed. + * The remaining milliseconds until the next timeout will be updated. + * + * @param calls The collection of calls. + * + * @return The number of calls which were timed out. + */ + int handleTimeouts(Collection calls, String msg) { + int numTimedOut = 0; + for (Iterator iter = calls.iterator(); iter.hasNext(); ) { + Call call = iter.next(); + int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); + if (remainingMs < 0) { + call.fail(now, new TimeoutException(msg + " Call: " + call.callName)); + iter.remove(); + numTimedOut++; + } else { + nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs); + } + } + return numTimedOut; + } + + /** + * Check whether a call should be timed out. + * The remaining milliseconds until the next timeout will be updated. + * + * @param call The call. + * + * @return True if the call should be timed out. + */ + boolean callHasExpired(Call call) { + int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); + if (remainingMs < 0) + return true; + nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs); + return false; + } + + int nextTimeoutMs() { + return nextTimeoutMs; + } + } + + private final class AdminClientRunnable implements Runnable { + /** + * Calls which have not yet been assigned to a node. + * Only accessed from this thread. + */ + private final ArrayList pendingCalls = new ArrayList<>(); + + /** + * Maps nodes to calls that we want to send. + * Only accessed from this thread. + */ + private final Map> callsToSend = new HashMap<>(); + + /** + * Maps node ID strings to calls that have been sent. + * Only accessed from this thread. + */ + private final Map> callsInFlight = new HashMap<>(); + + /** + * Maps correlation IDs to calls that have been sent. + * Only accessed from this thread. 
+ */ + private final Map correlationIdToCalls = new HashMap<>(); + + /** + * Pending calls. Protected by the object monitor. + * This will be null only if the thread has shut down. + */ + private List newCalls = new LinkedList<>(); + + /** + * Time out the elements in the pendingCalls list which are expired. + * + * @param processor The timeout processor. + */ + private void timeoutPendingCalls(TimeoutProcessor processor) { + int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment."); + if (numTimedOut > 0) + log.debug("Timed out {} pending calls.", numTimedOut); + } + + /** + * Time out calls which have been assigned to nodes. + * + * @param processor The timeout processor. + */ + private int timeoutCallsToSend(TimeoutProcessor processor) { + int numTimedOut = 0; + for (List callList : callsToSend.values()) { + numTimedOut += processor.handleTimeouts(callList, + "Timed out waiting to send the call."); + } + if (numTimedOut > 0) + log.debug("Timed out {} call(s) with assigned nodes.", numTimedOut); + return numTimedOut; + } + + /** + * Drain all the calls from newCalls into pendingCalls. + * + * This function holds the lock for the minimum amount of time, to avoid blocking + * users of AdminClient who will also take the lock to add new calls. + */ + private synchronized void drainNewCalls() { + if (!newCalls.isEmpty()) { + pendingCalls.addAll(newCalls); + newCalls.clear(); + } + } + + /** + * Choose nodes for the calls in the pendingCalls list. + * + * @param now The current time in milliseconds. + * @return The minimum time until a call is ready to be retried if any of the pending + * calls are backing off after a failure + */ + private long maybeDrainPendingCalls(long now) { + long pollTimeout = Long.MAX_VALUE; + log.trace("Trying to choose nodes for {} at {}", pendingCalls, now); + + Iterator pendingIter = pendingCalls.iterator(); + while (pendingIter.hasNext()) { + Call call = pendingIter.next(); + + // If the call is being retried, await the proper backoff before finding the node + if (now < call.nextAllowedTryMs) { + pollTimeout = Math.min(pollTimeout, call.nextAllowedTryMs - now); + } else if (maybeDrainPendingCall(call, now)) { + pendingIter.remove(); + } + } + return pollTimeout; + } + + /** + * Check whether a pending call can be assigned a node. Return true if the pending call was either + * transferred to the callsToSend collection or if the call was failed. Return false if it + * should remain pending. + */ + private boolean maybeDrainPendingCall(Call call, long now) { + try { + Node node = call.nodeProvider.provide(); + if (node != null) { + log.trace("Assigned {} to node {}", call, node); + call.curNode = node; + getOrCreateListValue(callsToSend, node).add(call); + return true; + } else { + log.trace("Unable to assign {} to a node.", call); + return false; + } + } catch (Throwable t) { + // Handle authentication errors while choosing nodes. + log.debug("Unable to choose node for {}", call, t); + call.fail(now, t); + return true; + } + } + + /** + * Send the calls which are ready. + * + * @param now The current time in milliseconds. + * @return The minimum timeout we need for poll(). 
+ */ + private long sendEligibleCalls(long now) { + long pollTimeout = Long.MAX_VALUE; + for (Iterator>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry> entry = iter.next(); + List calls = entry.getValue(); + if (calls.isEmpty()) { + iter.remove(); + continue; + } + Node node = entry.getKey(); + if (!client.ready(node, now)) { + long nodeTimeout = client.pollDelayMs(node, now); + pollTimeout = Math.min(pollTimeout, nodeTimeout); + log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout); + continue; + } + Call call = calls.remove(0); + int requestTimeoutMs = Math.min(KSPartialKafkaAdminClient.this.requestTimeoutMs, + calcTimeoutMsRemainingAsInt(now, call.deadlineMs)); + AbstractRequest.Builder requestBuilder; + try { + requestBuilder = call.createRequest(requestTimeoutMs); + } catch (Throwable throwable) { + call.fail(now, new KafkaException(String.format( + "Internal error sending %s to %s.", call.callName, node))); + continue; + } + ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, + true, requestTimeoutMs, null); + log.debug("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId()); + client.send(clientRequest, now); + getOrCreateListValue(callsInFlight, node.idString()).add(call); + correlationIdToCalls.put(clientRequest.correlationId(), call); + } + return pollTimeout; + } + + /** + * Time out expired calls that are in flight. + * + * Calls that are in flight may have been partially or completely sent over the wire. They may + * even be in the process of being processed by the remote server. At the moment, our only option + * to time them out is to close the entire connection. + * + * @param processor The timeout processor. + */ + private void timeoutCallsInFlight(TimeoutProcessor processor) { + int numTimedOut = 0; + for (Map.Entry> entry : callsInFlight.entrySet()) { + List contexts = entry.getValue(); + if (contexts.isEmpty()) + continue; + String nodeId = entry.getKey(); + // We assume that the first element in the list is the earliest. So it should be the + // only one we need to check the timeout for. + Call call = contexts.get(0); + if (processor.callHasExpired(call)) { + if (call.aborted) { + log.warn("Aborted call {} is still in callsInFlight.", call); + } else { + log.debug("Closing connection to {} to time out {}", nodeId, call); + call.aborted = true; + client.disconnect(nodeId); + numTimedOut++; + // We don't remove anything from the callsInFlight data structure. Because the connection + // has been closed, the calls should be returned by the next client#poll(), + // and handled at that point. + } + } + } + if (numTimedOut > 0) + log.debug("Timed out {} call(s) in flight.", numTimedOut); + } + + /** + * Handle responses from the server. + * + * @param now The current time in milliseconds. + * @param responses The latest responses from KafkaClient. + **/ + private void handleResponses(long now, List responses) { + for (ClientResponse response : responses) { + int correlationId = response.requestHeader().correlationId(); + + Call call = correlationIdToCalls.get(correlationId); + if (call == null) { + // If the server returns information about a correlation ID we didn't use yet, + // an internal server error has occurred. Close the connection and log an error message. 
+ log.error("Internal server error on {}: server returned information about unknown " + + "correlation ID {}, requestHeader = {}", response.destination(), correlationId, + response.requestHeader()); + client.disconnect(response.destination()); + continue; + } + + // Stop tracking this call. + correlationIdToCalls.remove(correlationId); + List calls = callsInFlight.get(response.destination()); + if ((calls == null) || (!calls.remove(call))) { + log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " + + "that did not exist in callsInFlight", response.destination(), call); + continue; + } + + // Handle the result of the call. This may involve retrying the call, if we got a + // retriable exception. + if (response.versionMismatch() != null) { + call.fail(now, response.versionMismatch()); + } else if (response.wasDisconnected()) { + AuthenticationException authException = client.authenticationException(call.curNode()); + if (authException != null) { + call.fail(now, authException); + } else { + call.fail(now, new DisconnectException(String.format( + "Cancelled %s request with correlation id %s due to node %s being disconnected", + call.callName, correlationId, response.destination()))); + } + } else { + try { + call.handleResponse(response.responseBody()); + if (log.isTraceEnabled()) + log.trace("{} got response {}", call, response.responseBody()); + } catch (Throwable t) { + if (log.isTraceEnabled()) + log.trace("{} handleResponse failed with {}", call, prettyPrintException(t)); + call.fail(now, t); + } + } + } + } + + /** + * Unassign calls that have not yet been sent based on some predicate. For example, this + * is used to reassign the calls that have been assigned to a disconnected node. + * + * @param shouldUnassign Condition for reassignment. If the predicate is true, then the calls will + * be put back in the pendingCalls collection and they will be reassigned + */ + private void unassignUnsentCalls(Predicate shouldUnassign) { + for (Iterator>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry> entry = iter.next(); + Node node = entry.getKey(); + List awaitingCalls = entry.getValue(); + + if (awaitingCalls.isEmpty()) { + iter.remove(); + } else if (shouldUnassign.test(node)) { + pendingCalls.addAll(awaitingCalls); + iter.remove(); + } + } + } + + private boolean hasActiveExternalCalls(Collection calls) { + for (Call call : calls) { + if (!call.isInternal()) { + return true; + } + } + return false; + } + + /** + * Return true if there are currently active external calls. + */ + private boolean hasActiveExternalCalls() { + if (hasActiveExternalCalls(pendingCalls)) { + return true; + } + for (List callList : callsToSend.values()) { + if (hasActiveExternalCalls(callList)) { + return true; + } + } + return hasActiveExternalCalls(correlationIdToCalls.values()); + } + + private boolean threadShouldExit(long now, long curHardShutdownTimeMs) { + if (!hasActiveExternalCalls()) { + log.trace("All work has been completed, and the I/O thread is now exiting."); + return true; + } + if (now >= curHardShutdownTimeMs) { + log.info("Forcing a hard I/O thread shutdown. 
Requests in progress will be aborted."); + return true; + } + log.debug("Hard shutdown in {} ms.", curHardShutdownTimeMs - now); + return false; + } + + @Override + public void run() { + log.trace("Thread starting"); + try { + processRequests(); + } finally { + AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); + + int numTimedOut = 0; + TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE); + synchronized (this) { + numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited."); + newCalls = null; + } + numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited."); + numTimedOut += timeoutCallsToSend(timeoutProcessor); + numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(), + "The AdminClient thread has exited."); + if (numTimedOut > 0) { + log.debug("Timed out {} remaining operation(s).", numTimedOut); + } + closeQuietly(client, "KafkaClient"); + closeQuietly(metrics, "Metrics"); + log.debug("Exiting AdminClientRunnable thread."); + } + } + + private void processRequests() { + long now = time.milliseconds(); + while (true) { + // Copy newCalls into pendingCalls. + drainNewCalls(); + + // Check if the AdminClient thread should shut down. + long curHardShutdownTimeMs = hardShutdownTimeMs.get(); + if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs)) + break; + + // Handle timeouts. + TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now); + timeoutPendingCalls(timeoutProcessor); + timeoutCallsToSend(timeoutProcessor); + timeoutCallsInFlight(timeoutProcessor); + + long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs()); + if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) { + pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now); + } + + // Choose nodes for our pending calls. + pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now)); + long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now); + if (metadataFetchDelayMs == 0) { + metadataManager.transitionToUpdatePending(now); + Call metadataCall = makeMetadataCall(now); + // Create a new metadata fetch call and add it to the end of pendingCalls. + // Assign a node for just the new call (we handled the other pending nodes above). + + if (!maybeDrainPendingCall(metadataCall, now)) + pendingCalls.add(metadataCall); + } + pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now)); + + if (metadataFetchDelayMs > 0) { + pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs); + } + + // Ensure that we use a small poll timeout if there are pending calls which need to be sent + if (!pendingCalls.isEmpty()) + pollTimeout = Math.min(pollTimeout, retryBackoffMs); + + // Wait for network responses. + log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout); + List responses = client.poll(pollTimeout, now); + log.trace("KafkaClient#poll retrieved {} response(s)", responses.size()); + + // unassign calls to disconnected nodes + unassignUnsentCalls(client::connectionFailed); + + // Update the current time and handle the latest responses. + now = time.milliseconds(); + handleResponses(now, responses); + } + } + + /** + * Queue a call for sending. + * + * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even + * if the AdminClient is shutting down). This function should called when retrying an + * existing call. + * + * @param call The new call object. 
+ * @param now The current time in milliseconds. + */ + void enqueue(Call call, long now) { + if (call.tries > maxRetries) { + log.debug("Max retries {} for {} reached", maxRetries, call); + call.fail(time.milliseconds(), new TimeoutException()); + return; + } + if (log.isDebugEnabled()) { + log.debug("Queueing {} with a timeout {} ms from now.", call, call.deadlineMs - now); + } + boolean accepted = false; + synchronized (this) { + if (newCalls != null) { + newCalls.add(call); + accepted = true; + } + } + if (accepted) { + client.wakeup(); // wake the thread if it is in poll() + } else { + log.debug("The AdminClient thread has exited. Timing out {}.", call); + call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited.")); + } + } + + /** + * Initiate a new call. + * + * This will fail if the AdminClient is scheduled to shut down. + * + * @param call The new call object. + * @param now The current time in milliseconds. + */ + void call(Call call, long now) { + if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) { + log.debug("The AdminClient is not accepting new calls. Timing out {}.", call); + call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls.")); + } else { + enqueue(call, now); + } + } + + /** + * Create a new metadata call. + */ + private Call makeMetadataCall(long now) { + return new Call(true, "fetchMetadata", calcDeadlineMs(now, requestTimeoutMs), + new MetadataUpdateNodeIdProvider()) { + @Override + public MetadataRequest.Builder createRequest(int timeoutMs) { + // Since this only requests node information, it's safe to pass true + // for allowAutoTopicCreation (and it simplifies communication with + // older brokers) + return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(Collections.emptyList()) + .setAllowAutoTopicCreation(true)); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse response = (MetadataResponse) abstractResponse; + long now = time.milliseconds(); + metadataManager.update(response.buildCluster(), now); + + // Unassign all unsent requests after a metadata refresh to allow for a new + // destination to be selected from the new metadata + unassignUnsentCalls(node -> true); + } + + @Override + public void handleFailure(Throwable e) { + metadataManager.updateFailed(e); + } + }; + } + } + + private static boolean groupIdIsUnrepresentable(String groupId) { + return groupId == null; + } + + private void rescheduleFindCoordinatorTask(ConsumerGroupOperationContext context, Supplier nextCall, Call failedCall) { + log.info("Node {} is no longer the Coordinator. 
Retrying with new coordinator.", + context.node().orElse(null)); + // Requeue the task so that we can try with new coordinator + context.setNode(null); + + Call call = nextCall.get(); + call.tries = failedCall.tries + 1; + call.nextAllowedTryMs = calculateNextAllowedRetryMs(); + + Call findCoordinatorCall = getFindCoordinatorCall(context, nextCall); + runnable.call(findCoordinatorCall, time.milliseconds()); + } + + private static Map> createFutures(Collection groupIds) { + return new HashSet<>(groupIds).stream().collect( + Collectors.toMap(groupId -> groupId, + groupId -> { + if (groupIdIsUnrepresentable(groupId)) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidGroupIdException("The given group id '" + + groupId + "' cannot be represented in a request.")); + return future; + } else { + return new KafkaFutureImpl<>(); + } + } + )); + } + + public KSDescribeGroupsResult describeConsumerGroups(final Collection groupIds, + final DescribeConsumerGroupsOptions options) { + + final Map> futures = createFutures(groupIds); + + // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of + // all consumer groups this coordinator host + for (final Map.Entry> entry : futures.entrySet()) { + // skip sending request for those futures that already failed. + if (entry.getValue().isCompletedExceptionally()) + continue; + + final String groupId = entry.getKey(); + + final long startFindCoordinatorMs = time.milliseconds(); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); + ConsumerGroupOperationContext context = + new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId)); + Call findCoordinatorCall = getFindCoordinatorCall(context, + () -> getDescribeConsumerGroupsCall(context)); + runnable.call(findCoordinatorCall, startFindCoordinatorMs); + } + + return new KSDescribeGroupsResult(new HashMap<>(futures)); + } + + /** + * Returns a {@code Call} object to fetch the coordinator for a consumer group id. Takes another Call + * parameter to schedule action that need to be taken using the coordinator. The param is a Supplier + * so that it can be lazily created, so that it can use the results of find coordinator call in its + * construction. + * + * @param The type of return value of the KafkaFuture, like ConsumerGroupDescription, Void etc. 
+
+    /**
+     * Returns a {@code Call} object to fetch the coordinator for a consumer group id. Takes another Call
+     * parameter to schedule an action that needs to be taken using the coordinator. That parameter is a
+     * Supplier so the Call can be created lazily, using the results of the find coordinator call in its
+     * construction.
+     *
+     * @param <T> The type of return value of the KafkaFuture, like ConsumerGroupDescription, Void etc.
+     * @param <O> The type of configuration option, like DescribeConsumerGroupsOptions, ListConsumerGroupsOptions etc.
+     */
+    private <T, O extends AbstractOptions<O>> Call getFindCoordinatorCall(ConsumerGroupOperationContext<T, O> context,
+                                                                          Supplier<Call> nextCall) {
+        return new Call("findCoordinator", context.deadline(), new LeastLoadedNodeProvider()) {
+            @Override
+            FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+                return new FindCoordinatorRequest.Builder(
+                        new FindCoordinatorRequestData()
+                                .setKeyType(CoordinatorType.GROUP.id())
+                                .setKey(context.groupId()));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+
+                if (handleGroupRequestError(response.error(), context.future()))
+                    return;
+
+                context.setNode(response.node());
+
+                runnable.call(nextCall.get(), time.milliseconds());
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                context.future().completeExceptionally(throwable);
+            }
+        };
+    }
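+
+    // Why the Supplier matters (illustrative note): getDescribeConsumerGroupsCall() below
+    // reads context.node().get() in its ConstantNodeIdProvider, so it must not be
+    // constructed until the findCoordinator response has invoked context.setNode(...).
+    //
+    //     // built eagerly: context.node() is still empty -> NoSuchElementException
+    //     // built via supplier: evaluated inside handleResponse, after setNode()
+    //     getFindCoordinatorCall(context, () -> getDescribeConsumerGroupsCall(context));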
+
+    private Call getDescribeConsumerGroupsCall(
+            ConsumerGroupOperationContext<KSGroupDescription, DescribeConsumerGroupsOptions> context) {
+        return new Call("describeConsumerGroups",
+                context.deadline(),
+                new ConstantNodeIdProvider(context.node().get().id())) {
+            @Override
+            DescribeGroupsRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeGroupsRequest.Builder(
+                        new DescribeGroupsRequestData()
+                                .setGroups(Collections.singletonList(context.groupId()))
+                                .setIncludeAuthorizedOperations(context.options().includeAuthorizedOperations()));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+
+                List<DescribedGroup> describedGroups = response.data().groups();
+                if (describedGroups.isEmpty()) {
+                    context.future().completeExceptionally(
+                            new InvalidGroupIdException("No consumer group found for GroupId: " + context.groupId()));
+                    return;
+                }
+
+                if (describedGroups.size() > 1 ||
+                        !describedGroups.get(0).groupId().equals(context.groupId())) {
+                    String ids = Arrays.toString(describedGroups.stream().map(DescribedGroup::groupId).toArray());
+                    context.future().completeExceptionally(new InvalidGroupIdException(
+                            "DescribeConsumerGroup request for GroupId: " + context.groupId() + " returned " + ids));
+                    return;
+                }
+
+                final DescribedGroup describedGroup = describedGroups.get(0);
+
+                // If the coordinator changed since we fetched it, retry
+                if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+                    Call call = getDescribeConsumerGroupsCall(context);
+                    rescheduleFindCoordinatorTask(context, () -> call, this);
+                    return;
+                }
+
+                final Errors groupError = Errors.forCode(describedGroup.errorCode());
+                if (handleGroupRequestError(groupError, context.future())) {
+                    return;
+                }
+
+                final String protocolType = describedGroup.protocolType();
+
+                final List<KSMemberDescription> memberDescriptions = new ArrayList<>(describedGroup.members().size());
+                for (DescribedGroupMember groupMember : describedGroup.members()) {
+                    KSMemberBaseAssignment memberBaseAssignment = null;
+
+                    if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+                        // Consumer protocol (or a simple group): decode the consumer partition assignment
+                        if (groupMember.memberAssignment().length > 0) {
+                            final Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                                    ByteBuffer.wrap(groupMember.memberAssignment()));
+                            memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>(assignment.partitions()));
+                        }
+                    } else {
+                        // Kafka Connect group: decode the Connect worker assignment and state
+                        ConnectProtocol.Assignment assignment = null;
+                        if (groupMember.memberAssignment().length > 0) {
+                            assignment = ConnectProtocol.deserializeAssignment(
+                                    ByteBuffer.wrap(groupMember.memberAssignment()));
+                        }
+
+                        ConnectProtocol.WorkerState workerState = null;
+                        if (groupMember.memberMetadata().length > 0) {
+                            workerState = ConnectProtocol.deserializeMetadata(
+                                    ByteBuffer.wrap(groupMember.memberMetadata()));
+                        }
+
+                        memberBaseAssignment = new KSMemberConnectAssignment(assignment, workerState);
+                    }
+
+                    memberDescriptions.add(new KSMemberDescription(
+                            groupMember.memberId(),
+                            Optional.ofNullable(groupMember.groupInstanceId()),
+                            groupMember.clientId(),
+                            groupMember.clientHost(),
+                            memberBaseAssignment
+                    ));
+                }
+
+                context.future().complete(new KSGroupDescription(
+                        context.groupId(),
+                        protocolType,
+                        memberDescriptions,
+                        describedGroup.protocolData(),
+                        ConsumerGroupState.parse(describedGroup.groupState()),
+                        context.node().get())
+                );
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                context.future().completeExceptionally(throwable);
+            }
+        };
+    }
+
+    private Set<AclOperation> validAclOperations(final int authorizedOperations) {
+        if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
+            return null;
+        }
+        return Utils.from32BitField(authorizedOperations)
+                .stream()
+                .map(AclOperation::fromCode)
+                .filter(operation -> operation != AclOperation.UNKNOWN
+                        && operation != AclOperation.ALL
+                        && operation != AclOperation.ANY)
+                .collect(Collectors.toSet());
+    }
+
+    private boolean handleGroupRequestError(Errors error, KafkaFutureImpl<?> future) {
+        if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
+            throw error.exception();
+        } else if (error != Errors.NONE) {
+            future.completeExceptionally(error.exception());
+            return true;
+        }
+        return false;
+    }
+
+    private static final class ListConsumerGroupsResults {
+        private final List<Throwable> errors;
+        private final HashMap<String, ConsumerGroupListing> listings;
+        private final HashSet<Node> remaining;
+        private final KafkaFutureImpl<Collection<Object>> future;
+
+        ListConsumerGroupsResults(Collection<Node> leaders,
+                                  KafkaFutureImpl<Collection<Object>> future) {
+            this.errors = new ArrayList<>();
+            this.listings = new HashMap<>();
+            this.remaining = new HashSet<>(leaders);
+            this.future = future;
+            tryComplete();
+        }
+
+        synchronized void addError(Throwable throwable, Node node) {
+            ApiError error = ApiError.fromThrowable(throwable);
+            if (error.message() == null || error.message().isEmpty()) {
+                errors.add(error.error().exception("Error listing groups on " + node));
+            } else {
+                errors.add(error.error().exception("Error listing groups on " + node + ": " + error.message()));
+            }
+        }
+
+        synchronized void addListing(ConsumerGroupListing listing) {
+            listings.put(listing.groupId(), listing);
+        }
+
+        synchronized void tryComplete(Node leader) {
+            remaining.remove(leader);
+            tryComplete();
+        }
+
+        private synchronized void tryComplete() {
+            if (remaining.isEmpty()) {
+                ArrayList<Object> results = new ArrayList<>(listings.values());
+                results.addAll(errors);
+                future.complete(results);
+            }
+        }
+    }
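+
+    // Design note (added for clarity): the method below fans out one ListGroups request per
+    // broker and funnels every listing or error into ListConsumerGroupsResults above; the
+    // shared future completes only once tryComplete(node) has run for every broker,
+    // whether that broker answered or failed.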
+
+    public KSListGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
+        final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
+        final long nowMetadata = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+        runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) {
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                        .setTopics(Collections.emptyList())
+                        .setAllowAutoTopicCreation(true));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
+                Collection<Node> nodes = metadataResponse.brokers();
+                if (nodes.isEmpty())
+                    throw new StaleMetadataException("Metadata fetch failed due to missing broker list");
+
+                HashSet<Node> allNodes = new HashSet<>(nodes);
+                final ListConsumerGroupsResults results = new ListConsumerGroupsResults(allNodes, all);
+
+                for (final Node node : allNodes) {
+                    final long nowList = time.milliseconds();
+                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
+                        @Override
+                        ListGroupsRequest.Builder createRequest(int timeoutMs) {
+                            List<String> states = options.states()
+                                    .stream()
+                                    .map(s -> s.toString())
+                                    .collect(Collectors.toList());
+                            return new ListGroupsRequest.Builder(new ListGroupsRequestData().setStatesFilter(states));
+                        }
+
+                        private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) {
+                            String protocolType = group.protocolType();
+
+                            @KafkaSource(modified = 1, modifyDesc = "The original code ignored Connect consumer groups; modified here to include them")
+                            final String groupId = group.groupId();
+                            final Optional<ConsumerGroupState> state = group.groupState().equals("")
+                                    ? Optional.empty()
+                                    : Optional.of(ConsumerGroupState.parse(group.groupState()));
+                            final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty(), state);
+                            results.addListing(groupListing);
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) {
+                            final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
+                            synchronized (results) {
+                                Errors error = Errors.forCode(response.data().errorCode());
+                                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
+                                    throw error.exception();
+                                } else if (error != Errors.NONE) {
+                                    results.addError(error.exception(), node);
+                                } else {
+                                    for (ListGroupsResponseData.ListedGroup group : response.data().groups()) {
+                                        maybeAddConsumerGroup(group);
+                                    }
+                                }
+                                results.tryComplete(node);
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            synchronized (results) {
+                                results.addError(throwable, node);
+                                results.tryComplete(node);
+                            }
+                        }
+                    }, nowList);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                KafkaException exception = new KafkaException("Failed to find brokers to send ListGroups", throwable);
+                all.complete(Collections.singletonList(exception));
+            }
+        }, nowMetadata);
+
+        return new KSListGroupsResult(all);
+    }
+
+    private long calculateNextAllowedRetryMs() {
+        return time.milliseconds() + retryBackoffMs;
+    }
+}
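+
+// Caller-side sketch (assumed usage; KSListGroupsResult is assumed to expose valid()/errors()
+// accessors over its all/valid/errors futures, mirroring Kafka's ListConsumerGroupsResult):
+// the raw collection mixes ConsumerGroupListing entries with per-broker Throwables, so
+// callers typically consume the already-split views.
+//
+//     KSListGroupsResult result = adminClient.listConsumerGroups(new ListConsumerGroupsOptions());
+//     Collection<ConsumerGroupListing> listings = result.valid().get();  // successful listings only
+//     Collection<Throwable> failures = result.errors().get();           // brokers that failed to answer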