  • Sending .NET Core messages to Kafka and storing them in ES via Logstash

    Introduction

      Per the project's requirements, the system's operation logs need to be persisted, and Elasticsearch was chosen as the log store.

      The project sends messages to Kafka; Logstash subscribes to the Kafka topic and writes the received messages to Elasticsearch.
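As a rough sketch of the producer side, the operation-log message can be built and serialized to JSON before being sent to Kafka. This is a hedged Python sketch; the field names (`user`, `action`, `detail`, `timestamp`) are assumptions for illustration, not the project's actual schema:

```python
import json
from datetime import datetime, timezone

def build_operation_log(user, action, detail):
    """Build the JSON payload for one operation-log message.

    Field names are illustrative; adapt them to the real schema.
    """
    message = {
        "user": user,
        "action": action,
        "detail": detail,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Kafka messages are byte arrays, so encode the JSON as UTF-8.
    return json.dumps(message, ensure_ascii=False).encode("utf-8")

payload = build_operation_log("admin", "login", "login from web console")
print(json.loads(payload)["action"])  # prints "login"
```

The payload round-trips as JSON, so Logstash can later parse it with a JSON codec.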

    • Deployed with Docker
    • kafka
    • zookeeper (stores Kafka's broker and configuration metadata)
    • logstash

    Installing Kafka

      Kafka uses ZooKeeper to manage broker registration: ZooKeeper keeps a dedicated node, [/brokers/ids], that records the list of broker servers, and every broker registers itself there on startup.

      ZooKeeper is also involved in consumer/producer load balancing and in recording message offsets.

      Three components are installed in total: Kafka, kafka-manager (a web UI), and ZooKeeper.

      Run the docker-compose file to create the containers.

    docker-compose -f docker-kafka.yml up -d
    



    docker exec -it kafka /bin/bash
    
    cd /opt/kafka_2.13-2.6.0/bin
    



    kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic useroperationlog
    

      Once the containers are up, exec into the Kafka container and create the topic as shown above (note that with a single broker, the replication factor cannot exceed 1).

    Installing Logstash and Elasticsearch

      ES and Logstash are likewise created with docker-compose. When creating the Logstash container, its config and pipeline directories must be mounted as volumes.



      After installation, Kibana can be switched to Chinese by adding i18n.locale: "zh-CN" to the kibana.yml file.
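The mounted kibana.yml would then look roughly like this; only the i18n.locale line is the change described here, and the other two settings are assumptions about a typical dockerized setup:

```yaml
# /usr/share/kibana/config/kibana.yml
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://elasticsearch:9200"]
# Switch the Kibana UI to Simplified Chinese
i18n.locale: "zh-CN"
```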



      Edit the Logstash configuration: in logstash.conf, add the Kafka settings such as the topic, consumer group, and Kafka address.
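A minimal logstash.conf for this pipeline might look like the sketch below; the topic name and hostnames come from the compose files in the appendix, while the group_id and index pattern are assumptions:

```
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["useroperationlog"]
    group_id => "logstash-operationlog"   # assumed consumer group name
    codec => "json"
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "useroperationlog-%{+YYYY.MM.dd}"   # assumed daily index pattern
  }
}
```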



      Remember to restart Logstash after changing the configuration; after that, messages sent to the Kafka topic will flow into Elasticsearch. Done.


    Appendix: docker-compose files

      Elasticsearch configuration

    version: '3.3'
    services:
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
        container_name: elasticsearch
        ports:
          - 9200:9200
        environment:
          - discovery.type=single-node
          - bootstrap.memory_lock=true
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ulimits:
          memlock:
            soft: -1
            hard: -1
        deploy:
          replicas: 1
          update_config:
             parallelism: 2
             delay: 10s
          restart_policy:
              condition: on-failure
        volumes:
          - esdata:/usr/share/elasticsearch/data
          - esconfig:/usr/share/elasticsearch/config
          - /etc/localtime:/etc/localtime
        networks:
          - esnet
      kibana:
        image: docker.elastic.co/kibana/kibana:7.9.3
        container_name: kibana
        volumes:
          - kibanaConfig:/usr/share/kibana/config
          - /etc/localtime:/etc/localtime
        ports:
          - 5601:5601
        environment:
          - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
        deploy:
          replicas: 1
          update_config:
             parallelism: 2
             delay: 10s
          restart_policy:
              condition: on-failure
        networks:
          - esnet
      logstash:
        image: logstash:7.9.3
        container_name: logstash
        volumes:
          - logstashConfig:/usr/share/logstash/config
          - pipelinesConfig:/usr/share/logstash/pipeline
        ports:
          - 5044:5044
        deploy:
          replicas: 1
          update_config:
             parallelism: 2
             delay: 10s
          restart_policy:
              condition: on-failure
        networks:
          - esnet
    volumes:
      esdata: {}
      esconfig: {}
      kibanaConfig: {}
      logstashConfig: {}
      pipelinesConfig: {}
    networks:
      esnet:
          external: true
    
    
    
    

      Kafka's docker-compose file.

    
    version: "3"
    services:
       zookeeper:
         image: zookeeper
         container_name: zookeeper
    
         ports:
          - 2181:2181
         networks:
          - kafkanetwork
         volumes:
          - zookeeper_data:/data
          - zookeeper_log:/logs
          - zookeeper_datalog:/datalog
          - /etc/localtime:/etc/localtime
         deploy:
             restart_policy:
                 condition: on-failure
       kafka:
         image: wurstmeister/kafka
         ports:
          - 9092:9092
         environment:
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://123.56.23.68:9092
          KAFKA_BROKER_ID: 22
         volumes:
           - /etc/localtime:/etc/localtime
           - kafka_config:/opt/kafka/config/
           - kafka_libs:/opt/kafka/libs/
           - kafka_logs:/kafka
         networks:
           - kafkanetwork
         container_name: kafka
         deploy:
          restart_policy:
              condition: on-failure
       kafka-manager:
        image: sheepkiller/kafka-manager:latest
        ports:
         - 9000:9000
        environment:
         ZK_HOSTS: zookeeper:2181
         APPLICATION_SECRET: LETMEIN
         KM_ARGS: -Djava.net.preferIPv4Stack=true
        networks:
         - kafkanetwork
        deploy:
          restart_policy:
              condition: on-failure
    networks:
       kafkanetwork:
         external:
            name: kafkanetwork
    
    volumes:
       zookeeper_data: {}
       zookeeper_log: {}
       zookeeper_datalog: {}
       kafka_config: {}
       kafka_libs: {}
       kafka_logs: {}
    
    
  • Original article: https://www.cnblogs.com/zhengyazhao/p/13984750.html