# Kafka

> Configuring the Kafka backend (Self-Hosted)

### Getting Started

<img src="https://d15jtxgb40qetw.cloudfront.net/kafka.svg" alt="kafka" className="not-prose h-20" />

{/*
  !! Do not remove this comment, this acts as a key indicator in `docs/sync-dest-doc.py` !!
  !! END CUSTOM EDIT !!
*/}

### Configuring Destination Fields

<Accordion title="Supported Signals:">
  ✅ Traces
  ✅ Metrics
  ✅ Logs
  ❌ Profiles
</Accordion>

* **KAFKA\_PROTOCOL\_VERSION** `string` : Protocol Version. The Kafka protocol version to use.
  * This field is required
  * Example: `2.0.0`
* **KAFKA\_BROKERS** `string[]` : Brokers. The list of Kafka brokers.
  * This field is optional and defaults to `["localhost:9092"]`
* **KAFKA\_RESOLVE\_CANONICAL\_BOOTSTRAP\_SERVERS\_ONLY** `boolean` : Resolve Canonical Bootstrap Servers Only. Whether to resolve and then reverse-lookup broker IPs during startup.
  * This field is optional and defaults to `False`
* **KAFKA\_CLIENT\_ID** `string` : Client ID. The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
  * This field is optional and defaults to `sarama`
* **KAFKA\_TOPIC** `string` : Topic. The name of the default Kafka topic to export to (defaults to `otlp_spans` for traces, `otlp_metrics` for metrics, and `otlp_logs` for logs).
  * This field is optional
* **KAFKA\_TOPIC\_FROM\_ATTRIBUTE** `string` : Topic from Attribute. Specify the resource attribute whose value should be used as the message's topic.
  * This field is optional
* **KAFKA\_ENCODING** `string` : Encoding. The encoding of the traces sent to Kafka.
  * This field is optional and defaults to `otlp_proto`
* **KAFKA\_PARTITION\_TRACES\_BY\_ID** `boolean` : Partition Traces by ID. Configures the exporter to include the trace ID as the message key in trace messages sent to Kafka. Note: this setting has no effect on Jaeger encoding exporters, since they already include the trace ID as the message key by default.
  * This field is optional and defaults to `False`
* **KAFKA\_PARTITION\_METRICS\_BY\_RESOURCE\_ATTRIBUTES** `boolean` : Partition Metrics by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka.
  * This field is optional and defaults to `False`
* **KAFKA\_PARTITION\_LOGS\_BY\_RESOURCE\_ATTRIBUTES** `boolean` : Partition Logs by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to Kafka.
  * This field is optional and defaults to `False`
* **KAFKA\_AUTH\_METHOD** `string` : Auth Method. The authentication method to use (`none` or `plain_text`).
  * This field is required and defaults to `none`
* **KAFKA\_USERNAME** `string` : Username. The username to use.
  * This field is optional
* **KAFKA\_PASSWORD** `string` : Password. The password to use.
  * This field is optional
* **KAFKA\_METADATA\_FULL** `boolean` : Metadata Full. Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
  * This field is optional and defaults to `False`
* **KAFKA\_METADATA\_MAX\_RETRY** `string` : Metadata Max Retry. The number of retries to get metadata.
  * This field is optional and defaults to `3`
* **KAFKA\_METADATA\_BACKOFF\_RETRY** `string` : Metadata Backoff Retry. How long to wait between metadata retries.
  * This field is optional and defaults to `250ms`
* **KAFKA\_TIMEOUT** `string` : Timeout. The timeout for every attempt to send data to the backend.
  * This field is optional and defaults to `5s`
* **KAFKA\_RETRY\_ON\_FAILURE\_ENABLED** `boolean` : Enable Retry on Failure. Whether to retry sending data after a failed attempt.
  * This field is optional and defaults to `True`
* **KAFKA\_RETRY\_ON\_FAILURE\_INITIAL\_INTERVAL** `string` : Initial Interval. The time to wait after the first failure before retrying; ignored if `KAFKA_RETRY_ON_FAILURE_ENABLED` is `false`.
  * This field is optional and defaults to `5s`
* **KAFKA\_RETRY\_ON\_FAILURE\_MAX\_INTERVAL** `string` : Max Interval. The upper bound on backoff; ignored if `KAFKA_RETRY_ON_FAILURE_ENABLED` is `false`.
  * This field is optional and defaults to `30s`
* **KAFKA\_RETRY\_ON\_FAILURE\_MAX\_ELAPSED\_TIME** `string` : Max Elapsed Time. The maximum amount of time spent trying to send a batch; ignored if `KAFKA_RETRY_ON_FAILURE_ENABLED` is `false`.
  * This field is optional and defaults to `120s`
* **KAFKA\_PRODUCER\_MAX\_MESSAGE\_BYTES** `string` : Producer Max Message Bytes. The maximum permitted size of a message in bytes.
  * This field is optional and defaults to `1000000`
* **KAFKA\_PRODUCER\_REQUIRED\_ACKS** `string` : Producer Required Acks. Controls when a message is regarded as transmitted.
  * This field is optional and defaults to `1`
* **KAFKA\_PRODUCER\_COMPRESSION** `string` : Producer Compression. The compression used when producing messages to Kafka.
  * This field is optional and defaults to `none`
* **KAFKA\_PRODUCER\_FLUSH\_MAX\_MESSAGES** `string` : Producer Flush Max Messages. The maximum number of messages the producer will send in a single broker request.
  * This field is optional and defaults to `0`
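
The `KAFKA_RETRY_ON_FAILURE_*` fields above work together: waits start at the initial interval, are capped by the max interval, and retries stop once the max elapsed time is used up. As a rough illustration only, the sketch below prints one possible wait schedule in milliseconds. The `backoff_schedule` helper is hypothetical, and the 1.5x growth factor and the absence of jitter are assumptions made for this example, not documented Odigos behavior; the time spent on each send attempt is ignored as well.

```bash
# Hypothetical helper: prints the wait (in ms) before each retry, one per line.
# Assumptions (not from the Odigos docs): waits grow by 1.5x, with no jitter.
# $1: initial interval, $2: max interval, $3: max elapsed time (all in ms)
backoff_schedule() {
  local interval_ms="$1" max_interval_ms="$2" max_elapsed_ms="$3"
  local elapsed_ms=0
  # Keep retrying while the next wait still fits in the elapsed-time budget.
  while [ $((elapsed_ms + interval_ms)) -le "$max_elapsed_ms" ]; do
    echo "$interval_ms"
    elapsed_ms=$((elapsed_ms + interval_ms))
    # Grow the wait by 1.5x, then cap it at the max interval.
    interval_ms=$((interval_ms * 3 / 2))
    if [ "$interval_ms" -gt "$max_interval_ms" ]; then
      interval_ms="$max_interval_ms"
    fi
  done
}

# Defaults listed above: 5s initial interval, 30s max interval, 120s max elapsed time.
backoff_schedule 5000 30000 120000
```

Changing the arguments gives a quick feel for how many retries fit into a given elapsed-time budget before a batch is given up on.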

<Note>
  The destination topic can be set in several ways, which take priority in the following order:

  1. When `topic_from_attribute` (`KAFKA_TOPIC_FROM_ATTRIBUTE`) is configured and the corresponding attribute is found on the ingested data, the value of this attribute is used.
  2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the [github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/kafka/topic) package), the value set in the context is used.
  3. Finally, the configured topic (`KAFKA_TOPIC`) is used as the default/fallback destination.
</Note>
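
The precedence described in the note above can be sketched as a small shell function. `resolve_topic` is a hypothetical helper written purely for illustration; the real resolution happens inside the collector's Kafka exporter, and an empty string stands in here for "not set".

```bash
# Hypothetical helper illustrating topic precedence; not part of Odigos.
# $1: topic taken from the resource attribute (empty if the attribute is absent)
# $2: topic set on the context by an earlier pipeline component (empty if none)
# $3: configured default topic (KAFKA_TOPIC)
resolve_topic() {
  local attr_topic="$1" ctx_topic="$2" default_topic="$3"
  if [ -n "$attr_topic" ]; then
    printf '%s\n' "$attr_topic"
  elif [ -n "$ctx_topic" ]; then
    printf '%s\n' "$ctx_topic"
  else
    printf '%s\n' "$default_topic"
  fi
}

resolve_topic "payments-traces" "" "otlp_spans"   # the attribute value takes priority
resolve_topic "" "" "otlp_spans"                  # falls back to the configured topic
```

The topic names used in the calls are placeholders; only `otlp_spans` comes from the defaults listed for `KAFKA_TOPIC`.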

### Adding Destination to Odigos

There are two primary methods for configuring destinations in Odigos:

##### **Using the UI**

<Steps>
  <Step>
    Use the [Odigos CLI](https://docs.odigos.io/cli/odigos_ui) to access the UI

    ```bash theme={null}
    odigos ui
    ```
  </Step>

  <Step>
    Click on `Add Destination`, select `Kafka` and follow the on-screen instructions
  </Step>
</Steps>

##### **Using Kubernetes manifests**

<Steps>
  <Step>
    Save the YAML below to a file (e.g. `kafka.yaml`)

    ```yaml theme={null}
    apiVersion: odigos.io/v1alpha1
    kind: Destination
    metadata:
      name: kafka-example
      namespace: odigos-system
    spec:
      data:
        KAFKA_AUTH_METHOD: '<Auth Method (default: none) (options: [none, plain_text])>'
        KAFKA_PROTOCOL_VERSION: <Protocol Version>
        # Note: The commented fields below are optional.
        # KAFKA_BROKERS: <Brokers (default: ["localhost:9092"])>
        # KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY: <Resolve Canonical Bootstrap Servers Only>
        # KAFKA_CLIENT_ID: <Client ID (default: sarama)>
        # KAFKA_TOPIC: <Topic>
        # KAFKA_TOPIC_FROM_ATTRIBUTE: <Topic from Attribute>
        # KAFKA_ENCODING: <Encoding (default: otlp_proto) (options: [otlp_proto, otlp_json])>
        # KAFKA_PARTITION_TRACES_BY_ID: <Partition Traces by ID>
        # KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES: <Partition Metrics by Resource Attributes>
        # KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES: <Partition Logs by Resource Attributes>
        # KAFKA_USERNAME: <Username>
        # KAFKA_METADATA_FULL: <Metadata Full>
        # KAFKA_METADATA_MAX_RETRY: <Metadata Max Retry (default: 3)>
        # KAFKA_METADATA_BACKOFF_RETRY: <Metadata Backoff Retry (default: 250ms)>
        # KAFKA_TIMEOUT: <Timeout (default: 5s)>
        # KAFKA_RETRY_ON_FAILURE_ENABLED: <Enable Retry on Failure (default: True)>
        # KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL: <Initial Interval (default: 5s)>
        # KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL: <Max Interval (default: 30s)>
        # KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME: <Max Elapsed Time (default: 120s)>
        # KAFKA_PRODUCER_MAX_MESSAGE_BYTES: <Producer Max Message Bytes (default: 1000000)>
        # KAFKA_PRODUCER_REQUIRED_ACKS: <Producer Required Acks (default: 1)>
        # KAFKA_PRODUCER_COMPRESSION: <Producer Compression (default: none) (options: [none, gzip, snappy, lz4, zstd])>
        # KAFKA_PRODUCER_FLUSH_MAX_MESSAGES: <Producer Flush Max Messages (default: 0)>
      destinationName: kafka
      # Uncomment the 'secretRef' below if you are using the optional Secret.
      # secretRef:
      #   name: kafka-secret
      signals:
      - TRACES
      - METRICS
      - LOGS
      type: kafka

    ---

    # The following Secret is optional. Uncomment the entire block if you need to use it.
    # apiVersion: v1
    # data:
    #   KAFKA_PASSWORD: <Base64 Password>
    # kind: Secret
    # metadata:
    #   name: kafka-secret
    #   namespace: odigos-system
    # type: Opaque
    ```
  </Step>

  <Step>
    Apply the YAML using `kubectl`

    ```bash theme={null}
    kubectl apply -f kafka.yaml
    ```
  </Step>
</Steps>
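
If you use the optional Secret, the `KAFKA_PASSWORD` value under `data` must be Base64-encoded, as the `<Base64 Password>` placeholder indicates. A minimal sketch of producing that value follows; `encode_secret_value` is a hypothetical helper name and `my-kafka-password` is a placeholder.

```bash
# Hypothetical helper: Base64-encode a value for a Kubernetes Secret `data` field.
# printf avoids the trailing newline that `echo` would otherwise add to the input,
# and `tr` removes any line wrapping that `base64` applies to long values.
encode_secret_value() {
  printf '%s' "$1" | base64 | tr -d '\n'
  echo
}

encode_secret_value 'my-kafka-password'
```

Paste the printed value into the Secret's `KAFKA_PASSWORD` field, then uncomment both the Secret block and the `secretRef` in the Destination before applying the file.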
