//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.apache.kafka.clients.producer;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
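// KafkaProducer is the public, thread-safe producer client. Records handed to send() are
// buffered in a RecordAccumulator and transmitted by a background I/O thread (the Sender),
// which batches them into requests to the brokers.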
public class KafkaProducer<K, V> implements Producer<K, V> {
private final Logger log;
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
private final String clientId;
final Metrics metrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final ProducerMetadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
public KafkaProducer(Map<String, Object> configs) {
this(configs, null, null);
}
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}
public KafkaProducer(Properties properties) {
this(properties, null, null);
}
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(Utils.propsToMap(properties), keySerializer, valueSerializer);
}
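// Package-private constructor that all public constructors funnel into (the extra arguments
// allow tests to inject metadata, a KafkaClient and interceptors). It wires up metrics,
// serializers, interceptors, the accumulator, cluster metadata and the background sender thread.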
KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) {
try {
this.producerConfig = config;
this.time = time;
String transactionalId = config.getString("transactional.id");
this.clientId = config.getString("client.id");
LogContext logContext;
if (transactionalId == null) {
logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
} else {
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
}
this.log = logContext.logger(KafkaProducer.class);
this.log.trace("Starting the Kafka producer");
Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals(Collections.singletonMap("client.id", this.clientId)));
reporters.add(jmxReporter);
MetricsContext metricsContext = new KafkaMetricsContext("kafka.producer", config.originalsWithPrefix("metrics.context."));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class, Collections.singletonMap("client.id", this.clientId));
long retryBackoffMs = config.getLong("retry.backoff.ms");
if (keySerializer == null) {
this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), true);
} else {
config.ignore("key.serializer");
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), false);
} else {
config.ignore("value.serializer");
this.valueSerializer = valueSerializer;
}
List<ProducerInterceptor<K, V>> interceptorList = config.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class, Collections.singletonMap("client.id", this.clientId));
if (interceptors != null) {
this.interceptors = interceptors;
} else {
this.interceptors = new ProducerInterceptors<>(interceptorList);
}
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt("max.request.size");
this.totalMemorySize = config.getLong("buffer.memory");
this.compressionType = CompressionType.forName(config.getString("compression.type"));
this.maxBlockTimeMs = config.getLong("max.block.ms");
int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
this.apiVersions = new ApiVersions();
this.transactionManager = this.configureTransactionState(config, logContext);
this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), config.getLong("metadata.max.idle.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
this.sender = this.newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
this.log.debug("Kafka producer started");
} catch (Throwable t) {
this.close(Duration.ofMillis(0L), true);
throw new KafkaException("Failed to construct kafka producer", t);
}
}
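// Builds the Sender that runs on the I/O thread, creating a NetworkClient unless one was
// injected through the constructor.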
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
int maxInflightRequests = configureInflightRequests(this.producerConfig);
int requestTimeoutMs = this.producerConfig.getInt("request.timeout.ms");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.producerConfig, this.time, logContext);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(new Selector(this.producerConfig.getLong("connections.max.idle.ms"), this.metrics, this.time, "producer", channelBuilder, logContext), metadata, this.clientId, maxInflightRequests, this.producerConfig.getLong("reconnect.backoff.ms"), this.producerConfig.getLong("reconnect.backoff.max.ms"), this.producerConfig.getInt("send.buffer.bytes"), this.producerConfig.getInt("receive.buffer.bytes"), requestTimeoutMs, this.producerConfig.getLong("socket.connection.setup.timeout.ms"), this.producerConfig.getLong("socket.connection.setup.timeout.max.ms"), ClientDnsLookup.forConfig(this.producerConfig.getString("client.dns.lookup")), this.time, true, this.apiVersions, throttleTimeSensor, logContext);
short acks = configureAcks(this.producerConfig, this.log);
return new Sender(logContext, client, metadata, this.accumulator, maxInflightRequests == 1, this.producerConfig.getInt("max.request.size"), acks, this.producerConfig.getInt("retries"), metricsRegistry.senderMetrics, this.time, requestTimeoutMs, this.producerConfig.getLong("retry.backoff.ms"), this.transactionManager, this.apiVersions);
}
private static int lingerMs(ProducerConfig config) {
return (int)Math.min(config.getLong("linger.ms"), Integer.MAX_VALUE);
}
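// delivery.timeout.ms must be at least linger.ms + request.timeout.ms: an explicit smaller
// value is rejected, an implicit one is bumped to the minimum with a warning.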
private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
int deliveryTimeoutMs = config.getInt("delivery.timeout.ms");
int lingerMs = lingerMs(config);
int requestTimeoutMs = config.getInt("request.timeout.ms");
int lingerAndRequestTimeoutMs = (int)Math.min((long)lingerMs + (long)requestTimeoutMs, Integer.MAX_VALUE);
if (deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
if (config.originals().containsKey("delivery.timeout.ms")) {
throw new ConfigException("delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms");
}
deliveryTimeoutMs = lingerAndRequestTimeoutMs;
log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.", "delivery.timeout.ms", "linger.ms", "request.timeout.ms", lingerAndRequestTimeoutMs);
}
return deliveryTimeoutMs;
}
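// A TransactionManager is only created when idempotence is enabled; with a transactional.id
// it drives full transactions, otherwise it only tracks producer ids and sequence numbers
// for idempotent sends.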
private TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext) {
TransactionManager transactionManager = null;
boolean userConfiguredIdempotence = config.originals().containsKey("enable.idempotence");
boolean userConfiguredTransactions = config.originals().containsKey("transactional.id");
if (userConfiguredTransactions && !userConfiguredIdempotence) {
this.log.info("Overriding the default {} to true since {} is specified.", "enable.idempotence", "transactional.id");
}
if (config.idempotenceEnabled()) {
String transactionalId = config.getString("transactional.id");
int transactionTimeoutMs = config.getInt("transaction.timeout.ms");
long retryBackoffMs = config.getLong("retry.backoff.ms");
boolean autoDowngradeTxnCommit = config.getBoolean("internal.auto.downgrade.txn.commit");
transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, this.apiVersions, autoDowngradeTxnCommit);
if (transactionManager.isTransactional()) {
this.log.info("Instantiated a transactional producer.");
} else {
this.log.info("Instantiated an idempotent producer.");
}
}
return transactionManager;
}
private static int configureInflightRequests(ProducerConfig config) {
if (config.idempotenceEnabled() && 5 < config.getInt("max.in.flight.requests.per.connection")) {
throw new ConfigException("Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.");
} else {
return config.getInt("max.in.flight.requests.per.connection");
}
}
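// When idempotence is enabled, acks must be all (-1): an explicit conflicting value is
// rejected, and leaving it unset logs that the default is overridden to all.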
private static short configureAcks(ProducerConfig config, Logger log) {
boolean userConfiguredAcks = config.originals().containsKey("acks");
short acks = Short.parseShort(config.getString("acks"));
if (config.idempotenceEnabled()) {
if (!userConfiguredAcks) {
log.info("Overriding the default {} to all since idempotence is enabled.", "acks");
} else if (acks != -1) {
throw new ConfigException("Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.");
}
}
return acks;
}
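// Transactional API. Each method verifies that a transaction manager exists and the producer
// is still open, delegates to the TransactionManager, and (where a broker round trip is
// needed) wakes the sender and blocks for up to max.block.ms.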
public void initTransactions() {
this.throwIfNoTransactionManager();
this.throwIfProducerClosed();
TransactionalRequestResult result = this.transactionManager.initializeTransactions();
this.sender.wakeup();
result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
public void beginTransaction() throws ProducerFencedException {
this.throwIfNoTransactionManager();
this.throwIfProducerClosed();
this.transactionManager.beginTransaction();
}
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
this.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId));
}
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
this.throwIfInvalidGroupMetadata(groupMetadata);
this.throwIfNoTransactionManager();
this.throwIfProducerClosed();
TransactionalRequestResult result = this.transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
this.sender.wakeup();
result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
public void commitTransaction() throws ProducerFencedException {
this.throwIfNoTransactionManager();
this.throwIfProducerClosed();
TransactionalRequestResult result = this.transactionManager.beginCommit();
this.sender.wakeup();
result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
public void abortTransaction() throws ProducerFencedException {
this.throwIfNoTransactionManager();
this.throwIfProducerClosed();
this.log.info("Aborting incomplete transaction");
TransactionalRequestResult result = this.transactionManager.beginAbort();
this.sender.wakeup();
result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
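// Asynchronous send entry points: the record first passes through the configured
// interceptors, then doSend() below does the actual work.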
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return this.send(record, (Callback)null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return this.doSend(interceptedRecord, callback);
}
private void throwIfProducerClosed() {
if (this.sender == null || !this.sender.isRunning()) {
throw new IllegalStateException("Cannot perform operation after producer has been closed");
}
}
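// Core send path: wait for metadata, serialize the key and value, pick a partition, check the
// record size, and append to the accumulator (retrying the append once if the partitioner
// wants a fresh batch). The sender is woken when a batch fills up or a new batch is created.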
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
this.throwIfProducerClosed();
long nowMs = this.time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
} catch (KafkaException e) {
if (this.metadata.isClosed()) {
throw new KafkaException("Producer closed while send in progress", e);
}
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", cce);
}
int partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
this.setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
this.ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (this.log.isTraceEnabled()) {
this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
}
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (this.transactionManager != null && this.transactionManager.isTransactional()) {
this.transactionManager.failIfNotReadyForSend();
}
RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
this.partitioner.onNewBatch(record.topic(), cluster, partition);
partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (this.log.isTraceEnabled()) {
this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
}
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (this.transactionManager != null && this.transactionManager.isTransactional()) {
this.transactionManager.maybeAddPartitionToTransaction(tp);
}
if (result.batchIsFull || result.newBatchCreated) {
this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
} catch (ApiException e) {
this.log.debug("Exception occurred during message send:", e);
if (callback != null) {
callback.onCompletion(null, e);
}
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
private void setReadOnly(Headers headers) {
if (headers instanceof RecordHeaders) {
((RecordHeaders)headers).setReadOnly();
}
}
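// Blocks until metadata for the topic (and, if specified, the partition) is available or
// maxWaitMs expires, returning the cluster snapshot together with the time spent waiting.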
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
Cluster cluster = this.metadata.fetch();
if (cluster.invalidTopics().contains(topic)) {
throw new InvalidTopicException(topic);
} else {
this.metadata.add(topic, nowMs);
Integer partitionsCount = cluster.partitionCountForTopic(topic);
if (partitionsCount != null && (partition == null || partition < partitionsCount)) {
return new ClusterAndWaitTime(cluster, 0L);
} else {
long remainingWaitMs = maxWaitMs;
long elapsed = 0L;
do {
do {
if (partition != null) {
this.log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
this.log.trace("Requesting metadata update for topic {}.", topic);
}
this.metadata.add(topic, nowMs + elapsed);
int version = this.metadata.requestUpdateForTopic(topic);
this.sender.wakeup();
try {
this.metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
}
cluster = this.metadata.fetch();
elapsed = this.time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
}
this.metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while(partitionsCount == null);
} while(partition != null && partition >= partitionsCount);
return new ClusterAndWaitTime(cluster, elapsed);
}
}
}
private void ensureValidRecordSize(int size) {
if (size > this.maxRequestSize) {
throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than " + this.maxRequestSize + ", which is the value of the max.request.size configuration.");
} else if ((long)size > this.totalMemorySize) {
throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the buffer.memory configuration.");
}
}
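// Blocks until every record previously sent has completed, successfully or with an error.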
public void flush() {
this.log.trace("Flushing accumulated records in producer.");
this.accumulator.beginFlush();
this.sender.wakeup();
try {
this.accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
}
}
public List<PartitionInfo> partitionsFor(String topic) {
Objects.requireNonNull(topic, "topic cannot be null");
try {
return this.waitOnMetadata(topic, (Integer)null, this.time.milliseconds(), this.maxBlockTimeMs).cluster.partitionsForTopic(topic);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
}
public void close() {
this.close(Duration.ofMillis(Long.MAX_VALUE));
}
public void close(Duration timeout) {
this.close(timeout, false);
}
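// Shared close logic: give the sender up to the timeout to drain outstanding requests,
// force-close it if the I/O thread is still alive afterwards, then release interceptors,
// metrics, serializers and the partitioner, rethrowing the first failure unless asked to
// swallow it.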
private void close(Duration timeout, boolean swallowException) {
long timeoutMs = timeout.toMillis();
if (timeoutMs < 0L) {
throw new IllegalArgumentException("The timeout cannot be negative.");
} else {
this.log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
AtomicReference<Throwable> firstException = new AtomicReference<>();
boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
if (timeoutMs > 0L) {
if (invokedFromCallback) {
this.log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeoutMs);
} else {
if (this.sender != null) {
this.sender.initiateClose();
}
if (this.ioThread != null) {
try {
this.ioThread.join(timeoutMs);
} catch (InterruptedException e) {
firstException.compareAndSet(null, new InterruptException(e));
this.log.error("Interrupted while joining ioThread", e);
}
}
}
}
if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
this.log.info("Proceeding to force close the producer since pending requests could not be completed within timeout {} ms.", timeoutMs);
this.sender.forceClose();
if (!invokedFromCallback) {
try {
this.ioThread.join();
} catch (InterruptedException e) {
firstException.compareAndSet(null, new InterruptException(e));
}
}
}
Utils.closeQuietly(this.interceptors, "producer interceptors", firstException);
Utils.closeQuietly(this.metrics, "producer metrics", firstException);
Utils.closeQuietly(this.keySerializer, "producer keySerializer", firstException);
Utils.closeQuietly(this.valueSerializer, "producer valueSerializer", firstException);
Utils.closeQuietly(this.partitioner, "producer partitioner", firstException);
AppInfoParser.unregisterAppInfo("kafka.producer", this.clientId, this.metrics);
Throwable exception = (Throwable)firstException.get();
if (exception != null && !swallowException) {
if (exception instanceof InterruptException) {
throw (InterruptException)exception;
} else {
throw new KafkaException("Failed to close kafka producer", exception);
}
} else {
this.log.debug("Kafka producer has been closed");
}
}
}
private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?>... candidateLists) {
ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
for (List<?> candidateList : candidateLists) {
clusterResourceListeners.maybeAddAll(candidateList);
}
clusterResourceListeners.maybeAdd(keySerializer);
clusterResourceListeners.maybeAdd(valueSerializer);
return clusterResourceListeners;
}
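// Uses the partition set on the record if present, otherwise delegates to the configured
// Partitioner.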
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
if (groupMetadata == null) {
throw new IllegalArgumentException("Consumer group metadata cannot be null");
} else if (groupMetadata.generationId() > 0 && "".equals(groupMetadata.memberId())) {
throw new IllegalArgumentException("Passed in group metadata " + groupMetadata + " has generationId > 0 but an empty member.id");
}
}
private void throwIfNoTransactionManager() {
if (this.transactionManager == null) {
throw new IllegalStateException("Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property");
}
}
String getClientId() {
return this.clientId;
}
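// Wraps the user callback so interceptors' onAcknowledgement() always runs, substituting a
// placeholder RecordMetadata (offset/timestamp of -1) when the send failed before any
// metadata was produced.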
private static class InterceptorCallback<K, V> implements Callback {
private final Callback userCallback;
private final ProducerInterceptors<K, V> interceptors;
private final TopicPartition tp;
private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
this.userCallback = userCallback;
this.interceptors = interceptors;
this.tp = tp;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
metadata = metadata != null ? metadata : new RecordMetadata(this.tp, -1L, -1L, -1L, -1L, -1, -1);
this.interceptors.onAcknowledgement(metadata, exception);
if (this.userCallback != null) {
this.userCallback.onCompletion(metadata, exception);
}
}
}
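// A Future that is already completed exceptionally; returned when send() fails synchronously
// with an ApiException.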
private static class FutureFailure implements Future<RecordMetadata> {
private final ExecutionException exception;
public FutureFailure(Exception exception) {
this.exception = new ExecutionException(exception);
}
public boolean cancel(boolean interrupt) {
return false;
}
public RecordMetadata get() throws ExecutionException {
throw this.exception;
}
public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
throw this.exception;
}
public boolean isCancelled() {
return false;
}
public boolean isDone() {
return true;
}
}
private static class ClusterAndWaitTime {
final Cluster cluster;
final long waitedOnMetadataMs;
ClusterAndWaitTime(Cluster cluster, long waitedOnMetadataMs) {
this.cluster = cluster;
this.waitedOnMetadataMs = waitedOnMetadataMs;
}
}
}