zoukankan      html  css  js  c++  java
  • kafka+elk

    安装elasticsearch

    下载:http://www.elastic.co/downloads/elasticsearch

    下载后解压

    修改配置文件,xxx是自定义目录

    vi elasticsearch.yml
    cluster.name: my-application
    node.name: node-1
    path.data: /xxx/data/elastic/data
    path.logs: /xxx/data/elastic/logs
    network.host: 10.202.203.29

    在bin目录下执行./elasticsearch ,会报错,提示不能使用root用户启动

    所以新增一个用户,用户名密码都是elastic

    groupadd elastic
    useradd elastic -g elastic -p elastic

    修改相应目录的权限

    chown -R elastic:elastic elasticsearch-6.2.4

    注意上面的数据和日志目录也需要修改权限 -R参数表示递归处理

    切换到新用户

    su elastic

    重新执行启动命令,会报如下错误

    上面的错误修改

    vim /etc/security/limits.conf

    切换回root用户修改配置

    sysctl -w vm.max_map_count=262144

    查看结果

    sysctl -a|grep vm.max_map_count

    重新切换到elastic用户启动服务

    表示成功启动,打开浏览器:

    停止服务

    $ jps | grep Elasticsearch
    14542 Elasticsearch
    $ kill -SIGTERM 15516

    如果es需要启用密码验证的话,需要做如下配置

    可以通过xpack api 查看

    此时的许可证是无效的,安全是不会开启的

    需要先启用一个30天的许可证

    根据提示添加参数

    设置内置账户密码

    再次查看,需要加上登录选项

    此时再看,安全已经启用了

    安装logstash

    下载:http://www.elastic.co/cn/downloads/logstash

    解压到bin目录执行

    ./logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

    这里等待用户输入,输入后返回数据

     安装kibana

    下载:http://www.elastic.co/cn/downloads/kibana

    解压后修改conf目录下的配置,注意这里elastic的地址不要用localhost

    server.host: "10.202.203.29"
    server.name: "kibana-1"
    elasticsearch.url: "http://10.202.203.29:9200"
    elasticsearch.username: "elastic"
    elasticsearch.password: "elastic"

    使用如下命令启动

    nohup ./kibana >> ../logs/kibana.log &

    如果报一下提示

    使用下面命令

    nohup ./kibana >> ../logs/kibana.log 2>&1 &

    nohup表示后台启动,如果要关闭先找到进程号

    netstat -lnp|grep 5601

    使用下面命令关闭

    kill -9 25668

    打开浏览器5601端口

    kafka与logstash整合

    logstash收集日志到kafka

    在kafka-manager中创建一个新主题kafka-elk-log

    在logstash根目录下新建一个etc目录

    在etc目录中新建一个logstash_input_kafka文件,在文件中配置从控制台接收输入,并将收集到的信息输出到kafka,内容如下

    input{
        # 从控制台接收输入
        stdin{}
    }
    output{
        kafka{
            # 消息写入的主题
            topic_id=>"kafka-elk-log"
            # 连接kafka集群配置
            bootstrap_servers=>"10.202.203.29:9092"
            # 批量写入配置
            batch_size=>5
            # logstash导入数据编码方式
            codec=>"plain"
        }
        stdout{
            # 设置控制台打印数据表现形式
            codec=>rubydebug
        }
    }
    View Code

    切换到bin目录执行

    ./logstash -f ../etc/logstash_input_kafka

    输入一行消息

    使用下面命令验证

    kafka-run-class.sh kafka.tools.DumpLogSegments --files /neworiental/logs/kafka-logs/kafka-elk-log-0/00000000000000000000.log --print-data-log

     logstash从kafka消费日志

    在logstash/etc目录下新增一个logstash_out_es文件

    input{
        kafka{
            # 消费者
            group_id=>"kafka_elk_group"
            # 消费者标识
            client_id=>logstash
            # 消费的主题
            topics=>"kafka-elk-log"
            # 连接kafka集群配置
            bootstrap_servers=>"10.202.203.29:9092"
            # 消费起始位置
            auto_offset_reset=>"earliest"
            # 消费者线程数
            consumer_threads=>5
            # logstash导入数据编码方式
            codec=>"plain"
        }
    }
    output{
        elasticsearch{
            # elasticsearch集群地址,多个地址以逗号分隔
            hosts=>["10.202.203.29:9200"]
            # 创建索引
            index=>"kafka-elk-log-%{+YYYY.MM.dd}"
            # 指定数据导入elasticsearch格式
            codec=>"plain"
        }
    }
    View Code

    在bin目录执行

    ./logstash -f ../etc/logstash_out_es

    接下来打开kibana-management添加一个索引模式

    接下来在Discover中选择刚才添加的索引模式,同时添加只显示message和timestamp

     打开生产者程序,写入几条消息

    点击kibana右上角的时间选择

    选择刷新时间和Quick标签,可以实时看到日志信息

     logstash从rabbitmq消费日志

    input{
        kafka{
            # rabbitmq地址
            host=>"10.202.80.196"
            # 端口
            port=>5672
            # vhost
            vhost=>"NIST"
            # 用户名
            user=>"bm"
            # 密码
            password=>"bm_p@ss"
            # queue
            queue=>"apicenter"
            # subscription_retry_interval_seconds
            subscription_retry_interval_seconds=>5
            # key
            key=>"apicenter"
        }
    }
    output{
        elasticsearch{
            # elasticsearch集群地址,多个地址以逗号分隔
            hosts=>["10.202.203.29:9200"]
            # 创建索引
            index=>"rabbitmq-elk-log-%{+YYYY.MM.dd}"
            # 指定数据导入elasticsearch格式
            codec=>"plain"
        }
    }
    View Code

    Metricbeat

    下载:http://www.elastic.co/downloads/beats/metricbeat

    解压后编辑metricbeat.yml文件,设置elasticsearch

    启动

    ./metricbeat -e -c metricbeat.yml

    默认使用当前版本作为索引

    在kibana中添加索引模式

    在Discovery中查看数据

     接下来设置dashboard,编辑配置文件

    重新启动metricbeat,打开kibana,切换到Dashboard

     可以看到

     点击System进去
     
  • 相关阅读:
    Smobiler如何实现.net一键开发,ios和android跨平台运行
    使用Smobiler实现类似美团的界面
    疫情当下,企业系统如何快速实现移动化?
    Smobiler针对百度文字识别SDK动态编译与运行
    smobiler自适应不同手机分辨率
    仓库管理移动应用解决方案——C#开发的移动应用开源解决方案
    移动OA办公——Smobiler第一个开源应用解决方案,快来get吧
    react-native 标题随页面滚动显示和隐藏
    react-native 键盘遮挡输入框
    解决adb网络连接中出现的“由于目标计算机积极拒绝,无法连接”错误
  • 原文地址:https://www.cnblogs.com/uptothesky/p/8874889.html
Copyright © 2011-2022 走看看