Interface ProducerInterceptor<K,​V>

  • All Superinterfaces:
    Configurable

    public interface ProducerInterceptor<K,​V>
    extends Configurable
    A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster.

    This class will get producer config properties via configure() method, including clientId assigned by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be sharing producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.

    Exceptions thrown by ProducerInterceptor methods will be caught, logged, but not propagated further. As a result, if the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception, just log the errors.

    ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.

    Implement ClusterResourceListener to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.

    • Method Detail

      • onSend

        ProducerRecord<K,​V> onSend​(ProducerRecord<K,​V> record)
        This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord).

        This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value, not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent: same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work as expected.

        Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord. Most often, it should be the same topic/partition from 'record'.

        Any exception thrown by this method will be caught by the caller and logged, but not propagated further.

        Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order specified by ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. The first interceptor in the list gets the record passed from the client, the following interceptor will be passed the record returned by the previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list, or otherwise the client.

        Parameters:
        record - the record from client or the record returned by the previous interceptor in the chain of interceptors.
        Returns:
        producer record to send to topic/partition
      • onAcknowledgement

        void onAcknowledgement​(RecordMetadata metadata,
                               java.lang.Exception exception)
        This method is called when the record sent to the server has been acknowledged, or when sending the record fails before it gets sent to the server.

        This method is generally called just before the user callback is called, and in additional cases when KafkaProducer.send() throws an exception.

        Any exception thrown by this method will be ignored by the caller.

        This method will generally execute in the background I/O thread, so the implementation should be reasonably fast. Otherwise, sending of messages from other threads could be delayed.

        Parameters:
        metadata - The metadata for the record that was sent (i.e. the partition and offset). If an error occurred, metadata will contain only valid topic and maybe partition. If partition is not given in ProducerRecord and an error occurs before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION. The metadata may be null if the client passed null record to KafkaProducer.send(ProducerRecord).
        exception - The exception thrown during processing of this record. Null if no error occurred.
      • close

        void close()
        This is called when interceptor is closed