Untitled
unknown
java
2 years ago
35 kB
3
Indexable
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.kafka.clients.producer;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/**
 * Producer client that serializes records, appends them to a {@link RecordAccumulator} and relies
 * on a background {@link Sender} running on a dedicated I/O thread to transmit them.
 *
 * <p>The constructor wires up metrics, serializers, interceptors, the accumulator, metadata and the
 * sender, then starts the I/O thread. {@link #close(Duration)} stops that thread and releases the
 * interceptors, metrics, serializers and partitioner.
 *
 * @param <K> key type accepted by the key serializer
 * @param <V> value type accepted by the value serializer
 */
public class KafkaProducer<K, V> implements Producer<K, V> {
    private final Logger log;
    private static final String JMX_PREFIX = "kafka.producer";
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
    private final String clientId;
    final Metrics metrics;
    private final Partitioner partitioner;
    private final int maxRequestSize;
    private final long totalMemorySize;
    private final ProducerMetadata metadata;
    private final RecordAccumulator accumulator;
    private final Sender sender;
    private final Thread ioThread;
    private final CompressionType compressionType;
    private final Sensor errors;
    private final Time time;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final ProducerConfig producerConfig;
    private final long maxBlockTimeMs;
    private final ProducerInterceptors<K, V> interceptors;
    private final ApiVersions apiVersions;
    private final TransactionManager transactionManager;

    /**
     * Creates a producer from a configuration map; serializers are instantiated from the
     * {@code key.serializer} / {@code value.serializer} settings.
     *
     * @param configs producer configuration
     */
    public KafkaProducer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    /**
     * Creates a producer from a configuration map and optional serializer instances.
     *
     * @param configs         producer configuration
     * @param keySerializer   key serializer to use, or {@code null} to instantiate it from configuration
     * @param valueSerializer value serializer to use, or {@code null} to instantiate it from configuration
     */
    public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
                keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
    }

    /**
     * Creates a producer from {@link Properties}; serializers are instantiated from configuration.
     *
     * @param properties producer configuration
     */
    public KafkaProducer(Properties properties) {
        this(properties, null, null);
    }

    /**
     * Creates a producer from {@link Properties} and optional serializer instances.
     *
     * @param properties      producer configuration, converted with {@code Utils.propsToMap}
     * @param keySerializer   key serializer to use, or {@code null} to instantiate it from configuration
     * @param valueSerializer value serializer to use, or {@code null} to instantiate it from configuration
     */
    public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(Utils.propsToMap(properties), keySerializer, valueSerializer);
    }

    /**
     * Package-private constructor that performs the actual wiring. The {@code metadata},
     * {@code kafkaClient} and {@code interceptors} arguments may be {@code null}, in which case
     * default instances are built from {@code config}.
     *
     * <p>Any failure during construction triggers an immediate {@code close(0 ms)} that swallows
     * close errors, and is rethrown wrapped in a {@link KafkaException}.
     *
     * @param config          parsed producer configuration
     * @param keySerializer   key serializer, or {@code null} to instantiate it from configuration
     * @param valueSerializer value serializer, or {@code null} to instantiate it from configuration
     * @param metadata        metadata instance to use, or {@code null} to create and bootstrap one
     * @param kafkaClient     network client to use, or {@code null} to create a {@link NetworkClient}
     * @param interceptors    interceptors to use, or {@code null} to wrap those listed in configuration
     * @param time            clock used for timestamps, metrics and the accumulator
     * @throws KafkaException if any step of the construction fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  Time time) {
        try {
            this.producerConfig = config;
            this.time = time;

            String transactionalId = config.getString("transactional.id");
            this.clientId = config.getString("client.id");

            LogContext logContext;
            if (transactionalId == null) {
                logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
            } else {
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ",
                        this.clientId, transactionalId));
            }
            this.log = logContext.logger(KafkaProducer.class);
            this.log.trace("Starting the Kafka producer");

            // Metrics: every metric is tagged with the client id; a JMX reporter is always added.
            Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
            MetricConfig metricConfig = new MetricConfig()
                    .samples(config.getInt("metrics.num.samples"))
                    .timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS)
                    .recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level")))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters",
                    MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
            JmxReporter jmxReporter = new JmxReporter();
            jmxReporter.configure(config.originals(Collections.singletonMap("client.id", this.clientId)));
            reporters.add(jmxReporter);
            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                    config.originalsWithPrefix("metrics.context."));
            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);

            this.partitioner = config.getConfiguredInstance("partitioner.class", Partitioner.class,
                    Collections.singletonMap("client.id", this.clientId));
            long retryBackoffMs = config.getLong("retry.backoff.ms");

            // Serializers: a caller-supplied instance wins; otherwise instantiate and configure from config.
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance("key.serializer", Serializer.class);
                this.keySerializer.configure(
                        config.originals(Collections.singletonMap("client.id", this.clientId)), true);
            } else {
                config.ignore("key.serializer");
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance("value.serializer", Serializer.class);
                this.valueSerializer.configure(
                        config.originals(Collections.singletonMap("client.id", this.clientId)), false);
            } else {
                config.ignore("value.serializer");
                this.valueSerializer = valueSerializer;
            }

            List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                    "interceptor.classes", ProducerInterceptor.class,
                    Collections.singletonMap("client.id", this.clientId));
            if (interceptors != null) {
                this.interceptors = interceptors;
            } else {
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            }

            // Offer the serializers actually in use (the fields), not the constructor arguments:
            // the arguments are null whenever the serializers were instantiated from configuration,
            // which previously meant those instances were never offered for registration.
            ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(
                    this.keySerializer, this.valueSerializer, interceptorList, reporters);

            this.maxRequestSize = config.getInt("max.request.size");
            this.totalMemorySize = config.getLong("buffer.memory");
            this.compressionType = CompressionType.forName(config.getString("compression.type"));
            this.maxBlockTimeMs = config.getLong("max.block.ms");
            int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);

            this.apiVersions = new ApiVersions();
            this.transactionManager = this.configureTransactionState(config, logContext);
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt("batch.size"),
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    this.metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    this.apiVersions,
                    this.transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time,
                            PRODUCER_METRIC_GROUP_NAME));

            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs,
                        config.getLong("metadata.max.age.ms"),
                        config.getLong("metadata.max.idle.ms"),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
                this.metadata.bootstrap(addresses);
            }

            this.errors = this.metrics.sensor("errors");
            this.sender = this.newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + this.clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka producer started");
        } catch (Throwable t) {
            // Release whatever was already created; close errors are swallowed so the root cause survives.
            this.close(Duration.ofMillis(0L), true);
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

    /**
     * Builds the {@link Sender} that the I/O thread runs.
     *
     * @param logContext  log context shared with the created components
     * @param kafkaClient client to use, or {@code null} to create a {@link NetworkClient} from configuration
     * @param metadata    producer metadata handed to the client and the sender
     * @return a new sender bound to this producer's accumulator and transaction manager
     */
    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
        int maxInflightRequests = configureInflightRequests(this.producerConfig);
        int requestTimeoutMs = this.producerConfig.getInt("request.timeout.ms");
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.producerConfig, this.time, logContext);
        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);

        KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                new Selector(this.producerConfig.getLong("connections.max.idle.ms"),
                        this.metrics, this.time, "producer", channelBuilder, logContext),
                metadata,
                this.clientId,
                maxInflightRequests,
                this.producerConfig.getLong("reconnect.backoff.ms"),
                this.producerConfig.getLong("reconnect.backoff.max.ms"),
                this.producerConfig.getInt("send.buffer.bytes"),
                this.producerConfig.getInt("receive.buffer.bytes"),
                requestTimeoutMs,
                this.producerConfig.getLong("socket.connection.setup.timeout.ms"),
                this.producerConfig.getLong("socket.connection.setup.timeout.max.ms"),
                ClientDnsLookup.forConfig(this.producerConfig.getString("client.dns.lookup")),
                this.time,
                true,
                this.apiVersions,
                throttleTimeSensor,
                logContext);

        short acks = configureAcks(this.producerConfig, this.log);
        return new Sender(logContext,
                client,
                metadata,
                this.accumulator,
                maxInflightRequests == 1,
                this.producerConfig.getInt("max.request.size"),
                acks,
                this.producerConfig.getInt("retries"),
                metricsRegistry.senderMetrics,
                this.time,
                requestTimeoutMs,
                this.producerConfig.getLong("retry.backoff.ms"),
                this.transactionManager,
                this.apiVersions);
    }

    /** Returns {@code linger.ms} clamped to {@link Integer#MAX_VALUE}. */
    private static int lingerMs(ProducerConfig config) {
        return (int) Math.min(config.getLong("linger.ms"), Integer.MAX_VALUE);
    }

    /**
     * Resolves the effective delivery timeout.
     *
     * @return {@code delivery.timeout.ms}, raised to {@code linger.ms + request.timeout.ms} (with a
     *         warning) when it is smaller and was not explicitly configured
     * @throws ConfigException if an explicitly configured {@code delivery.timeout.ms} is smaller than
     *                         {@code linger.ms + request.timeout.ms}
     */
    private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
        int deliveryTimeoutMs = config.getInt("delivery.timeout.ms");
        int lingerMs = lingerMs(config);
        int requestTimeoutMs = config.getInt("request.timeout.ms");
        // Sum in long arithmetic and clamp so the addition cannot overflow int.
        int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);

        if (deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
            if (config.originals().containsKey("delivery.timeout.ms")) {
                throw new ConfigException(
                        "delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms");
            }
            deliveryTimeoutMs = lingerAndRequestTimeoutMs;
            log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.",
                    "delivery.timeout.ms", "linger.ms", "request.timeout.ms", lingerAndRequestTimeoutMs);
        }
        return deliveryTimeoutMs;
    }

    /**
     * Creates the {@link TransactionManager} when idempotence is enabled.
     *
     * @return a transaction manager, or {@code null} when {@code config.idempotenceEnabled()} is false
     */
    private TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext) {
        boolean userConfiguredIdempotence = config.originals().containsKey("enable.idempotence");
        boolean userConfiguredTransactions = config.originals().containsKey("transactional.id");
        if (userConfiguredTransactions && !userConfiguredIdempotence) {
            this.log.info("Overriding the default {} to true since {} is specified.",
                    "enable.idempotence", "transactional.id");
        }

        if (!config.idempotenceEnabled()) {
            return null;
        }

        String transactionalId = config.getString("transactional.id");
        int transactionTimeoutMs = config.getInt("transaction.timeout.ms");
        long retryBackoffMs = config.getLong("retry.backoff.ms");
        boolean autoDowngradeTxnCommit = config.getBoolean("internal.auto.downgrade.txn.commit");
        TransactionManager manager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
                retryBackoffMs, this.apiVersions, autoDowngradeTxnCommit);
        if (manager.isTransactional()) {
            this.log.info("Instantiated a transactional producer.");
        } else {
            this.log.info("Instantiated an idempotent producer.");
        }
        return manager;
    }

    /**
     * Reads {@code max.in.flight.requests.per.connection}.
     *
     * @throws ConfigException if idempotence is enabled and the value exceeds 5
     */
    private static int configureInflightRequests(ProducerConfig config) {
        int maxInFlight = config.getInt("max.in.flight.requests.per.connection");
        if (config.idempotenceEnabled() && 5 < maxInFlight) {
            throw new ConfigException(
                    "Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.");
        }
        return maxInFlight;
    }

    /**
     * Parses {@code acks} as a short.
     *
     * @throws ConfigException if idempotence is enabled and the user explicitly configured a value
     *                         other than -1
     */
    private static short configureAcks(ProducerConfig config, Logger log) {
        boolean userConfiguredAcks = config.originals().containsKey("acks");
        short acks = Short.parseShort(config.getString("acks"));

        if (config.idempotenceEnabled()) {
            if (!userConfiguredAcks) {
                log.info("Overriding the default {} to all since idempotence is enabled.", "acks");
            } else if (acks != -1) {
                throw new ConfigException("Must set acks to all in order to use the idempotent producer. "
                        + "Otherwise we cannot guarantee idempotence.");
            }
        }
        return acks;
    }

    /**
     * Starts transaction initialization, wakes the sender and waits up to {@code max.block.ms}
     * for the result.
     *
     * @throws IllegalStateException if no transaction manager exists or the producer is closed
     */
    @Override
    public void initTransactions() {
        this.throwIfNoTransactionManager();
        this.throwIfProducerClosed();
        TransactionalRequestResult result = this.transactionManager.initializeTransactions();
        this.sender.wakeup();
        result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code TransactionManager.beginTransaction()}.
     *
     * @throws IllegalStateException if no transaction manager exists or the producer is closed
     */
    @Override
    public void beginTransaction() throws ProducerFencedException {
        this.throwIfNoTransactionManager();
        this.throwIfProducerClosed();
        this.transactionManager.beginTransaction();
    }

    /**
     * Convenience overload that wraps {@code consumerGroupId} in a {@link ConsumerGroupMetadata}.
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         String consumerGroupId) throws ProducerFencedException {
        this.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId));
    }

    /**
     * Hands the offsets to the transaction manager, wakes the sender and waits up to
     * {@code max.block.ms} for the result.
     *
     * @throws IllegalArgumentException if {@code groupMetadata} is null or has a positive generation
     *                                  id with an empty member id
     * @throws IllegalStateException    if no transaction manager exists or the producer is closed
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
        this.throwIfInvalidGroupMetadata(groupMetadata);
        this.throwIfNoTransactionManager();
        this.throwIfProducerClosed();
        TransactionalRequestResult result = this.transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
        this.sender.wakeup();
        result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Begins a commit on the transaction manager, wakes the sender and waits up to
     * {@code max.block.ms} for the result.
     *
     * @throws IllegalStateException if no transaction manager exists or the producer is closed
     */
    @Override
    public void commitTransaction() throws ProducerFencedException {
        this.throwIfNoTransactionManager();
        this.throwIfProducerClosed();
        TransactionalRequestResult result = this.transactionManager.beginCommit();
        this.sender.wakeup();
        result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Begins an abort on the transaction manager, wakes the sender and waits up to
     * {@code max.block.ms} for the result.
     *
     * @throws IllegalStateException if no transaction manager exists or the producer is closed
     */
    @Override
    public void abortTransaction() throws ProducerFencedException {
        this.throwIfNoTransactionManager();
        this.throwIfProducerClosed();
        this.log.info("Aborting incomplete transaction");
        TransactionalRequestResult result = this.transactionManager.beginAbort();
        this.sender.wakeup();
        result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
    }

    /** Sends {@code record} without a completion callback. */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, null);
    }

    /**
     * Passes {@code record} through the interceptors' {@code onSend} and then sends the result.
     *
     * @param record   record to send
     * @param callback invoked on completion; may be {@code null}
     * @return future for the record's metadata
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }

    /** @throws IllegalStateException if the sender is missing or no longer running */
    private void throwIfProducerClosed() {
        if (this.sender == null || !this.sender.isRunning()) {
            throw new IllegalStateException("Cannot perform operation after producer has been closed");
        }
    }

    /**
     * Waits for metadata, serializes key and value, picks a partition, validates the record size
     * and appends the record to the accumulator.
     *
     * <p>{@link ApiException}s are reported through the callback and a failed future; other
     * exceptions are rethrown after the interceptors' {@code onSendError} has been invoked.
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            this.throwIfProducerClosed();

            long nowMs = this.time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs,
                        this.maxBlockTimeMs);
            } catch (KafkaException e) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", e);
                }
                throw e;
            }
            // Account for the time already spent waiting on metadata.
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class "
                        + record.key().getClass().getName() + " to class "
                        + this.producerConfig.getClass("key.serializer").getName()
                        + " specified in key.serializer", cce);
            }

            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class "
                        + record.value().getClass().getName() + " to class "
                        + this.producerConfig.getClass("value.serializer").getName()
                        + " specified in value.serializer", cce);
            }

            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(
                    this.apiVersions.maxUsableProduceMagic(), this.compressionType,
                    serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);

            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}",
                        record, callback, record.topic(), partition);
            }

            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.failIfNotReadyForSend();
            }

            RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

            if (result.abortForNewBatch) {
                // The accumulator asked for a retry: notify the partitioner, re-pick the partition
                // and append again, this time without the abort-on-new-batch flag.
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. "
                            + "The old partition was {}", record.topic(), partition, prevPartition);
                }
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers,
                        interceptCallback, remainingWaitMs, false, nowMs);
            }

            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.maybeAddPartitionToTransaction(tp);
            }

            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",
                        record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (ApiException e) {
            // API errors are delivered through the callback and the returned future, not thrown.
            this.log.debug("Exception occurred during message send:", e);
            if (callback != null) {
                callback.onCompletion(null, e);
            }
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // Interceptors are told about every failure; the error sensor is not recorded here.
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

    /** Marks {@code headers} read-only when they are a {@link RecordHeaders} instance. */
    private void setReadOnly(Headers headers) {
        if (headers instanceof RecordHeaders) {
            ((RecordHeaders) headers).setReadOnly();
        }
    }

    /**
     * Blocks until the cluster metadata contains {@code topic} (and {@code partition}, when given)
     * or {@code maxWaitMs} elapses.
     *
     * @param topic     topic whose metadata is required
     * @param partition specific partition that must exist, or {@code null} for any
     * @param nowMs     current time in milliseconds
     * @param maxWaitMs maximum time to block in milliseconds
     * @return the cluster snapshot together with the time spent waiting
     * @throws InvalidTopicException if the current cluster lists the topic as invalid
     * @throws TimeoutException      if the metadata is still missing after {@code maxWaitMs}
     * @throws InterruptedException  if interrupted while waiting
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs)
            throws InterruptedException {
        Cluster cluster = this.metadata.fetch();
        if (cluster.invalidTopics().contains(topic)) {
            throw new InvalidTopicException(topic);
        }

        this.metadata.add(topic, nowMs);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Fast path: the cached metadata already covers the topic and the requested partition.
        if (partitionsCount != null && (partition == null || partition < partitionsCount)) {
            return new ClusterAndWaitTime(cluster, 0L);
        }

        long remainingWaitMs = maxWaitMs;
        long elapsed = 0L;
        do {
            if (partition != null) {
                this.log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                this.log.trace("Requesting metadata update for topic {}.", topic);
            }
            this.metadata.add(topic, nowMs + elapsed);
            int version = this.metadata.requestUpdateForTopic(topic);
            this.sender.wakeup();
            try {
                this.metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrown with the caller's maxWaitMs rather than the shrinking remainingWaitMs.
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
            }
            cluster = this.metadata.fetch();
            elapsed = this.time.milliseconds() - nowMs;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null
                        ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs)
                        : String.format("Partition %d of topic %s with partition count %d is not present "
                                + "in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
            }
            this.metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }

    /**
     * @throws RecordTooLargeException if {@code size} exceeds {@code max.request.size} or
     *                                 {@code buffer.memory}
     */
    private void ensureValidRecordSize(int size) {
        if (size > this.maxRequestSize) {
            throw new RecordTooLargeException("The message is " + size
                    + " bytes when serialized which is larger than " + this.maxRequestSize
                    + ", which is the value of the " + "max.request.size" + " configuration.");
        }
        if (size > this.totalMemorySize) {
            throw new RecordTooLargeException("The message is " + size
                    + " bytes when serialized which is larger than the total memory buffer you have configured with the "
                    + "buffer.memory" + " configuration.");
        }
    }

    /**
     * Begins a flush on the accumulator, wakes the sender and blocks until the flush completes.
     *
     * @throws InterruptException if interrupted while waiting
     */
    @Override
    public void flush() {
        this.log.trace("Flushing accumulated records in producer.");
        this.accumulator.beginFlush();
        this.sender.wakeup();
        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException e) {
            throw new InterruptException("Flush interrupted.", e);
        }
    }

    /**
     * Returns the partitions of {@code topic}, waiting up to {@code max.block.ms} for metadata.
     *
     * @throws NullPointerException if {@code topic} is null
     * @throws InterruptException   if interrupted while waiting
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        Objects.requireNonNull(topic, "topic cannot be null");
        try {
            return this.waitOnMetadata(topic, null, this.time.milliseconds(), this.maxBlockTimeMs)
                    .cluster.partitionsForTopic(topic);
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        }
    }

    /** Returns an unmodifiable view of this producer's metrics. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    /** Closes the producer with a timeout of {@link Long#MAX_VALUE} milliseconds. */
    @Override
    public void close() {
        this.close(Duration.ofMillis(Long.MAX_VALUE));
    }

    /**
     * Closes the producer, waiting up to {@code timeout} for the I/O thread to finish before
     * forcing the sender to close.
     *
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws InterruptException       if interrupted while joining the I/O thread
     * @throws KafkaException           if closing a component failed
     */
    @Override
    public void close(Duration timeout) {
        this.close(timeout, false);
    }

    /**
     * Shared close implementation.
     *
     * @param timeout          how long to wait for a graceful shutdown of the I/O thread
     * @param swallowException when true, the first recorded close error is not rethrown
     */
    private void close(Duration timeout, boolean swallowException) {
        long timeoutMs = timeout.toMillis();
        if (timeoutMs < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        this.log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

        // Only the first failure is kept; later ones cannot overwrite it.
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        boolean invokedFromCallback = Thread.currentThread() == this.ioThread;

        if (timeoutMs > 0L) {
            if (invokedFromCallback) {
                this.log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. "
                        + "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
                        timeoutMs);
            } else {
                if (this.sender != null) {
                    this.sender.initiateClose();
                }
                if (this.ioThread != null) {
                    try {
                        this.ioThread.join(timeoutMs);
                    } catch (InterruptedException e) {
                        firstException.compareAndSet(null, new InterruptException(e));
                        this.log.error("Interrupted while joining ioThread", e);
                    }
                }
            }
        }

        if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
            this.log.info("Proceeding to force close the producer since pending requests could not be completed "
                    + "within timeout {} ms.", timeoutMs);
            this.sender.forceClose();
            // Joining from the I/O thread itself would never return, so skip it in that case.
            if (!invokedFromCallback) {
                try {
                    this.ioThread.join();
                } catch (InterruptedException e) {
                    firstException.compareAndSet(null, new InterruptException(e));
                }
            }
        }

        Utils.closeQuietly(this.interceptors, "producer interceptors", firstException);
        Utils.closeQuietly(this.metrics, "producer metrics", firstException);
        Utils.closeQuietly(this.keySerializer, "producer keySerializer", firstException);
        Utils.closeQuietly(this.valueSerializer, "producer valueSerializer", firstException);
        Utils.closeQuietly(this.partitioner, "producer partitioner", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);

        Throwable exception = firstException.get();
        if (exception != null && !swallowException) {
            if (exception instanceof InterruptException) {
                throw (InterruptException) exception;
            }
            throw new KafkaException("Failed to close kafka producer", exception);
        }
        this.log.debug("Kafka producer has been closed");
    }

    /**
     * Collects cluster resource listener candidates: every element of each candidate list plus the
     * two serializers. Filtering is delegated to {@code maybeAdd}/{@code maybeAddAll}
     * (presumably they keep only objects implementing the listener interface; confirm in
     * {@link ClusterResourceListeners}).
     */
    private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer,
                                                                       Serializer<V> valueSerializer,
                                                                       List<?>... candidateLists) {
        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
        for (List<?> candidateList : candidateLists) {
            clusterResourceListeners.maybeAddAll(candidateList);
        }
        clusterResourceListeners.maybeAdd(keySerializer);
        clusterResourceListeners.maybeAdd(valueSerializer);
        return clusterResourceListeners;
    }

    /** Returns the record's explicit partition, or asks the partitioner when none is set. */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null
                ? partition
                : this.partitioner.partition(record.topic(), record.key(), serializedKey,
                        record.value(), serializedValue, cluster);
    }

    /**
     * @throws IllegalArgumentException if {@code groupMetadata} is null, or has a generation id
     *                                  greater than 0 together with an empty member id
     */
    private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
        if (groupMetadata == null) {
            throw new IllegalArgumentException("Consumer group metadata could not be null");
        }
        if (groupMetadata.generationId() > 0 && "".equals(groupMetadata.memberId())) {
            // The original message ended abruptly after "member.id "; the prefix is kept and completed.
            throw new IllegalArgumentException("Passed in group metadata " + groupMetadata
                    + " has generationId > 0 but member.id is not specified");
        }
    }

    /** @throws IllegalStateException if no transaction manager was configured */
    private void throwIfNoTransactionManager() {
        if (this.transactionManager == null) {
            throw new IllegalStateException("Cannot use transactional methods without enabling transactions "
                    + "by setting the transactional.id configuration property");
        }
    }

    /** Returns the configured {@code client.id}. */
    String getClientId() {
        return this.clientId;
    }

    /**
     * Callback wrapper that notifies the interceptors before the user callback.
     */
    private static class InterceptorCallback<K, V> implements Callback {
        private final Callback userCallback;
        private final ProducerInterceptors<K, V> interceptors;
        private final TopicPartition tp;

        private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
            this.userCallback = userCallback;
            this.interceptors = interceptors;
            this.tp = tp;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // When no metadata is available, substitute a placeholder carrying only the topic-partition.
            metadata = metadata != null ? metadata : new RecordMetadata(this.tp, -1L, -1L, -1L, -1L, -1, -1);
            this.interceptors.onAcknowledgement(metadata, exception);
            if (this.userCallback != null) {
                this.userCallback.onCompletion(metadata, exception);
            }
        }
    }

    /**
     * Already-completed future whose {@code get} methods always throw the wrapped failure.
     */
    private static class FutureFailure implements Future<RecordMetadata> {
        private final ExecutionException exception;

        public FutureFailure(Exception exception) {
            this.exception = new ExecutionException(exception);
        }

        @Override
        public boolean cancel(boolean interrupt) {
            return false;
        }

        @Override
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        @Override
        public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
            throw this.exception;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return true;
        }
    }

    /** Pair of a cluster snapshot and the milliseconds spent waiting for it. */
    private static class ClusterAndWaitTime {
        final Cluster cluster;
        final long waitedOnMetadataMs;

        ClusterAndWaitTime(Cluster cluster, long waitedOnMetadataMs) {
            this.cluster = cluster;
            this.waitedOnMetadataMs = waitedOnMetadataMs;
        }
    }
}
Editor is loading...