Kafka
Kafka output plugin for Fluentd
Overview
For details, see https://github.com/fluent/fluent-plugin-kafka.
For an example deployment, see Transport Nginx Access Logs into Kafka with Logging Operator.
Example output configurations
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    format:
      type: json
    buffer:
      tags: topic
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true
Configuration
Kafka
Send your logs to Kafka. Set use_rdkafka to true to use the rdkafka2 client, which offers higher performance than ruby-kafka.
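As a minimal sketch, the following output enables the rdkafka2 client. The broker address and topic name are reused from the example configuration above and are placeholders for your own environment.

```yaml
spec:
  kafka:
    # Placeholder broker address and topic, taken from the example above
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    # Switch from the ruby-kafka based client to rdkafka2
    use_rdkafka: true
    format:
      type: json
```

Note that use_rdkafka requires a fluentd image that ships the rdkafka2 plugin (see the use_rdkafka parameter below for the minimum version).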
ack_timeout (int, optional)
How long the producer waits for acks, in seconds.
Default: nil (uses the default of the ruby-kafka library)
brokers (string, required)
The list of all seed brokers, with their host and port information.
buffer (*Buffer, optional)
client_id (string, optional)
Client ID
Default: “kafka”
compression_codec (string, optional)
The codec the producer uses to compress messages. The available options are gzip and snappy.
Default: nil
default_message_key (string, optional)
The name of the default message key.
Default: nil
default_partition_key (string, optional)
The name of the default partition key.
Default: nil
default_topic (string, optional)
The name of the default topic.
Default: nil
discard_kafka_delivery_failed (bool, optional)
Discard records for which Kafka reports a DeliveryFailed error.
Default: false
exclude_partion_key (bool, optional)
Exclude Partition key
Default: false
exclude_topic_key (bool, optional)
Exclude Topic key
Default: false
format (*Format, required)
get_kafka_client_log (bool, optional)
Get Kafka Client log
Default: false
headers (map[string]string, optional)
Headers
Default: {}
headers_from_record (map[string]string, optional)
Headers from Record
Default: {}
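The two header parameters can be sketched as follows. The header names, the static value, and the record field are hypothetical. The `$.source.ip` accessor syntax is an assumption based on the record-accessor style of the upstream fluent-plugin-kafka plugin, so verify it against the plugin README before relying on it.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    # Static headers added to every message (hypothetical name and value)
    headers:
      origin: logging-operator
    # Headers whose values are read from each record
    # (hypothetical header name; accessor syntax is an assumption)
    headers_from_record:
      source_ip: $.source.ip
    format:
      type: json
```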
idempotent (bool, optional)
Idempotent
Default: false
kafka_agg_max_bytes (int, optional)
Maximum total message size to include in one batch transmission.
Default: 4096
kafka_agg_max_messages (int, optional)
Maximum number of messages to include in one batch transmission.
Default: nil
keytab (*secret.Secret, optional)
max_send_retries (int, optional)
Number of times to retry sending of messages to a leader
Default: 1
message_key_key (string, optional)
Message Key
Default: “message_key”
partition_key (string, optional)
Partition
Default: “partition”
partition_key_key (string, optional)
Partition Key
Default: “partition_key”
password (*secret.Secret, optional)
Password when using PLAIN/SCRAM SASL authentication
principal (string, optional)
required_acks (int, optional)
The number of acks required per request.
Default: -1
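The delivery-related parameters (ack_timeout, compression_codec, max_send_retries, required_acks) can be combined as in the sketch below. The numeric values are illustrative assumptions, not recommendations; tune them for your cluster.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    # Documented default: -1
    required_acks: -1
    # Illustrative: wait up to 30 seconds for acks
    ack_timeout: 30
    # Illustrative: retry sending to the leader up to 3 times (default is 1)
    max_send_retries: 3
    # One of the documented codecs: gzip or snappy
    compression_codec: gzip
    format:
      type: json
```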
ssl_ca_cert (*secret.Secret, optional)
CA certificate
ssl_ca_certs_from_system (*bool, optional)
System’s CA cert store
Default: false
ssl_client_cert (*secret.Secret, optional)
Client certificate
ssl_client_cert_chain (*secret.Secret, optional)
Client certificate chain
ssl_client_cert_key (*secret.Secret, optional)
Client certificate key
ssl_verify_hostname (*bool, optional)
Verify certificate hostname
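A TLS setup using the ssl_* parameters might look like the sketch below. The secret name `kafka-tls` and its keys are hypothetical. The `mountFrom.secretKeyRef` form is assumed here because certificates are usually passed to the plugin as files; check the Logging operator secret documentation for the forms your version supports.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    # No SASL authentication in this sketch, as in the example above
    sasl_over_ssl: false
    ssl_ca_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls        # hypothetical secret name
          key: ca.crt            # hypothetical key
    ssl_client_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls
          key: tls.crt
    ssl_client_cert_key:
      mountFrom:
        secretKeyRef:
          name: kafka-tls
          key: tls.key
    ssl_verify_hostname: true
    format:
      type: json
```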
sasl_over_ssl (bool, required)
SASL over SSL
Default: true
scram_mechanism (string, optional)
If set, use SCRAM authentication with the specified mechanism. When unset, defaults to PLAIN authentication.
slow_flush_log_threshold (string, optional)
The threshold for the chunk flush performance check. The parameter type is float, not time. Default: 20.0 (seconds). If a chunk flush takes longer than this threshold, Fluentd logs a warning message and increases the fluentd_output_status_slow_flush_count metric.
topic_key (string, optional)
Topic Key
Default: “topic”
use_default_for_unknown_topic (bool, optional)
Use default for unknown topics
Default: false
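The topic-related parameters can be sketched together as follows. The fallback topic name is hypothetical. The sketch assumes that topic_key names the record field holding the destination topic and that default_topic is used when that field is absent, which is how the upstream plugin describes these options. The buffer `tags: topic` setting mirrors the example configuration at the top of this page; it appears to be there so that records are chunked by the topic field, but confirm this against the upstream plugin README.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    sasl_over_ssl: false
    # Record field that holds the destination topic (documented default: topic)
    topic_key: topic
    # Hypothetical fallback topic for records without that field
    default_topic: fallback-logs
    # Fall back to default_topic when the requested topic is unknown
    use_default_for_unknown_topic: true
    format:
      type: json
    buffer:
      # Mirrors the example configuration above
      tags: topic
```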
use_rdkafka (bool, optional)
Use rdkafka2 instead of the legacy kafka2 output plugin. This plugin requires fluentd image version v1.16-4.9-full or higher.
username (*secret.Secret, optional)
Username when using PLAIN/SCRAM SASL authentication
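SASL authentication with credentials stored in a Kubernetes secret might be configured as in the sketch below. The secret name `kafka-credentials` and its keys are hypothetical, and the `sha512` value for scram_mechanism is an assumption based on the mechanisms the upstream plugin documents; omit scram_mechanism to use PLAIN authentication.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    # Send SASL credentials over an encrypted connection (documented default: true)
    sasl_over_ssl: true
    # Assumed value; leave unset for PLAIN authentication
    scram_mechanism: sha512
    username:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials   # hypothetical secret name
          key: username             # hypothetical key
    password:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials
          key: password
    format:
      type: json
```

With sasl_over_ssl enabled, the brokers' certificate must also be trusted, for example through ssl_ca_cert or ssl_ca_certs_from_system.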