https://segmentfault.com/a/1190000016595992
Logstash 参考指南(Kafka输入插件)
Kafka输入插件
- 插件版本:v8.1.1
- 发布于:2018-06-01
- 更新日志
其他版本,请参阅版本化的插件文档。
获取帮助
有关插件的问题,请在讨论论坛中打开一个主题,对于bug或特性请求,在Github中打开一个issue,关于Elastic支持的插件列表,请考虑Elastic支持矩阵。
描述
这个输入将读取来自Kafka主题的事件。
这个插件使用Kafka客户端1.1.0,有关broker兼容性,请参阅官方Kafka兼容性参考资料,如果链接的兼容wiki不是最新的,请联系Kafka支持/社区确认兼容性。
如果你需要这个插件中还没有提供的特性(包括客户端版本升级),请提交一个关于你需要什么细节的问题。
这个输入支持连接到Kafka:
- SSL(要求插件版本3.0.0或以上)
- Kerberos SASL(需要插件版本5.1.0或以上)
默认情况下,安全性是禁用的,但是可以根据需要打开。
Logstash Kafka消费者处理组管理并使用Kafka主题的默认偏移管理策略。
Logstash实例默认情况下以一个逻辑组的形式来订阅Kafka主题,每个Logstash的Kafka消费者可以运行多个线程来增加读吞吐量。或者,你可以使用相同的group_id
运行多个Logstash实例,以便在物理机器上分散负载,主题中的消息将分发给具有相同group_id
的所有Logstash实例。
理想情况下,为了达到完美的平衡,你应该拥有与分区数量一样多的线程,线程多于分区意味着有些线程将处于空闲状态。
有关更多信息,请参阅http://kafka.apache.org/documentation.html#theconsumer。
Kafka消费者配置:http://kafka.apache.org/document.html#consumerconfigs。
元数据字段
以下来自Kafka broker的元数据被添加到[@metadata]
字段下:
[@metadata][kafka][topic]
:消息被消费的原始kafka主题。[@metadata][kafka][consumer_group]
:消费者组。[@metadata][kafka][partition]
:此消息的分区信息。[@metadata][kafka][offset]
:此消息的原始记录偏移量。[@metadata][kafka][key]
:记录key,如果有的话。[@metadata][kafka][timestamp]
:Kafka broker接收此消息时的时间戳。
请注意,在输出时@metadata
字段不是任何事件的一部分,如果你需要将这些信息插入到原始事件中,你必须使用mutate
过滤器来手动将所需的字段复制到你的event
中。
Kafka输入配置选项
这个插件支持以下配置选项以及后面描述的通用选项。
设置 | 输入类型 | 要求 |
---|---|---|
auto_commit_interval_ms |
string | No |
auto_offset_reset |
string | No |
bootstrap_servers |
string | No |
check_crcs |
string | No |
client_id |
string | No |
connections_max_idle_ms |
string | No |
consumer_threads |
number | No |
decorate_events |
boolean | No |
enable_auto_commit |
string | No |
exclude_internal_topics |
string | No |
fetch_max_bytes |
string | No |
fetch_max_wait_ms |
string | No |
fetch_min_bytes |
string | No |
group_id |
string | No |
heartbeat_interval_ms |
string | No |
jaas_path |
有效的文件系统路径 | No |
kerberos_config |
有效的文件系统路径 | No |
key_deserializer_class |
string | No |
max_partition_fetch_bytes |
string | No |
max_poll_interval_ms |
string | No |
max_poll_records |
string | No |
metadata_max_age_ms |
string | No |
partition_assignment_strategy |
string | No |
poll_timeout_ms |
number | No |
receive_buffer_bytes |
string | No |
reconnect_backoff_ms |
string | No |
request_timeout_ms |
string | No |
retry_backoff_ms |
string | No |
sasl_kerberos_service_name |
string | No |
sasl_mechanism |
string | No |
security_protocol |
string,["PLAINTEXT" ,"SSL" ,"SASL_PLAINTEXT" ,"SASL_SSL" ]之一 |
No |
send_buffer_bytes |
string | No |
session_timeout_ms |
string | No |
ssl_key_password |
password | No |
ssl_keystore_location |
有效的文件系统路径 | No |
ssl_keystore_password |
password | No |
ssl_keystore_type |
string | No |
ssl_truststore_location |
有效的文件系统路径 | No |
ssl_truststore_password |
password | No |
ssl_truststore_type |
string | No |
topics |
array | No |
topics_pattern |
string | No |
value_deserializer_class |
string | No |
还可以查看所有输入插件支持的通用选项列表。
auto_commit_interval_ms
- 值类型为string
- 默认值为
“5000”
- 消费者偏移量提交给Kafka的频率(毫秒)
auto_offset_reset
- 值类型为string
- 此设置没有默认值
-
如果Kafka中没有初始偏移量,或者偏移量超出范围,该怎么办:
earliest
:自动重置偏移量到最早的偏移量latest
:自动重置偏移量到最新的偏移量none
:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常- 其他:向消费者抛出异常
bootstrap_servers
- 值类型为string
- 默认值为
"localhost:9092"
- 用于建立到集群的初始连接的Kafka实例的url列表,这个列表应该是
host1:port1,host2:port2
的形式,这些url仅用于初始连接,以发现完整的集群成员(可能会动态更改),因此这个列表不需要包含完整的服务器集(不过,如果一个服务器宕机,你可能需要多个服务器)。
check_crcs
- 值类型为string
- 此设置没有默认值
- 自动检查被消费的记录的CRC32,这确保了在线路或磁盘上的消息没有发生损坏,这个检查增加了一些开销,因此在寻求极端性能的情况下可能会禁用它。
client_id
- 值类型为string
- 默认值为
"logstash"
- 请求时要传递给服务器的id字符串,这样做的目的是通过允许包含逻辑应用程序名称来跟踪ip/端口以外的请求源。
connections_max_idle_ms
- 值类型为string
- 此设置没有默认值
- 在这个配置指定的毫秒数之后关闭空闲连接。
consumer_threads
- 值类型为number
- 默认值为
1
- 理想情况下,为了达到完美的平衡,你应该拥有与分区数量一样多的线程,线程多于分区意味着有些线程将处于空闲状态。
decorate_events
- 值类型为boolean
- 默认值为
false
- 可向事件添加Kafka元数据,比如主题、消息大小的选项,这将向logstash事件中添加一个名为
kafka
的字段,其中包含以下属性:topic
:此消息关联的主题、consumer_group
:这个事件中用来读取的消费者组、partition
:此消息关联的分区、offset
:此消息关联的分区的偏移量、key
:包含消息key的ByteBuffer。
enable_auto_commit
- 值类型为string
- 默认值为
"true"
- 如果是
true
,消费者定期向Kafka提交已经返回的消息的偏移量,当进程失败时,将使用这个提交的偏移量作为消费开始的位置。
exclude_internal_topics
- 值类型为string
- 此设置没有默认值
- 内部主题(如偏移量)的记录是否应该公开给消费者,如果设置为
true
,从内部主题接收记录的唯一方法就是订阅它。
fetch_max_bytes
- 值类型为string
- 此设置没有默认值
- 提取请求时服务器应该返回的最大数据量,这不是绝对最大值,如果提取的第一个非空分区中的第一个消息大于此值,消息仍然会返回,以确保消费者能够进行下去。
fetch_max_wait_ms
- 值类型为string
- 此设置没有默认值
- 如果没有足够的数据立即满足
fetch_min_bytes
,服务器在响应提取请求之前将阻塞的最大时间,这应该小于或等于poll_timeout_ms
中使用的超时。
fetch_min_bytes
- 值类型为string
- 此设置没有默认值
- 提取请求时服务器应该返回的最小数据量,如果可用数据不足,请求将在响应请求前等待大量的数据积累。
group_id
- 值类型为string
- 默认值为
"logstash"
- 此消费者所属的组的标识符,消费者组是由多个处理器组成的单个逻辑订阅服务器,主题中的消息将分发给具有相同
group_id
的所有Logstash实例。
heartbeat_interval_ms
- 值类型为string
- 此设置没有默认值
- 从心跳到消费者协调器的预期时间,心跳被用来确保消费者会话保持活跃,并在新消费者加入或离开组时促进重新平衡,该值必须设置为低于
session.timeout.ms
,但通常应该设置不高于该值的1/3,它可以调整得更低,以控制正常重新平衡的预期时间。
jaas_path
- 值类型为path
- 此设置没有默认值
-
Java身份验证和授权服务(JAAS)API为Kafka提供用户身份验证和授权服务,这个设置提供了JAAS文件的路径,Kafka客服端的样例JAAS文件:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka"; };
请注意,在配置文件中指定
jaas_path
和kerberos_config
将会添加到全局JVM系统属性中,这意味着如果你有多个Kafka输入,它们都共享相同的jaas_path
和kerberos_config
。如果不希望这样做,则必须在不同的JVM实例上运行Logstash的独立实例。
kerberos_config
- 值类型为path
- 此设置没有默认值
- kerberos配置文件路径的选项,这是
krb5.conf
样式,详见https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html。
key_deserializer_class
- 值类型为string
- 默认值为
"org.apache.kafka.common.serialization.StringDeserializer"
- 用于反序列化记录key的Java类。
max_partition_fetch_bytes
- 值类型为string
- 此设置没有默认值
- 服务器将返回每个分区的最大数据量,请求使用的最大总内存为
#partitions * max.partition.fetch.bytes
,这个大小必须至少是服务器允许的最大消息大小的最大值,否则可能生产者发送的消息比消费者能够提取的大,如果发生这种情况,消费者可能会陷入在某个分区上提取大量消息的困境。
max_poll_interval_ms
- 值类型为string
- 此设置没有默认值
- 当使用消费者组管理时,
poll()
调用之间的最大延迟,这为消费者在提取更多记录之前可以空闲的时间设置了上限,如果在超时过期之前没有调用poll()
,就认为消费者失败了并且组将重新平衡,以便将分区重新分配给另一个成员,配置request_timeout_ms
的值必须总是大于max_poll_interval_ms
。
max_poll_records
- 值类型为string
- 此设置没有默认值
- 在对
poll()
的单个调用中返回的记录的最大数量。
metadata_max_age_ms
- 值类型为string
- 此设置没有默认值
- 以毫秒为单位的时间周期后,即使我们没有看到任何分区领导更改,也会强制刷新元数据,以主动发现任何新的broker或分区。
partition_assignment_strategy
- 值类型为string
- 此设置没有默认值
- 客户端将使用分区分配策略的类名在消费者实例之间分配分区所有权。
poll_timeout_ms
- 值类型为number
- 默认值为
100
- kafka消费者将等待从主题接收新消息的时间。
receive_buffer_bytes
- 值类型为string
- 此设置没有默认值
- 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小
reconnect_backoff_ms
- 值类型为string
- 此设置没有默认值
- 在尝试重新连接到给定主机之前等待的时间量,这避免了在一个紧密循环中重复连接主机,此回退适用于消费者向broker发送的所有请求。
request_timeout_ms
- 值类型为string
- 此设置没有默认值
- 配置控制客户端等待请求响应的最长时间,如果超时之前没有收到响应,如有必要客户端将重新发送请求,或者在重试耗尽时失败请求。
retry_backoff_ms
- 值类型为string
- 此设置没有默认值
- 在试图重试失败的提取请求到给定主题分区之前等待的时间量,这避免了在一个紧凑的循环中重复的提取和失败。
sasl_kerberos_service_name
- 值类型为string
- 此设置没有默认值
- Kafka broker运行的Kerberos主体名称,这可以在Kafka的JAAS配置或Kafka的配置中定义。
sasl_mechanism
- 值类型为string
- 默认值为
"GSSAPI"
- 用于客户端连接的SASL机制,这可能是安全提供者可用的任何机制,GSSAPI是默认机制。
security_protocol
- 值可以是:
PLAINTEXT
、SSL
、SASL_PLAINTEXT
、SASL_SSL
- 默认值为
"PLAINTEXT"
- 要使用的安全协议,可以是
PLAINTEXT
、SSL
、SASL_PLAINTEXT
、SASL_SSL
。
send_buffer_bytes
- 值类型为string
- 此设置没有默认值
- 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小
session_timeout_ms
- 值类型为string
- 此设置没有默认值
- 超时之后,如果
poll_timeout_ms
没有被调用,消费者将被标记为死亡,并为group_id
标识的组触发重新平衡操作。
ssl_key_password
- 值类型为password
- 此设置没有默认值
- 密钥存储文件中私有密钥的密码。
ssl_keystore_location
- 值类型为path
- 此设置没有默认值
- 如果需要客户端身份验证,则此设置存储密钥存储路径。
ssl_keystore_password
- 值类型为password
- 此设置没有默认值
- 如果需要客户端身份验证,则此设置存储密钥库密码
ssl_keystore_type
- 值类型为string
- 此设置没有默认值
- 密钥存储库类型。
ssl_truststore_location
- 值类型为path
- 此设置没有默认值
- JKS信任存储库路径用于验证Kafka broker的证书。
ssl_truststore_password
- 值类型为password
- 此设置没有默认值
- 信任存储库的密码。
ssl_truststore_type
- 值类型为string
- 此设置没有默认值
- 信任存储库类型。
topics
- 值类型为array
- 默认值为
["logstash"]
- 要订阅的主题列表,默认为
["logstash"]
。
topics_pattern
- 值类型为string
- 此设置没有默认值
- 订阅的主题正则表达式模式,使用此配置时,主题配置将被忽略。
value_deserializer_class
- 值类型为string
- 默认值为
"org.apache.kafka.common.serialization.StringDeserializer"
- 用于反序列化记录值的Java类。
通用选项
所有输入插件都支持以下配置选项:
设置 | 输入类型 | 要求 |
---|---|---|
add_field |
hash | No |
codec |
codec | No |
enable_metric |
boolean | No |
id |
string | No |
tags |
array | No |
type |
string | No |
细节
add_field
- 值类型为hash
- 默认值为
{}
- 向事件添加字段。
codec
- 值类型为codec
- 默认值为
"plain"
- 用于输入数据的编解码器,在输入数据之前,输入编解码器是一种方便的解码方法,不需要在你的Logstash管道中使用单独的过滤器。
enable_metric
- 值类型是boolean
- 默认值是
true
- 禁用或启用这个特定插件实例的指标日志,默认情况下,我们记录所有我们可以记录的指标,但是你可以禁用特定插件的指标集合。
id
- 值类型为string
- 这个设置没有默认值
-
向插件配置添加唯一的
ID
,如果没有指定ID,则Logstash将生成一个,强烈建议在配置中设置此ID,当你有两个或多个相同类型的插件时,这一点特别有用。例如,如果你有两个log4j输入,在本例中添加一个命名ID将有助于在使用监视API时监视Logstash。input { kafka { id => "my_plugin_id" } }
tags
- 值类型为array
- 这个设置没有默认值
- 向事件添加任意数量的标记,这有助于以后的处理。
type
- 值类型为string
- 这个设置没有默认值
- 向该输入处理的所有事件添加
type
字段,类型主要用于过滤器激活,该type
作为事件本身的一部分存储,因此你也可以使用该类型在Kibana中搜索它。如果你试图在已经拥有一个type
的事件上设置一个type
(例如,当你将事件从发送者发送到索引器时),那么新的输入将不会覆盖现有的type
,发送方的type
集在其生命周期中始终与该事件保持一致,甚至在发送到另一个Logstash服务器时也是如此。
最关键的一点,可以使用topic的正则表达式,监听多个系统的kafka的topic
logstash版本为5.5.3,kafka版本为2.11,此版本默认内置了kafka插件,可直接配置使用,不需要重新安装插件;注意logstash5.x版本前后配置不太一样,注意甄别,必要时可去elasticsearch官网查看最新版配置参数的变化,例如logstash5.x版本以前kafka插件配置的是zookeeper地址,5.x以后配置的是kafka实例地址。
input{ kafka{ bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"] client_id => "test" group_id => "test" auto_offset_reset => "latest" //从最新的偏移量开始消费 consumer_threads => 5 decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中 topics => ["logq","loge"] //数组类型,可配置多个topic type => "bhy" //所有插件通用属性,尤其在input里面配置多个数据源时很有用 } }
另外一个input里面可设置多个kafka,
input{ kafka{ bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"] client_id => "test1" group_id => "test1" auto_offset_reset => "latest" consumer_threads => 5 decorate_events => true topics => ["loge"] type => "classroom" } kafka{ bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"] client_id => "test2" group_id => "test2" auto_offset_reset => "latest" consumer_threads => 5 decorate_events => true topics => ["logq"] type => "student" } }
假如你在filter模块中还要做其他过滤操作,并且针对input里面的每个数据源做得操作不一样,那你就可以根据各自定义的type来匹配
filter{ if[type] == "classroom"{ grok{ ........ } } if[type] == "student"{ mutate{ ........ } } }
不只filter中可以这样,output里面也可以这样;并且当output为elasticsearch的时候,input里面的定义的type将会成为elasticsearch的你定义的index下的type
output { if[type] == "classroom"{ elasticsearch{ hosts => ["192.168.110.31:9200"] index => "school" timeout => 300 user => "elastic" password => "changeme" } } if[type] == "student"{ ........ } }
对于第一个存储到elasticsearch的路径为localhost:9200/school/classroom;第二个存储到elasticsearch的路径为localhost:9200/school/student。假如从来没有定义过type,默认的type为logs,访问路径为第一个存储到elasticsearch的路径为localhost:9200/school/logs,默认的type也可不加
此外对于kafka也可以使用
[@metadata][kafka][topic]
链接:https://www.zhihu.com/question/276783606/answer/523437827
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
今天刚好搭建了一套filebeat=>kafka=>双路logstash=>双路es的数据流,不同的业务日志写到不同的topic,es索引名字是topic名字加年月日。logstash配置文件如下
input{
kafka{
bootstrap_servers => ["xxxxx:9092"]
topics_pattern => ".*"
auto_offset_reset => "earliest"
decorate_events => true
codec => json {
charset => "UTF-8"
}
}
}
output{
elasticsearch{
hosts => "xxxx:9200"
index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
}
}
其中最重要的是input中decorate_events => true,这样output中%{[@metadata][kafka][topic]}才生效,不然要使用mutatefilter。官方文档如下
Metadata fields
The following metadata from Kafka broker are added under the [@metadata] field:
- [@metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
- [@metadata][kafka][consumer_group]: Consumer group
- [@metadata][kafka][partition]: Partition info for this message.
- [@metadata][kafka][offset]: Original record offset for this message.
- [@metadata][kafka][key]: Record key, if any.
- [@metadata][kafka][timestamp]: Timestamp when this message was received by the Kafka broker.
Please note that @metadata fields are not part of any of your events at output time. If you need these information to be inserted into your original event, you’ll have to use the mutate filter to manually copy the required fields into your event.