Getting Started

Configuring Destination Fields

  • KAFKA_PROTOCOL_VERSION string : Protocol Version. Kafka protocol version.
    • This field is required
    • Example: 2.0.0
  • KAFKA_BROKERS string[] : Brokers. The list of kafka brokers
    • This field is optional and defaults to ["localhost:9092"]
  • KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY boolean : Resolve Canonical Bootstrap Servers Only. Whether to resolve then reverse-lookup broker IPs during startup.
    • This field is optional and defaults to False
  • KAFKA_CLIENT_ID string : Client ID. The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
    • This field is optional and defaults to sarama
  • KAFKA_TOPIC string : Topic. The name of the default kafka topic to export to (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs).
    • This field is optional
  • KAFKA_TOPIC_FROM_ATTRIBUTE string : Topic from Attribute. Specify the resource attribute whose value should be used as the message’s topic.
    • This field is optional
  • KAFKA_ENCODING string : Encoding. The encoding of the traces sent to kafka.
    • This field is optional and defaults to otlp_proto
  • KAFKA_PARTITION_TRACES_BY_ID boolean : Partition Traces by ID. Configures the exporter to include the trace ID as the message key in trace messages sent to kafka. Please note: this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
    • This field is optional and defaults to False
  • KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES boolean : Partition Metrics by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
    • This field is optional and defaults to False
  • KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES boolean : Partition Logs by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
    • This field is optional and defaults to False
  • KAFKA_AUTH_METHOD string : Auth Method. The auth method to use.
    • This field is required and defaults to none
  • KAFKA_USERNAME string : Username. The username to use.
    • This field is optional
  • KAFKA_PASSWORD string : Password. The password to use.
    • This field is optional
  • KAFKA_METADATA_FULL boolean : Metadata Full. Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to broker at the startup.
    • This field is optional and defaults to False
  • KAFKA_METADATA_MAX_RETRY string : Metadata Max Retry. The number of retries to get metadata.
    • This field is optional and defaults to 3
  • KAFKA_METADATA_BACKOFF_RETRY string : Metadata Backoff Retry. How long to wait between metadata retries.
    • This field is optional and defaults to 250ms
  • KAFKA_TIMEOUT string : Timeout. Is the timeout for every attempt to send data to the backend.
    • This field is optional and defaults to 5s
  • KAFKA_RETRY_ON_FAILURE_ENABLED boolean : Enable Retry on Failure.
    • This field is optional and defaults to True
  • KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL string : Initial Interval. Time to wait after the first failure before retrying; ignored if enabled is false.
    • This field is optional and defaults to 5s
  • KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL string : Max Interval. Is the upper bound on backoff; ignored if enabled is false.
    • This field is optional and defaults to 30s
  • KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME string : Max Elapsed Time. Is the maximum amount of time spent trying to send a batch; ignored if enabled is false.
    • This field is optional and defaults to 120s
  • KAFKA_PRODUCER_MAX_MESSAGE_BYTES string : Producer Max Message Bytes. The maximum permitted size of a message in bytes.
    • This field is optional and defaults to 1000000
  • KAFKA_PRODUCER_REQUIRED_ACKS string : Producer Required Acks. Controls when a message is regarded as transmitted.
    • This field is optional and defaults to 1
  • KAFKA_PRODUCER_COMPRESSION string : Producer Compression. The compression used when producing messages to kafka.
    • This field is optional and defaults to none
  • KAFKA_PRODUCER_FLUSH_MAX_MESSAGES string : Producer Flush Max Messages. The maximum number of messages the producer will send in a single broker request.
    • This field is optional and defaults to 0

The destination topic can be defined in a few different ways and takes priority in the following order:

  1. When topic_from_attribute is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
  2. If a prior component in the collector pipeline sets the topic on the context via the topic.WithTopic function (from the github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic package), the value set in the context is used.
  3. Finally, the topic configuration is used as a default/fallback destination.

Adding Destination to Odigos

There are two primary methods for configuring destinations in Odigos:

Using the UI
1

Use the Odigos CLI to access the UI

odigos ui
2

Click on Add Destination, select Kafka and follow the on-screen instructions

Using Kubernetes manifests
1

Save the YAML below to a file (e.g. kafka.yaml)

apiVersion: odigos.io/v1alpha1
kind: Destination
metadata:
  name: kafka-example
  namespace: odigos-system
spec:
  data:
    KAFKA_AUTH_METHOD: '<Auth Method (default: none) (options: [none, plain_text])>'
    KAFKA_PROTOCOL_VERSION: <Protocol Version>
    # Note: The commented fields below are optional.
    # KAFKA_BROKERS: <Brokers (default: ["localhost:9092"])>
    # KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY: <Resolve Canonical Bootstrap Servers Only>
    # KAFKA_CLIENT_ID: <Client ID (default: sarama)>
    # KAFKA_TOPIC: <Topic>
    # KAFKA_TOPIC_FROM_ATTRIBUTE: <Topic from Attribute>
    # KAFKA_ENCODING: <Encoding (default: otlp_proto) (options: [otlp_proto, otlp_json])>
    # KAFKA_PARTITION_TRACES_BY_ID: <Partition Traces by ID>
    # KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES: <Partition Metrics by Resource Attributes>
    # KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES: <Partition Logs by Resource Attributes>
    # KAFKA_USERNAME: <Username>
    # KAFKA_METADATA_FULL: <Metadata Full>
    # KAFKA_METADATA_MAX_RETRY: <Metadata Max Retry (default: 3)>
    # KAFKA_METADATA_BACKOFF_RETRY: <Metadata Backoff Retry (default: 250ms)>
    # KAFKA_TIMEOUT: <Timeout (default: 5s)>
    # KAFKA_RETRY_ON_FAILURE_ENABLED: <Enable Retry on Failure (default: True)>
    # KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL: <Initial Interval (default: 5s)>
    # KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL: <Max Interval (default: 30s)>
    # KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME: <Max Elapsed Time (default: 120s)>
    # KAFKA_PRODUCER_MAX_MESSAGE_BYTES: <Producer Max Message Bytes (default: 1000000)>
    # KAFKA_PRODUCER_REQUIRED_ACKS: <Producer Required Acks (default: 1)>
    # KAFKA_PRODUCER_COMPRESSION: <Producer Compression (default: none) (options: [none, gzip, snappy, lz4, zstd])>
    # KAFKA_PRODUCER_FLUSH_MAX_MESSAGES: <Producer Flush Max Messages (default: 0)>
  destinationName: kafka
  # Uncomment the 'secretRef' below if you are using the optional Secret.
  # secretRef:
  #   name: kafka-secret
  signals:
  - TRACES
  - METRICS
  - LOGS
  type: kafka

---

# The following Secret is optional. Uncomment the entire block if you need to use it.
# apiVersion: v1
# data:
#   KAFKA_PASSWORD: <Base64 Password>
# kind: Secret
# metadata:
#   name: kafka-secret
#   namespace: odigos-system
# type: Opaque
2

Apply the YAML using kubectl

kubectl apply -f kafka.yaml