Kafka
Kafka output plugin for Fluentd
Overview
More info at https://github.com/fluent/fluent-plugin-kafka
Example Deployment: Transport Nginx Access Logs into Kafka with Logging Operator
Example output configurations
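A minimal Output resource for this plugin might look like the following sketch (the broker address, topic, and resource names are placeholders to adapt to your environment):
apiVersion: logging.banzaicloud.io/v1beta1
kind: Output
metadata:
  name: kafka-output
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092  # placeholder broker host:port
    default_topic: topic                                    # topic used when the record does not name one
    format:
      type: json
    buffer:
      tags: topic
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true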
Configuration
Kafka
Send your logs to Kafka
brokers (string, required)
The list of all seed brokers, with their host and port information.
Default: -
topic_key (string, optional)
Topic Key
Default: “topic”
partition_key (string, optional)
Partition
Default: “partition”
partition_key_key (string, optional)
Partition Key
Default: “partition_key”
message_key_key (string, optional)
Message Key
Default: “message_key”
client_id (string, optional)
Client ID
Default: “kafka”
default_topic (string, optional)
The name of the default topic.
Default: nil
default_partition_key (string, optional)
The name of the default partition key.
Default: nil
default_message_key (string, optional)
The name of the default message key.
Default: nil
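For example, records can be routed to topics dynamically by pointing topic_key at a record field and supplying a fallback with default_topic (a sketch; the field and topic names are assumptions):
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    topic_key: topic          # the record field whose value selects the target topic
    default_topic: app-logs   # used when the record has no such field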
exclude_topic_key (bool, optional)
Exclude Topic key
Default: false
exclude_partion_key (bool, optional)
Exclude Partition key
Default: false
get_kafka_client_log (bool, optional)
Get Kafka Client log
Default: false
headers (map[string]string, optional)
Headers
Default: {}
headers_from_record (map[string]string, optional)
Headers from Record
Default: {}
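As an illustration, static headers and record-derived headers can be combined; the header names and the record accessor path below are assumptions:
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    headers:
      environment: production        # static header attached to every message
    headers_from_record:
      source_host: $.kubernetes.host # header value resolved from the record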
use_default_for_unknown_topic (bool, optional)
Use the default topic for unknown topics
Default: false
idempotent (bool, optional)
Idempotent
Default: false
sasl_over_ssl (bool, required)
SASL over SSL
Default: true
principal (string, optional)
Default: -
keytab (*secret.Secret, optional)
Default: -
username (*secret.Secret, optional)
Username when using PLAIN/SCRAM SASL authentication
Default: -
password (*secret.Secret, optional)
Password when using PLAIN/SCRAM SASL authentication
Default: -
scram_mechanism (string, optional)
If set, use SCRAM authentication with the specified mechanism. When unset, defaults to PLAIN authentication.
Default: -
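username and password are secret references; a sketch that enables SCRAM authentication with credentials taken from a Kubernetes Secret (the Secret name and keys are assumptions):
spec:
  kafka:
    brokers: kafka.example.com:9093
    sasl_over_ssl: true
    scram_mechanism: sha256   # use SCRAM-SHA-256 instead of the PLAIN default
    username:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials
          key: username
    password:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials
          key: password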
max_send_retries (int, optional)
The number of times to retry sending messages to a leader
Default: 1
required_acks (int, optional)
The number of acks required per request.
Default: -1
ack_timeout (int, optional)
How long the producer waits for acks. The unit is seconds
Default: nil => Uses default of ruby-kafka library
compression_codec (string, optional)
The codec the producer uses to compress messages. The available options are gzip and snappy.
Default: nil
kafka_agg_max_bytes (int, optional)
The maximum value of the total message size to be included in one batch transmission.
Default: 4096
kafka_agg_max_messages (int, optional)
The maximum number of messages to include in one batch transmission.
Default: nil
discard_kafka_delivery_failed (bool, optional)
Discard the record when a Kafka DeliveryFailed error occurs
Default: false
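Delivery and batching behavior can be tuned together; the values below are illustrative, not recommendations:
spec:
  kafka:
    brokers: kafka.example.com:9092
    required_acks: -1            # wait for all in-sync replicas to acknowledge
    ack_timeout: 10              # seconds to wait for those acks
    max_send_retries: 3          # retries towards the partition leader
    compression_codec: gzip      # gzip or snappy
    kafka_agg_max_bytes: 1048576 # cap the total size of one batch transmission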
ssl_ca_certs_from_system (*bool, optional)
System’s CA cert store
Default: false
ssl_ca_cert (*secret.Secret, optional)
CA certificate
Default: -
ssl_client_cert (*secret.Secret, optional)
Client certificate
Default: -
ssl_client_cert_chain (*secret.Secret, optional)
Client certificate chain
Default: -
ssl_client_cert_key (*secret.Secret, optional)
Client certificate key
Default: -
ssl_verify_hostname (*bool, optional)
Verify certificate hostname
Default: -
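The ssl_* parameters are also secret references; one way to wire them to a TLS Secret is sketched below, assuming a Secret named kafka-tls with ca.crt, tls.crt, and tls.key entries and that mounting the secret as files (mountFrom) is the desired behavior:
spec:
  kafka:
    brokers: kafka.example.com:9093
    ssl_verify_hostname: true
    ssl_ca_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls
          key: ca.crt
    ssl_client_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls
          key: tls.crt
    ssl_client_cert_key:
      mountFrom:
        secretKeyRef:
          name: kafka-tls
          key: tls.key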
format (*Format, required)
Default: -
buffer (*Buffer, optional)
Default: -
slow_flush_log_threshold (string, optional)
The threshold for the chunk flush performance check. The parameter type is float, not time; the default is 20.0 (seconds). If a chunk flush takes longer than this threshold, Fluentd logs a warning message and increases the fluentd_output_status_slow_flush_count metric.
Default: -
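format and buffer take the operator's shared Format and Buffer configuration blocks; a sketch combining them with slow_flush_log_threshold (the values are illustrative):
spec:
  kafka:
    brokers: kafka.example.com:9092
    default_topic: app-logs
    slow_flush_log_threshold: "25.0"  # warn when a chunk flush takes longer than 25 seconds
    format:
      type: json
    buffer:
      flush_interval: 10s
      flush_thread_count: 4
      retry_forever: true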