  • KAFKA_PROTOCOL_VERSION string : Protocol Version. The Kafka protocol version to use.
    • This field is required
    • Example: 2.0.0
  • KAFKA_BROKERS string[] : Brokers. The list of Kafka brokers.
    • This field is optional and defaults to ["localhost:9092"]
  • KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY boolean : Resolve Canonical Bootstrap Servers Only. Whether to resolve and then reverse-lookup broker IPs during startup.
    • This field is optional and defaults to False
  • KAFKA_CLIENT_ID string : Client ID. The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
    • This field is optional and defaults to sarama
  • KAFKA_TOPIC string : Topic. The name of the default Kafka topic to export to. If unset, the default is otlp_spans for traces, otlp_metrics for metrics, and otlp_logs for logs.
    • This field is optional
  • KAFKA_TOPIC_FROM_ATTRIBUTE string : Topic from Attribute. Specify the resource attribute whose value should be used as the message’s topic.
    • This field is optional
  • KAFKA_ENCODING string : Encoding. The encoding of the traces sent to Kafka (options: otlp_proto, otlp_json).
    • This field is optional and defaults to otlp_proto
  • KAFKA_PARTITION_TRACES_BY_ID boolean : Partition Traces by ID. Configures the exporter to include the trace ID as the message key in trace messages sent to Kafka. Note: this setting has no effect on Jaeger encoding exporters, since they include the trace ID as the message key by default.
    • This field is optional and defaults to False
  • KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES boolean : Partition Metrics by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka.
    • This field is optional and defaults to False
  • KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES boolean : Partition Logs by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to Kafka.
    • This field is optional and defaults to False
  • KAFKA_AUTH_METHOD string : Auth Method. The authentication method to use (options: none, plain_text).
    • This field is required and defaults to none
  • KAFKA_USERNAME string : Username. The username to use.
    • This field is optional
  • KAFKA_PASSWORD string : Password. The password to use.
    • This field is optional
  • KAFKA_METADATA_FULL boolean : Metadata Full. Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
    • This field is optional and defaults to False
  • KAFKA_METADATA_MAX_RETRY string : Metadata Max Retry. The number of retries to get metadata.
    • This field is optional and defaults to 3
  • KAFKA_METADATA_BACKOFF_RETRY string : Metadata Backoff Retry. How long to wait between metadata retries.
    • This field is optional and defaults to 250ms
  • KAFKA_TIMEOUT string : Timeout. The timeout for every attempt to send data to the backend.
    • This field is optional and defaults to 5s
  • KAFKA_RETRY_ON_FAILURE_ENABLED boolean : Enable Retry on Failure. Whether to retry sending data after a failed attempt.
    • This field is optional and defaults to True
  • KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL string : Initial Interval. The time to wait after the first failure before retrying; ignored if KAFKA_RETRY_ON_FAILURE_ENABLED is false.
    • This field is optional and defaults to 5s
  • KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL string : Max Interval. The upper bound on the backoff between retries; ignored if KAFKA_RETRY_ON_FAILURE_ENABLED is false.
    • This field is optional and defaults to 30s
  • KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME string : Max Elapsed Time. The maximum amount of time spent trying to send a batch; ignored if KAFKA_RETRY_ON_FAILURE_ENABLED is false.
    • This field is optional and defaults to 120s
  • KAFKA_PRODUCER_MAX_MESSAGE_BYTES string : Producer Max Message Bytes. The maximum permitted size of a message in bytes.
    • This field is optional and defaults to 1000000
  • KAFKA_PRODUCER_REQUIRED_ACKS string : Producer Required Acks. Controls when a message is regarded as transmitted.
    • This field is optional and defaults to 1
  • KAFKA_PRODUCER_COMPRESSION string : Producer Compression. The compression used when producing messages to Kafka (options: none, gzip, snappy, lz4, zstd).
    • This field is optional and defaults to none
  • KAFKA_PRODUCER_FLUSH_MAX_MESSAGES string : Producer Flush Max Messages. The maximum number of messages the producer will send in a single broker request.
    • This field is optional and defaults to 0
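
To illustrate how the KAFKA_RETRY_ON_FAILURE_* settings interact, here is a minimal Python sketch of an exponential backoff schedule using the default values listed above. The function name retry_schedule and the growth multiplier of 1.5 are assumptions made for illustration; jitter and the time spent on each send attempt are ignored, so real timings will differ.

```python
from typing import List


def retry_schedule(
    initial_interval: float = 5.0,   # KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL (seconds)
    max_interval: float = 30.0,      # KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL (seconds)
    max_elapsed_time: float = 120.0, # KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME (seconds)
    multiplier: float = 1.5,         # assumed growth factor, not taken from this page
) -> List[float]:
    """Return the wait (in seconds) before each retry, without jitter."""
    waits: List[float] = []
    elapsed = 0.0
    interval = initial_interval
    # Keep scheduling retries while the next wait still fits in the time budget.
    while elapsed + interval <= max_elapsed_time:
        waits.append(interval)
        elapsed += interval
        # Grow the wait, but never beyond the configured upper bound.
        interval = min(interval * multiplier, max_interval)
    return waits


if __name__ == "__main__":
    for attempt, wait in enumerate(retry_schedule(), start=1):
        print(f"retry {attempt}: wait {wait}s")
```

Under these assumptions, the wait grows from the initial interval until it reaches the max interval, and retries stop once the next wait would exceed the max elapsed time.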

The destination topic can be defined in several ways, which take priority in the following order:

  1. When topic_from_attribute (KAFKA_TOPIC_FROM_ATTRIBUTE) is configured and the corresponding attribute is found on the ingested data, the value of that attribute is used.
  2. If a prior component in the collector pipeline sets the topic on the context via the topic.WithTopic function (from the package), the value set in the context is used.
  3. Finally, the topic configuration (KAFKA_TOPIC) is used as the default/fallback destination.
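
The priority order above can be sketched as a small Python function. This is only an illustration of the lookup order, not the exporter's actual code: the function resolve_topic, its parameters, and the example attribute name kafka.topic are all hypothetical.

```python
from typing import Mapping, Optional

# Per-signal defaults, as listed in the KAFKA_TOPIC description.
DEFAULT_TOPICS = {
    "traces": "otlp_spans",
    "metrics": "otlp_metrics",
    "logs": "otlp_logs",
}


def resolve_topic(
    signal: str,
    resource_attributes: Mapping[str, str],
    topic_from_attribute: Optional[str] = None,  # KAFKA_TOPIC_FROM_ATTRIBUTE
    context_topic: Optional[str] = None,         # topic set by a prior pipeline component
    configured_topic: Optional[str] = None,      # KAFKA_TOPIC
) -> str:
    """Pick the destination topic following the documented priority order."""
    # 1. A resource attribute named by topic_from_attribute wins when present.
    if topic_from_attribute:
        value = resource_attributes.get(topic_from_attribute)
        if value:
            return value
    # 2. Otherwise use a topic placed on the context earlier in the pipeline.
    if context_topic:
        return context_topic
    # 3. Fall back to the configured topic, or the per-signal default.
    return configured_topic or DEFAULT_TOPICS[signal]


if __name__ == "__main__":
    attrs = {"kafka.topic": "team-a-telemetry"}  # hypothetical attribute name
    print(resolve_topic("traces", attrs, topic_from_attribute="kafka.topic"))
    print(resolve_topic("logs", {}))
```

Note that the attribute only takes effect when it is actually present on the data; otherwise the lookup falls through to the next step.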

Adding Destination to Odigos

There are two primary methods for configuring destinations in Odigos:

Using the UI

Use the Odigos CLI to access the UI

odigos ui

Click on Add Destination, select Kafka and follow the on-screen instructions

Using Kubernetes manifests

Save the YAML below to a file (e.g. kafka.yaml)

apiVersion: odigos.io/v1alpha1
kind: Destination
metadata:
  name: kafka-example
  namespace: odigos-system
spec:
  data:
    KAFKA_AUTH_METHOD: '<Auth Method (default: none) (options: [none, plain_text])>'
    KAFKA_PROTOCOL_VERSION: <Protocol Version>
    # Note: The commented fields below are optional.
    # KAFKA_BROKERS: <Brokers (default: ["localhost:9092"])>
    # KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY: <Resolve Canonical Bootstrap Servers Only>
    # KAFKA_CLIENT_ID: <Client ID (default: sarama)>
    # KAFKA_TOPIC: <Topic>
    # KAFKA_TOPIC_FROM_ATTRIBUTE: <Topic from Attribute>
    # KAFKA_ENCODING: <Encoding (default: otlp_proto) (options: [otlp_proto, otlp_json])>
    # KAFKA_PARTITION_TRACES_BY_ID: <Partition Traces by ID>
    # KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES: <Partition Metrics by Resource Attributes>
    # KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES: <Partition Logs by Resource Attributes>
    # KAFKA_USERNAME: <Username>
    # KAFKA_METADATA_FULL: <Metadata Full>
    # KAFKA_METADATA_MAX_RETRY: <Metadata Max Retry (default: 3)>
    # KAFKA_METADATA_BACKOFF_RETRY: <Metadata Backoff Retry (default: 250ms)>
    # KAFKA_TIMEOUT: <Timeout (default: 5s)>
    # KAFKA_RETRY_ON_FAILURE_ENABLED: <Enable Retry on Failure (default: True)>
    # KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL: <Initial Interval (default: 5s)>
    # KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL: <Max Interval (default: 30s)>
    # KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME: <Max Elapsed Time (default: 120s)>
    # KAFKA_PRODUCER_MAX_MESSAGE_BYTES: <Producer Max Message Bytes (default: 1000000)>
    # KAFKA_PRODUCER_REQUIRED_ACKS: <Producer Required Acks (default: 1)>
    # KAFKA_PRODUCER_COMPRESSION: <Producer Compression (default: none) (options: [none, gzip, snappy, lz4, zstd])>
    # KAFKA_PRODUCER_FLUSH_MAX_MESSAGES: <Producer Flush Max Messages (default: 0)>
  destinationName: kafka
  # Uncomment the 'secretRef' below if you are using the optional Secret.
  # secretRef:
  #   name: kafka-secret
  signals:
  - TRACES
  - METRICS
  - LOGS
  type: kafka

---

# The following Secret is optional. Uncomment the entire block if you need to use it.
# apiVersion: v1
# data:
#   KAFKA_PASSWORD: <Base64 Password>
# kind: Secret
# metadata:
#   name: kafka-secret
#   namespace: odigos-system
# type: Opaque
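
The <Base64 Password> placeholder in the Secret expects the password encoded as Base64, since values under a Secret's data key must be Base64-encoded. As a minimal sketch, the value could be produced with Python's standard library; the helper name encode_secret_value and the sample password are made up for illustration.

```python
import base64


def encode_secret_value(plaintext: str) -> str:
    """Base64-encode a string for use under the 'data' key of a Kubernetes Secret."""
    return base64.b64encode(plaintext.encode("utf-8")).decode("ascii")


if __name__ == "__main__":
    # Hypothetical password; replace with your own before pasting into kafka.yaml.
    print(encode_secret_value("s3cret"))
```

Paste the printed value in place of <Base64 Password>. Keep in mind that Base64 is an encoding, not encryption, so the manifest should still be treated as sensitive.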

Apply the YAML using kubectl

kubectl apply -f kafka.yaml