zoukankan      html  css  js  c++  java
  • ELK——elasticsearch、logstash、kibana、filebeat、kafka介绍与安装配置

    elasticsearch

    elasticsearch介绍

    分布式实时数据快速存取

    分片
    a.水平分割扩展,增大存储量
    b.分布式并行跨分片操作,提高性能和吞吐量
    副本
    a.高可用性,以应对分片或者节点故障
    b.提高性能,增大吞吐量,并行跨副本执行搜索(分片副本要在不同的节点上)

    elasticsearch集群安装配置

    1.主机名/etc/hosts
    2.配置免密互信
    3.处理防火墙、selinux,时间同步
    4.安装常用软件 yum install -y net-tools bash-completion
    5.系统优化

    [root@zookeeper01 ~]# vim /etc/sysctl.conf
    fs.file-max = 65536
    vm.max_map_count = 262144
    [root@zookeeper01 ~]# sysctl -p
    [root@zookeeper01 ~]# vim /etc/security/limits.conf
    * soft nofile 65535
    * hard nofile 131072
    * soft nproc 2048
    * hard nproc 4096
    

    6.配置jdk

    [root@zookeeper01 ~]# mkdir -p /data/program/software
    [root@zookeeper01 ~]# tar xf jdk-8u842-linux-x64.tar.gz -C /data/program/software
    [root@zookeeper01 ~]# cd /data/program/software
    [root@zookeeper01 software]# mv jdk1.8.0_242/ java8
    #配置jdk全局变量
    [root@zookeeper01 software]# vim /etc/profile
    export JAVA_HOME=/data/program/software/java8
    export JRE_HOME=/data/program/software/java8/jre
    export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/1ib
    export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    [root@zookeeper01 software]# source /etc/profile
    [root@zookeeper01 software]# java -version
    

    7.集群部署配置

    [root@zookeeper01 ~]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.2.0.tar.gz
    [root@zookeeper01 ~]# tar elasticsearch-5.2.0.tar.gz -C /data/program/software
    [root@zookeeper01 ~]# cd /data/program/software/
    [root@zookeeper01 software]# mv elasticsearch-5.2.0 elasticsearch
    #配置elasticsearch.yml
    [root@zookeeper01 software]# vim elasticsearch/config/elasticsearch.yml
    cluster.name: my-cluster	#集群名称,默认为elasticsearch
    node.name: ${HOSTNAME}	#节点名称,默认为uuid前7个字符
    #生产环境下应与软件分离
    path.data: /data/program/software/elasticsearch/elk_data	#数据目录
    path.logs: /data/program/software/elasticsearch/elk_logs	#日志目录
    bootstrap.memory_lock: false
    network.host: 192.168.213.128	#ip绑定
    http.port: 9200	#对外服务的http端口
    #Discovery Config节点发现配置,ES中默认采用的节点发现方式是zen(基于组播(多播)、单播),在应用于生产前有两个重要参数需配
    discovery.zen.ping.unicast.hosts: ["192.168.213.135", "192.168.213.136"]	#具有master资格的节点列表,不能写本机
    discovery.zen.minmum_master_nodes: 1
    #配置jvm.options
    [root@zookeeper01 software]# vim elasticsearch/config/jvm.options
    -Xms512m
    -Xmx512m
    -XX:HeapDumpPath=/data/program/software/elasticsearch
    
    参数 含义
    discovery.zen.ping.unicast.hosts 单播模式下,具有master资格的节点列表,新加入的节点向这个列表中的节点发送请求来加入集群
    discovery.zen.minimum_master_nodes 具有master资格的节点的最小数量,官方的推荐值是(N/2)+1,N是具有master资格的节点的数量

    配置jvm.options
    (1)Jvm heap 大小设置
    生产环境中一定要在jvm.options中调大它的jvm内存
    (2)VM heap dump path 设置
    生产环境中指定当发生OOM异常时,heap的dump path,以分析问题
    -XX: HeapDumpPath=data/program/software/elasticsearch

    8.启动集群

    [root@zookeeper01 software]# useradd elsearch
    [root@zookeeper01 software]# chown -R elsearch:elsearch elasticsearch
    
    [root@zookeeper01 elasticsearch]# su - elsearch
    [elsearch@zookeeper01 ~]$ cd /data/program/software/elasticsearch/
    [elsearch@zookeeper01 elasticsearch]$ nohup bin/elasticsearch -d >>/tmp/elasticsearch.nohup &
    [elsearch@zookeeper01 ~]$ netstat -tunlp|grep 9200
    

    复制到其他节点后,一定要记得删除elk_data好elk_logs下的数据!

    检查ES是否安装成功

    (1)查看端口

    [root@zookeeper01 ~]# netstat -tunlp|grep 9200
    tcp6       0      0 192.168.213.128:9200    :::*                    LISTEN      1578/java
    

    (2)浏览器测试
    http://192.168.213.128:9200
    在这里插入图片描述查看集群信息 http://192.168.213.136:9200/_cat/nodes?pretty
    *号表示该节点为主节点
    在这里插入图片描述因为elasticsearch提供了标准的http接口,所以可以使用curl方便的访问elasticsearch

    #获取所有可以查看的信息
    [root@zookeeper01 ~]# curl -XGET 'http://192.168.213.128:9200/_cat?pretty'
    

    在这里插入图片描述

    配置head插件

    使用head插件显示索引和分片情况

    (1)在5.0版本中不支持直接安装head插件,需要启动一个服务,由于head插件本质上还是一个nodejs的工程,因此需要安装node,使用npm来安装依赖的包
    (2)NPM的全名叫Node Package Manager,是随NodeJS一起安装的包管理和分发工具,方便让JavaScript开发者下载、安装、上传以及管理已经安装的包
    (3)grunt是一个方便的构建工具,可以进行打包压缩、测试、执行等等的工作,5.x 里的head插件就是通过grunt启动的
    (4)-g参数代表全局安装,一般安装到nodejs的lib/node_ modules目录下;不带参数-g是本地安装,一般安装到运行npm命令时所在的目录,本次需要安装到head插件目录

    (1)配置head插件

    [root@zookeeper01 ~]# wget https://nodejs.org/dist/v7.5.0/node-v7.5.0-linux-x64.tar.xz
    [root@zookeeper01 ~]# tar xf node-v7.5.0-linux-x64.tar.xz -C /data/program/software
    [root@zookeeper01 ~]# vim /etc/profile.d/node.sh
    #!/bin/bash
    NODE_HOME=/data/program/software/node-v7.5.0-linux-x64
    export PATH=$PATH:$NODE_HOME/bin
    [root@zookeeper01 ~]# source /etc/profile.d/node.sh
    [root@zookeeper01 ~]# node -v
    v7.5.0
    [root@zookeeper01 ~]# npm -v
    4.1.2
    #切到国内的镜像
    [root@zookeeper01 ~]# npm config set registry https://registry.npm.taobao.org
    #使用cnpm代替默认的npm【时间漫长】
    [root@zookeeper01 ~]# npm install -g cnpm --registry=https://registry.npm.taobao.org
    #安装grunt
    [root@zookeeper01 ~]# cnpm install -g grunt-cli
    [root@zookeeper01 elasticsearch]# git clone https://github.com/mobz/elasticsearch-head.git
    [root@zookeeper01 elasticsearch]# chown -R elsearch:elsearch elasticsearch-head
    #elasticsearch开启跨域访问
    [root@zookeeper01 elasticsearch]# vim config/elasticsearch.yml
    #action.destructive_requires_name: true
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    [root@zookeeper01 elasticsearch-head]# cnpm install phantomjs-prebuilt@2.1.14 --ignore-scripts
    [root@zookeeper01 elasticsearch-head]# cnpm install
    

    (2)修改服务器监听地址

    [root@zookeeper01 ~]# cd /data/program/software/elasticsearch/elasticsearch-head/
    [root@zookeeper01 elasticsearch-head]# cp Gruntfile.js{,.bak}
    [root@zookeeper01 elasticsearch-head]# vim Gruntfile.js
    connect: {
                server: {
                    options: {
                        port: 9100,
                        hostname: '*',#插入本行,确保能被访问【第98行】
                        base: '.',
                        keepalive: true
                    }
                }
            }
    

    (3)修改源码

    [root@zookeeper01 elasticsearch-head]# vim _site/app.js
    this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://192.168.213.128:9200";	#把localhost修改为es服务器地址,否则head插件不能获取节点状态信息【4374行】
    

    (4)启动验证
    首先启动ES
    启动nodejs grunt server & 在head目录下执行
    验证端口

    [root@zookeeper01 elsearch]# netstat -tunlp|grep 9100
    tcp6       0      0 :::9100                 :::*                    LISTEN      2946/grunt
    

    (5)集群节点状态信息查看
    节点状态信息 curl -XGET 'http://192.168.213.128:9200/?pretty'
    在这里插入图片描述
    节点数信息 curl -XGET 'http://192.168.213.128:9200/_cat/nodes?v'
    在这里插入图片描述查看当前索引及分片情况 curl -XGET 'http://192.168.213.128:9200/_cluster/state?pretty'
    查看集群健康状态信息 curl -XGET 'http://192.168.213.128:9200/_cluster/health?pretty'
    在这里插入图片描述(6)浏览器访问:http://192.168.213.128:9100/
    在这里插入图片描述

    logstash

    logstash介绍

    收集分析日志
    四大组件、两类主机

    logstash的四大组件 作用
    Shipper 发送事件(events) 至LogStash;通常,远程代理端(agent) 只需要运行这个组件即可;
    Broker and Indexer 接收并索引化事件;
    Search and Storage 允许对事件进行搜索和存储;
    Web Interface 基于Web的展示界面正是由于以上组件在LogStash架构中可独立部署,才提供了更好的集群扩展性。
    logstash主机分类
    代理主机(agent host) :将各种日志数据发送至中心主机
    中心主机(central host):接收、处理和存储日志数据

    logstash安装配置

    [root@zookeeper01 ~]# tar xf logstash-5.2.0.tar.gz -C /data/program/software/
    [root@zookeeper01 ~]# cd /data/program/software/
    [root@zookeeper01 software]# mv logstash-5.2.0 logstash
    [root@zookeeper01 software]# grep -v "^#" logstash/config/logstash.yml
    path.data: /data/program/software/logstash
    path.config: /data/program/software/logstash/conf.d	#pipeline实例文件位置
    # http.host: "127.0.0.1"
    #指定数据输入主机
    # http.port: 9600-9700
    #指定数据输入端口,默认为9600~9700 (每实例占用1个)
    [root@zookeeper01 logstash]# mkdir conf.d
    [root@zookeeper01 bin]# ./logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
    [root@zookeeper01 logstash]#  bin/logstash -e 'input{stdin{} } output { elasticsearch { hosts => ["192.168.213.136:9200"] }}'
    

    在这里插入图片描述在这里插入图片描述

    [root@zookeeper01 logstash]# cd conf.d
    [root@zookeeper01 conf.d]# vim test_messages.conf
    input {
        file {
            path => '/var/log/messages.1og'
        }
    output {
        elasticsearch {
            hosts => ["192.168.213.135:9200","192.168.213.136:9200"]
            index => "messages-%{+YYYY.MM.dd}"
        }
        stdout {
            codec => rubydebug
        }
    }
    

    logstash语法

    (1) input

    [root@zookeeper01 logstash]# cat conf.d/test_stdin.conf
    input {
        stdin {
            add_field => {"key" => "value"}
            codec => "plain"
            tags =>["add"]
            type => "std"
        }
    }
    output{
        stdout{
            codec=>rubydebug
        }
    }
    
    [root@zookeeper01 logstash]# cat conf.d/test_filter_date.conf
    input {
        file {
            path => ["/var/log/*.log","/var/log/messages"]
            type => "system"
            start_position => "beginning"
        }
    }
    
    filter {
        grok {
            match => ["message","%{HTTPDATE:logdate}"]
        }
        date {
            match => ["logdate","dd/MMM/yyyy:HH:mm:ss Z"]
        }
    }
    
    output{
        stdout{
            codec=>rubydebug
        }
    }
    
    [root@zookeeper01 logstash]# cat conf.d/test_filter_geoip.conf
    input {
        stdin {
            add_field => {"key" => "value"}
            codec => "plain"
            tags => ["add"]
            type => "std"
        }
    }
    filter {
        geoip {
            source => "message"
        }
    }
    output{
        stdout{
            codec=> rubydebug
        }
    }
    

    在这里插入图片描述

    测试logstash配置文件是否出错

    使用命令bin/logstash -t -f /etc/logstash/conf.d/test.conf测试配置文件是否有问题
    如果输出Configuration OK则说明配置信息无误

    踩坑

    [ERROR][logstash.outputs.elasticsearch] Failed to install template. {:mes"Template file '' could not be found!"...}
    原因: logstash与elasticsearch版本不匹配;也可能是logstash解析日志没有输出结果(我遇到的是前者)

    kibana

    分析与可视化平台

    kibana安装配置

    (1)安装JDK环境
    (2)安装kibana

    [root@zookeeper01 ~]# tar xf ELK/kibana-5.2.0-linux-x86_64.tar.gz -C /data/program/software/
    [root@zookeeper01 ~]# cd /data/program/software/
    [root@zookeeper01 software]# mv kibana-5.2.0-linux-x86_64 kibana
    #修改配置文件
    [root@zookeeper01 kibana]# grep -v "^#" kibana/config/kibana.yml
    server.port: 5601
    server.host: "0.0.0.0"
    elasticsearch.hosts: ["http://192.168.213.128:9200"]
    kibana.index: ".kibana"
    

    (3)启动文件

    [root@zookeeper01 ~]# cd /data/program/software/kibana
    [root@zookeeper01 kibana]# bin/kibana &
    [root@zookeeper01 kibana]# nohup bin/kibana >>/tmp/kibana.nohup &
    [root@zookeeper01 kibana]# netstat -tunlp|grep 5601
    tcp        0      0 0.0.0.0:5601            0.0.0.0:*               LISTEN      3620/bin/../node/bi
    

    使用浏览器打开 http://192.168.213.128:5601
    在这里插入图片描述

    ELK实例

    实例一:收集环境日志

    (1) 收集系统日志

    [root@zookeeper01 conf.d]# cat test_system.conf
    input {
        file {
            path => "/var/log/messages"
            type => "system"
            start_position => "beginning"
            sincedb_path => "/data/program/software/logstash/data/sincedb"
        }
    }
    output {
        elasticsearch {
            hosts => ["192.168.213.128:9200","192.168.213.128:9200"]
            index => "message-%{+YYYY.MM.dd}"
        }
    }
    

    (2)收集elasticsearch的error日志
    使用if判断,两种日志分别写到不同索引中,type不可以和日志格式的任何一个域(可以理解为字段)的名称重复

    [root@zookeeper01 conf.d]# cat test_system1.conf
    input {
        file {
            path => "/var/log/messages"
            type => "system"
            start_position => "beginning"
            sincedb_path => "/data/program/software/logstash/data/sincedb"
        }
        file {
            path => "/data/program/software/elasticsearch/elk_logs/*.log"
            type => "es-error"
            start_position => "beginning"
        }
    }
    output {
        if [type] == "system" {
            elasticsearch {
                hosts => ["192.168.213.135:9200","192.168.213.136:9200"]
                index => "message-%{+YYYY.MM.dd}"
            }
        }
        if [type] == "es-error" {
            elasticsearch {
                hosts => ["192.168.213.128:9200","192.168.213.135:9200"]
                    index => "es-error-%{+YYYY.MM.dd}"
            }
        }
    }
    

    实例二:收集nginx访问日志

    在这里使用codec的json插件将日志的域进行分段,使用key-value的方式, 使日志格式更清晰,易于搜索,还可以降低cpu的负载

    JSON: Javascript object Notati on(JavaScript对象表示法)
    JSON是存储和交换文本信息的语法,类似XML
    JSON比XML更小、更快,更易解析

    (1)更改nginx的配置文件的日志格式,使用json

    [root@zookeeper01 ~]# vim /etc/nginx/nginx.conf
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
        #                  '$status $body_bytes_sent "$http_referer" '
        #                  '"$http_user_agent" "$http_x_forwarded_for"';
        log_format json '{"@timestamp":"$time_iso8601",'
                        '"host":"$server_addr",'
                        '"clientip":"$remote_addr",'
                        '"size":$body_bytes_sent,'
                        '"responsetime":$request_time,'
                        '"upstreamtime":"$upstream_response_time",'
                        '"upstreamhost":"$upstream_addr",'
                        '"http_host":"$host",'
                        '"url":"$uri",'
                        '"xff":"$http_x_forwarded_for",'
                        '"referer":"$http_referer",'
                        '"agent":"$http_user_agent",'
                        '"status":"$status"}';
        access_log /var/log/nginx/access.log json;
        #access_log  /var/log/nginx/access.log  main;
    

    (2)启动nginx

    [root@zookeeper01 ~]# nginx -t
    nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
    nginx: configuration file /etc/nginx/nginx.conf test is successful
    [root@zookeeper01 ~]# systemctl start nginx
    

    (3)logstash抓取nginx日志

    [root@zookeeper01 ~]# cd /data/program/software/logstash/
    [root@zookeeper01 logstash]# vim conf.d/test_nginx.conf
    input {
        file {
            path => "/var/log/nginx/access.log"
            type => "nginx-log"
            start_position => "beginning"
            codec => "json"
        }
    }
    output {
        if [type] == "nginx-log" {
            elasticsearch {
                hosts => ["192.168.213.135:9200","192.168.213.136:9200"]
                index => "nginx-log-%{+YYYY.MM.dd}"
            }
        }
    }
    

    实例三:收集系统syslog日志

    实际生产环境使用syslog插件直接收集系统日志
    (1)修改syslog的配置文件,把日志信息发送到514端口上

    [root@zookeeper01 ~]# vim /etc/rsyslog.conf
    #*.* @@remote-host:514
    *.* @@192.168.213.128:514
    # ### end of the forwarding rule ###
    

    (2)重启rsyslog

    [root@zookeeper01 ~]# systemctl restart rsyslog
    

    (3)logstash抓取rsyslog日志

    [root@zookeeper01 logstash]# vim conf.d/test_syslog.conf
    input {
        syslog {
            type => "system-log"
            host => "192.168.213.128"
            port => "514"
        }
    }
    
    output {
        if [type] == "system-log" {
            elasticsearch {
            hosts => ["192.168.213.135:9200","192.168.213.136:9200"]
            index => "system-log-%{+YYYY.MM.dd}"
            }
        }
    }
    

    #/etc/rc.d/rc.local
    #chmod +x /etc/rc.d/rc.local
    if [ -f /data/program/software/elasticsearch/bin/elasticsearch]
    then
    `su -elsearch -c "data/program/software/elasticsearch/bin/elasticsearch -d"`
    fi
    

    filebeat

    filebeat介绍

    Filebeat是一个日志文件托运工具, 在服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读), 并且转发这些信息到elasticsearch或者logstarsh中存放

    filebeat的工作流程
    当开启filebeat程序的时候,它会启动一个或多个探测器(prospectors)去检测指定的日志目录或文件,对于探测器找出的每一个日志文件,filebeat都会启动一个收割进程(harvester)读取日志文件的新内容,并发送这些新的日志数据到处理程序(spooler),处理程序会集合这些事件,最后filebeat会发送集合的数据到指定的地点

    Harvester (收割机) 负责读取单个文件内容
    Prospector (勘测者) 负责管理Harvester并找到所有读取源

    Prospector会检查每个文件,看Harvester是否已经启动,是否需要启动,或者文件是否可以忽略
    若Harvester关闭,只有在文件大小发生变化的时候Prospector才会执行检查
    Prospector只能检测本地的文件

    Filebeat如何记录文件状态?
    Filebeat将文件状态记录在文件中(默认在/var/lib/filebeat/registry)。此状态可以记住Harvester收集文件的偏移量。若连接不上输出设备,如ES等,filebeat会记录发送前的最后一行,并在可以连接的时候继续发送。Filebeat在运行的时候,Prospector状态会被记录在内存中。Filebeat重启的时候,利用registry记录的状态来进行重建,用来还原到重启之前的状态。每个Prospector会为每个找到的文件记录一个状态,对于每个文件,Filebeat存储唯一标识符以检测文件是否先前被收集。
    Filebeat如何保证事件至少被输出一次?
    因为Filebeat将每个事件的传递状态保存在文件中, 在未得到输出方确认时,会尝试一直发送,直到得到回应。若在传输过程中被关闭, 则不会在关闭之前确认事件。任何在关闭之前未确认的事件,都会在重启之后重新发送。这可确保至少发送一次,但有可能会重复。可通过设置参数来设置关闭之前的等待事件回应的时间(默认禁用)。

    在这里插入图片描述

    安装并配置filebeat

    (1)配置JDK
    (2)安装filebeat

    [root@zookeeper01 ~]# yum install glibc.i686 -y
    [root@zookeeper01 ~]# tar xf filebeat-6.6.1-linux-x86_64.tar.gz -C /data/program/software
    [root@zookeeper01 ~]# cd /data/program/software
    [root@zookeeper01 software]# mv filebeat-6.6.1-linux-x86_64 filebeat
    

    (3)修改配置文件

    #=========================== Filebeat inputs =============================
    filebeat.inputs:
    - type: log
      enabled: yes
      paths:
        - /var/log/nginx/*.log
    #----------------------------- Logstash output --------------------------------
    output.logstash:
      hosts: ["localhost:5044"]
    #logstash需要开启input beta插件,启动监听5044端口
    

    filebeat默认输出到elasticsearch
    (4)启动

    [root@localhost filebeat]# ./filebeat -e -c ./filebeat.yml
    

    kafka

    kafka介绍

    分布式实时数据流平台
    可独立部署在单台服务器上,也可以部署在多台服务器上构成集群,提供发布与订阅功能,用户可以发送数据到Kafka集群上,也可以从Kafka集群中读取数据

    生产者服务写入消息数据;消费者负责读取消息数据
    kafka是一个分布式系统, 用zookeeper来管理,协调kafka集群的各个代理(Broker) 节点,当Kafka集群中新添加一个代理节点,或者某台节点出现故障,zookeeper服务将会通知生产者和消费者应用程序去其他的正常代理节点读写

    kafka安装配置

    (1)安装配置

    [root@zookeeper01 ~]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka2.2.1/kafka_2.13-2.4.1.tgz
    [root@zookeeper01 ~]# tar xf kafka_2.13-2.4.1.tgz -C /data/program/software/
    [root@zookeeper01 ~]# cd /data/program/software/
    [root@zookeeper01 software]# mv kafka_2.13-2.4.1/ kafka
    #建立logs目录
    [root@zookeeper01 software]# mkdir kafka/logs
    #配置config文件
    [root@zookeeper01 software]# vim kafka/config/server.properties
    log.dirs=/var/kafka
    zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
    

    (2)启动kafka

    [root@zookeeper01 kafka]# nohup bin/kafka-server-start.sh config/server.properties >>/tmp/kafka.nohup &
    [root@zookeeper01 kafka]# netstat -tunlp|egrep "(2181|9092)"
    tcp6       0      0 192.168.213.128:9092    :::*                    LISTEN      1809/java
    tcp6       0      0 :::2181                 :::*                    LISTEN      1744/java
    

    说明:Kafka的进程ID为1809,占用端口为9092,QuorumPeerMain为对应的zookeeper实例,基础ID为1744,在2181端口监听

    单机连通性测试

    (1)创建topic进行测试

    [root@zookeeper01 kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hehe
    Created topic hehe.
    

    (2)查看topic列表

    [root@zookeeper01 kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
    hehe
    

    (3)生产者消息测试

    [root@zookeeper01 kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hehe
    >hello
    

    (4)消费者消息测试

    [root@zookeeper01 kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hehe --from-beginning
    hello
    

    kafka集群搭建

    kafak集群依赖zookeeper,在kafka程序包中内置了zookeeper(建议使用),本次实验使用外置zookeeper

    (1)zookeeper配置

    [root@zookeeper01 software]# vim zookeeper/conf/zoo.cfg
    broker.id=1	#集群中的第几个节点
    listeners=PLAINTEXT://zookeeper01:9092
    log.dirs=/var/kafka
    zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
    

    (2)kafka配置

    broker.id=1	#集群中的第几个节点
    listeners=PLAINTEXT://zookeeper01:9092
    zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
    

    (3)将zookeeper01上的kafka配置整体拷贝到其余的两个节点上

    [root@zookeeper01 software]# scp -pr kafka root@192.168.213.136:/data/program/software
    [root@zookeeper01 software]# scp -pr kafka root@192.168.213.135:/data/program/software
    

    (4)在其他两节点上修改broker.id的值与listeners,将logs目录下的内容清空

    [root@zookeeper02 software]# rm -rf kafka/logs/*
    [root@zookeeper03 software]# rm -rf kafka/logs/*
    

    集群测试

    [root@zookeeper01 kafka]# ../zookeeper/bin/zkServer.sh start
    [root@zookeeper01 kafka]# ../zookeeper/bin/zkServer.sh status
    [root@zookeeper01 kafka]# bin/kafka-server-start.sh config/server.properties &
    [root@zookeeper01 kafka]# netstat -tunlp|egrep "(2181|9092)"
    

    创建topic

    [root@zookeeper01 kafka]# bin/kafka-topics.sh --create --zookeeper zookeeper01,zookeeper02,zookeeper03 --replication-factor 1 --partitions 2 --topic nebula01
    Created topic nebula01.
    

    broke1输入消息、broke2和broke3同时收到消息

    [root@zookeeper01 kafka]# bin/kafka-console-producer.sh --broker-list zookeeper01:9092 --topic nebula01
    [root@zookeeper03 kafka]# bin/kafka-console-consumer.sh --bootstrap-server zookeeper03:9092 --topic nebula01 --from-beginning
    [root@zookeeper02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server zookeeper02:9092 --topic nebula01 --from-beginning
    

    Kafka常用命令

    #启动kafka
    nohup bin/kafka-server-start.sh config/server.properties >> /tmp/kafka.nohup 2>&1 &
    #查看topic
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    #控制台消费
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning
    #创建生产者
    bin/kafka-conso1e- producer.sh --broker-list localhost:9092 --topic test
    
    #测试输入
    [root@zookeeper01 logstash]# bin/logstash -f conf.d/test_kafka.conf
    [root@zookeeper01 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.213.128:9092 --topic test_logstash
    #测试输出
    [root@zookeeper01 logstash]# bin/logstash -f conf.d/test_kafka_output.conf
    [root@zookeeper02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.213.136:9092 --topic test_logstash --from-beginning
    

    logstash解耦之消息队列

    logstash扩展概述

    在生产环境中,从后台应用服务器运行logstash进程并将数据直接发送到elasticsearch里,不是第一选择
    第一,过多的客户端链接对elasticseach是一种另外的压力;
    第二,网络抖动会影响到Logstash进程,进而影响生产环境;
    第三,运维人员未必愿意在生产服务器上部署java,或者让logstash跟业务代码争夺java资源。

    在实际运用中,Logstash进程会被分为两个不同的角色:运行在应用服务器上,减轻运行压力,只做读取和转发的shipper;运行在独立服务器上,完成数据解析处理,负责写入elasticsearch的indexer

    通过kafka传输:kafka利用磁盘做队列,无所谓消息缓冲时的磁盘问题;如果公司内部已有Kafka服务在运行,logstash可以快速接入,免去重复建设的麻烦

    插件配置

    (1)input配置示例

    [root@zookeeper01 logstash]# vim conf.d/test_kafka.conf
    input {
        kafka {
            bootstrap_servers => "192.168.213.128:9092,192.168.213.136:9092"
            topics => ["test_logstash"]
        }
    }
    output {
        stdout {
            codec=> rubydebug
        }
    }
    

    (2)output配置示例

    [root@zookeeper01 logstash]# vim conf.d/test_kafka_output.conf
    input {
        stdin {
            add_field => {"key" => "value"}
            codec => "plain"
            tags => ["add"]
            type => "std"
        }
    }
    output {
        kafka {
            bootstrap_servers => "192.168.213.128:9092"
            topic_id => "test_logstash"
        }
    }
    
  • 相关阅读:
    iOS中过滤html文档中的标签
    十六进制函数转换UIColor对象
    vue使用echarts
    vue打包部署
    charels代理跨域访问接口
    vue 使用highcharts
    vue配置跨域
    命令行
    安装nvm
    vsCode个人设置
  • 原文地址:https://www.cnblogs.com/zhaoya2019/p/13141762.html
Copyright © 2011-2022 走看看