Kafka

Kafka output plugin for Fluentd

Overview

For details, see https://github.com/fluent/fluent-plugin-kafka.

For an example deployment, see Transport Nginx Access Logs into Kafka with Logging Operator.

Example output configurations

spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    format:
      type: json
    buffer:
      tags: topic
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true

Configuration

Kafka

Send your logs to Kafka. Set use_rdkafka to true to use the rdkafka2 client, which offers higher performance than ruby-kafka.

ack_timeout (int, optional)

How long the producer waits for acks, in seconds.

Default: nil (uses the default of the ruby-kafka library)

brokers (string, required)

The list of all seed brokers, with their host and port information.
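
As a minimal sketch of a multi-broker setup: the upstream fluent-plugin-kafka plugin takes seed brokers as a comma-separated list of host:port pairs in a single string. The hostnames below are hypothetical placeholders, not values from this documentation.

```yaml
spec:
  kafka:
    # Hypothetical broker addresses; replace them with your own seed brokers.
    brokers: kafka-0.kafka.svc.cluster.local:9092,kafka-1.kafka.svc.cluster.local:9092
    default_topic: topic
    sasl_over_ssl: false
    format:
      type: json
```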

buffer (*Buffer, optional)

Buffer

client_id (string, optional)

Client ID

Default: “kafka”

compression_codec (string, optional)

The codec the producer uses to compress messages. The available options are gzip and snappy.

Default: nil

default_message_key (string, optional)

The name of the default message key.

Default: nil

default_partition_key (string, optional)

The name of the default partition key.

Default: nil

default_topic (string, optional)

The name of the default topic.

Default: nil

discard_kafka_delivery_failed (bool, optional)

Discard records for which Kafka reported a DeliveryFailed error.

Default: false

exclude_partion_key (bool, optional)

Exclude Partition key

Default: false

exclude_topic_key (bool, optional)

Exclude Topic key

Default: false

format (*Format, required)

Format

get_kafka_client_log (bool, optional)

Get Kafka Client log

Default: false

headers (map[string]string, optional)

Headers

Default: {}

headers_from_record (map[string]string, optional)

Headers from Record

Default: {}
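
The following sketch shows how the two header options might be combined. It assumes the behavior described for the upstream fluent-plugin-kafka plugin: headers holds static key-value pairs, and headers_from_record maps a header name to a record accessor such as $.source.ip. The header names and the record field are hypothetical.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    # Static header attached to every message (hypothetical name and value).
    headers:
      cluster: production
    # Header filled from a field of each record (assumes records carry source.ip).
    headers_from_record:
      source_ip: $.source.ip
    format:
      type: json
```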

idempotent (bool, optional)

Idempotent

Default: false

kafka_agg_max_bytes (int, optional)

Maximum total message size to include in one batch transmission.

Default: 4096

kafka_agg_max_messages (int, optional)

Maximum number of messages to include in one batch transmission.

Default: nil

keytab (*secret.Secret, optional)

max_send_retries (int, optional)

Number of times to retry sending messages to a leader.

Default: 1

message_key_key (string, optional)

Message Key

Default: “message_key”

partition_key (string, optional)

Partition

Default: “partition”

partition_key_key (string, optional)

Partition Key

Default: “partition_key”

password (*secret.Secret, optional)

Password when using PLAIN/SCRAM SASL authentication

principal (string, optional)

required_acks (int, optional)

The number of acks required per request.

Default: -1
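
As an illustration of how the delivery-related options fit together, the sketch below sets required_acks, ack_timeout, max_send_retries, and compression_codec in one output. In the Kafka protocol, -1 means the leader waits for all in-sync replicas, 1 means the leader only, and 0 means no acknowledgement. The timeout and retry values are arbitrary examples, not recommendations.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    required_acks: -1        # wait for all in-sync replicas
    ack_timeout: 5           # seconds; example value
    max_send_retries: 3      # example value; the documented default is 1
    compression_codec: gzip  # one of the documented codecs (gzip, snappy)
    format:
      type: json
```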

ssl_ca_cert (*secret.Secret, optional)

CA certificate

ssl_ca_certs_from_system (*bool, optional)

System’s CA cert store

Default: false

ssl_client_cert (*secret.Secret, optional)

Client certificate

ssl_client_cert_chain (*secret.Secret, optional)

Client certificate chain

ssl_client_cert_key (*secret.Secret, optional)

Client certificate key

ssl_verify_hostname (*bool, optional)

Verify certificate hostname
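
A minimal sketch of a TLS configuration with client certificates. It assumes the certificates are stored in a Kubernetes Secret named kafka-tls (a hypothetical name, as are the keys and the broker port), and that the plugin expects file paths, so the secrets are referenced with mountFrom instead of valueFrom.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29093  # hypothetical TLS port
    default_topic: topic
    sasl_over_ssl: false
    ssl_ca_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls   # hypothetical Secret
          key: ca.crt
    ssl_client_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls
          key: tls.crt
    ssl_client_cert_key:
      mountFrom:
        secretKeyRef:
          name: kafka-tls
          key: tls.key
    ssl_verify_hostname: true
    format:
      type: json
```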

sasl_over_ssl (bool, required)

SASL over SSL

Default: true

scram_mechanism (string, optional)

If set, use SCRAM authentication with the specified mechanism. When unset, defaults to PLAIN authentication.

slow_flush_log_threshold (string, optional)

The threshold for the chunk flush performance check. The parameter type is float, not time. Default: 20.0 (seconds). If a chunk flush takes longer than this threshold, Fluentd logs a warning message and increases the fluentd_output_status_slow_flush_count metric.

topic_key (string, optional)

Topic Key

Default: “topic”
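
The sketch below illustrates per-record topic routing. It assumes the behavior of the upstream kafka2 output, where the topic is taken from the buffer chunk key named by topic_key and falls back to default_topic when that key is absent. This is why the same field name also appears under buffer.tags. The fallback topic name is hypothetical.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    sasl_over_ssl: false
    topic_key: topic              # record field that selects the topic
    default_topic: logs-fallback  # hypothetical fallback topic
    format:
      type: json
    buffer:
      tags: topic                 # chunk by the same field so it is available at write time
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true
```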

use_default_for_unknown_topic (bool, optional)

Use default for unknown topics

Default: false

use_rdkafka (bool, optional)

Use rdkafka2 instead of the legacy kafka2 output plugin. This plugin requires fluentd image version v1.16-4.9-full or higher.
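
A minimal sketch of switching to the rdkafka2 client. Apart from use_rdkafka, the values mirror the example output configuration; the Fluentd image must meet the version requirement stated above.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    use_rdkafka: true   # use rdkafka2 instead of the kafka2 output plugin
    format:
      type: json
```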

username (*secret.Secret, optional)

Username when using PLAIN/SCRAM SASL authentication
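
As a sketch of SASL authentication, the example below reads the username and password from a Kubernetes Secret. The Secret name kafka-credentials and its keys are hypothetical. The sha512 value assumes that the underlying client accepts sha256 and sha512 as SCRAM mechanisms. Omit scram_mechanism to use PLAIN authentication.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: true
    scram_mechanism: sha512        # assumption: sha256 or sha512
    username:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials  # hypothetical Secret
          key: username
    password:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials
          key: password
    format:
      type: json
```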