Kafka
Kafka output plugin for Fluentd
Overview
More info at https://github.com/fluent/fluent-plugin-kafka
Example Deployment: Transport Nginx Access Logs into Kafka with Logging Operator
Example output configurations
spec:
kafka:
brokers: kafka-headless.kafka.svc.cluster.local:29092
default_topic: topic
sasl_over_ssl: false
format:
type: json
buffer:
tags: topic
timekey: 1m
timekey_wait: 30s
timekey_use_utc: true
Configuration
Kafka
Send your logs to Kafka
brokers (string, required)
The list of all seed brokers, with their host and port information.
Default: -
topic_key (string, optional)
Topic Key
Default: “topic”
partition_key (string, optional)
Partition
Default: “partition”
partition_key_key (string, optional)
Partition Key
Default: “partition_key”
message_key_key (string, optional)
Message Key
Default: “message_key”
client_id (string, optional)
Client ID
Default: “kafka”
default_topic (string, optional)
The name of default topic .
Default: nil
default_partition_key (string, optional)
The name of default partition key .
Default: nil
default_message_key (string, optional)
The name of default message key .
Default: nil
exclude_topic_key (bool, optional)
Exclude Topic key
Default: false
exclude_partion_key (bool, optional)
Exclude Partition key
Default: false
get_kafka_client_log (bool, optional)
Get Kafka Client log
Default: false
headers (map[string]string, optional)
Headers
Default: {}
headers_from_record (map[string]string, optional)
Headers from Record
Default: {}
use_default_for_unknown_topic (bool, optional)
Use default for unknown topics
Default: false
idempotent (bool, optional)
Idempotent
Default: false
sasl_over_ssl (bool, required)
SASL over SSL
Default: true
principal (string, optional)
Default: -
keytab (*secret.Secret, optional)
Default: -
username (*secret.Secret, optional)
Username when using PLAIN/SCRAM SASL authentication
Default: -
password (*secret.Secret, optional)
Password when using PLAIN/SCRAM SASL authentication
Default: -
scram_mechanism (string, optional)
If set, use SCRAM authentication with specified mechanism. When unset, default to PLAIN authentication
Default: -
max_send_retries (int, optional)
Number of times to retry sending of messages to a leader
Default: 1
required_acks (int, optional)
The number of acks required per request .
Default: -1
ack_timeout (int, optional)
How long the producer waits for acks. The unit is seconds
Default: nil => Uses default of ruby-kafka library
compression_codec (string, optional)
The codec the producer uses to compress messages . The available options are gzip and snappy.
Default: nil
kafka_agg_max_bytes (int, optional)
Maximum value of total message size to be included in one batch transmission. .
Default: 4096
kafka_agg_max_messages (int, optional)
Maximum number of messages to include in one batch transmission. .
Default: nil
discard_kafka_delivery_failed (bool, optional)
Discard the record where Kafka DeliveryFailed occurred
Default: false
ssl_ca_certs_from_system (*bool, optional)
System’s CA cert store
Default: false
ssl_ca_cert (*secret.Secret, optional)
CA certificate
Default: -
ssl_client_cert (*secret.Secret, optional)
Client certificate
Default: -
ssl_client_cert_chain (*secret.Secret, optional)
Client certificate chain
Default: -
ssl_client_cert_key (*secret.Secret, optional)
Client certificate key
Default: -
ssl_verify_hostname (*bool, optional)
Verify certificate hostname
Default: -
format (*Format, required)
Default: -
buffer (*Buffer, optional)
Default: -
slow_flush_log_threshold (string, optional)
The threshold for chunk flush performance check. Parameter type is float, not time, default: 20.0 (seconds) If chunk flush takes longer time than this threshold, fluentd logs warning message and increases metric fluentd_output_status_slow_flush_count.
Default: -