  • Basic ELK 7.3.2 setup with authentication

    Pull the 7.3.2 elasticsearch image.

    docker run -d --name elasticsearch --net ELS -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node"  elasticsearch:7.3.2

    After it starts, use

    docker container cp -a <container ID>:<container path>  <host path>

    to copy the config directory out of the container to the host, so it can be mounted later.
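    For example (a sketch; the container name comes from the run command above, and /home/aa/elastic matches the directory mounted later):

    # copy the whole config directory out of the running container
    docker container cp -a elasticsearch:/usr/share/elasticsearch/config /home/aa/elastic/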

    Remove this container, then add the following to the copied elasticsearch.yml:

    http.cors.enabled: true
    http.cors.allow-origin: "*"
    http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type

    The three http.cors.* lines allow cross-origin requests.

    xpack.security.enabled: true turns on authentication.

    The complete file:

    cluster.name: "docker-cluster"
    network.host: 0.0.0.0
    http.port: 9200
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
    xpack.security.enabled: true

    docker run -d --name elasticsearch --net ELS -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node"  -e "ELASTIC_PASSWORD=Tsl@2018"  -e "KIBANA_PASSWORD=Tsl@2018" -v /home/aa/elastic/config:/usr/share/elasticsearch/config elasticsearch:7.3.2
    This specifies the elastic and kibana passwords and mounts the config directory.
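    To confirm that security is on and the elastic password works, a quick check from the Docker host (a sketch using the password set above):

    # without credentials this should return a 401 security_exception
    curl http://localhost:9200
    # with the elastic superuser it should return the cluster banner
    curl -u elastic:Tsl@2018 http://localhost:9200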

    Pull the 7.3.2 kibana image.
    In the same way, start a bare container with

    docker run -d --name kibana --net ELS -p 5601:5601 kibana:7.3.2

    then copy the config directory to the host and edit the configuration file kibana.yml.

    Add the following settings:

    elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
    xpack.monitoring.ui.container.elasticsearch.enabled: true
    elasticsearch.username: "kibana"
    elasticsearch.password: "1111"
    xpack.security.enabled: true

    The complete file:

    server.name: kibana
    server.host: "0"
    elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
    xpack.monitoring.ui.container.elasticsearch.enabled: true
    elasticsearch.username: "kibana"
    elasticsearch.password: "1111"
    xpack.security.enabled: true

    Note that the kibana username configured here is only the account Kibana uses to connect to ES; it is not the login for the Kibana UI. You still log in to Kibana with the elastic superuser account.
     docker run -d --name kibana --net ELS -p 5601:5601 -v /home/aa/kibana/config:/usr/share/kibana/config kibana:7.3.2
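    If Kibana cannot authenticate against ES, the built-in kibana user's password can be set explicitly, either with bin/elasticsearch-setup-passwords interactive inside the ES container or via the change-password API. A sketch of the API call, using the host and passwords from the examples above:

    # set the built-in kibana user's password so it matches elasticsearch.password in kibana.yml
    curl -u elastic:Tsl@2018 -X POST "http://192.168.66.34:9200/_security/user/kibana/_password" \
      -H 'Content-Type: application/json' \
      -d '{"password":"1111"}'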

    Pull the logstash image.

    For extracting data from a database into ES, Logstash 7 no longer needs the pipeline config file specified at startup the way version 5 did. Instead there are dedicated directories:
    the pipeline directory holds the input/output .conf files;
    the config directory holds Logstash's own startup configuration.
    The modified logstash.yml in full:

    http.host: "0.0.0.0"
    xpack.monitoring.elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
    xpack.management.elasticsearch.username: elastic
    xpack.management.elasticsearch.password: 1111
    xpack.monitoring.enabled: true
    xpack.monitoring.elasticsearch.username: elastic
    xpack.monitoring.elasticsearch.password: 1111

    The database driver jar needs to be copied or mounted into the container:

    docker run -d -p 5044:5044 -p 9600:9600 -it --name logstash -v /home/aa/logstash/config/:/usr/share/logstash/config/ -v /home/aa/logstash/pipeline:/usr/share/logstash/pipeline -v /home/aa/logstash/mysql/:/some/config-dir/ --network ELS logstash:7.3.2

    Started like this, this version keeps failing with an error that the driver cannot be found.

     

    It is not a problem with the mount location. After a lot of digging, it turns out the Java classpath inside the container is /usr/share/logstash/logstash-core/lib/jars. If the jar is mounted anywhere else, Logstash cannot find it even when the .conf file points at the correct path!

    Put the driver jar in that directory, leave the driver path empty in the .conf file, and after a restart the driver is picked up.
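    A sketch of doing that against a running container (the jar file name is an example; use whichever MySQL Connector/J version you have):

    # drop the driver onto Logstash's classpath and restart so it is picked up
    docker cp mysql-connector-java-5.1.47.jar logstash:/usr/share/logstash/logstash-core/lib/jars/
    docker restart logstash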

    Ideally the three ELK services should end up combined into a single docker-compose file. I had not worked that out at first; the working compose setup is added further down.

    input {
    jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "111"
    #validate the connection before using it
    jdbc_validate_connection => true
    #how often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    #maximum number of retries after a connection failure
    connection_retry_attempts => 50
    #wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    # sync schedule (cron fields: minute hour day-of-month month day-of-week); "* * * * *" runs every minute
    schedule => "* * * * *"
    statement => " select sal.alarmID, vi.districtID, di.name districtName, de.streetID, st.name streetName,
    de.committeeID, comm.name committeeName, sal.villageID, vi.name villageName, de.buildingID,
    vi.name, sal.alarmCount, sal.address,
    sal.deviceType,sal.alarmTypeName,sal.modelID,
    sal.alarmLevel,sal.alarmState,sal.alarmTime,
    sal.alarmContent,de.installAddr,sal.updateTime
    from e_sense_alarm_log sal
    left join e_device de on de.deviceID = sal.deviceID
    left join b_village vi on vi.villageID = de.villageID
    left join b_district di on di.districtID = vi.districtID
    left join b_street st on st.streetID = vi.streetID
    left join b_committee comm on comm.committeeID = vi.committeeID
    WHERE sal.updateTime >= :sql_last_value"
    #whether to record the last run; if true, the last value read from the tracking_column field is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/last_record/logstash_alarm_last_time"
    type => "alarm"
    # whether to lowercase column names
    lowercase_column_names => false
    }

    jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "111"
    #validate the connection before using it
    jdbc_validate_connection => true
    #how often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    #maximum number of retries after a connection failure
    connection_retry_attempts => 50
    #wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => " select de.deviceID, de.isDelete, vi.districtID, di.name districtName, de.streetID, st.name streetName,
    de.committeeID, comm.name committeeName, de.villageID, vi.name villageName, de.buildingID,
    de.installAddr as installadd,
    de.type as devicetype, bu.buildingNo as buildingno, bu.name as buildingName,
    de.productModel as productmodel, de.name, de.code as code, de.installTime as installtime,
    de.state, de.updateTime as updatetime
    from e_device de
    left join b_building bu on de.buildingID = bu.buildingID
    left join b_village vi on vi.villageID = de.villageID
    left join b_district di on di.districtID = vi.districtID
    left join b_street st on st.streetID = vi.streetID
    left join b_committee comm on comm.committeeID = vi.committeeID
    WHERE de.updateTime >= :sql_last_value"
    #whether to record the last run; if true, the last value read from the tracking_column field is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/last_record/logstash_device_last_time"
    type => "device"
    # whether to lowercase column names
    lowercase_column_names => false
    }

    jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "111"
    #validate the connection before using it
    jdbc_validate_connection => true
    #how often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    #maximum number of retries after a connection failure
    connection_retry_attempts => 50
    #wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => " select al.accessLogID, vi.districtID, di.name districtName,vi.streetID, st.name streetName, vi.committeeID,
    comm.name committeeName, al.villageID, vi.name villageName, al.buildingID as buildingid,bui.name buildName,
    peo.peopleID, al.peopleName as peoplename,
    peo.gender, peo.phoneNo as phoneno, al.credentialNo as credentialno,
    lab.name as peoplelabel, bu.buildingNo as buildingno,
    al.cardNo as cardno, al.updateTime as opentime, peo.headPic as headpic,
    (case al.openType when '100101' then '刷门禁卡开门' when '100201' then '人脸识别开门' when '100301' then '手机蓝牙开门'
    when '100302' then '手机远程开门' when '100303' then '电话按键开门' when '100401' then '出门按钮开门'
    when '100402' then '键盘密码开门' when '100501' then '身份证开门' when '100601' then '访客呼叫开门' end) opentype,
    peo.livePic as livepic, peo.idPic as idpic , al.faceLogID faceLogID, io.name ioName, al.deviceID
    from e_access_log al
    left join p_people peo on peo.credentialNo =al.credentialNo
    left join p_people_label pl on pl.peopleID = peo.peopleID
    left join s_label lab on lab.labelID = pl.labelID
    left join b_building bu on bu.buildingID = al.buildingID
    left join b_village vi on vi.villageID = al.villageID
    left join b_in_out io on io.ioID = al.ioID
    left join b_district di on di.districtID = vi.districtID
    left join b_street st on st.streetID = vi.streetID
    left join b_committee comm on comm.committeeID = vi.committeeID
    left join b_building bui on bui.buildingID = al.buildingID
    WHERE al.updateTime >= :sql_last_value"
    #whether to record the last run; if true, the last value read from the tracking_column field is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/last_record/logstash_accessLog_last_time"
    type => "accessLog"
    # whether to lowercase column names
    lowercase_column_names => false
    }

    jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "111"
    #validate the connection before using it
    jdbc_validate_connection => true
    #how often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    #maximum number of retries after a connection failure
    connection_retry_attempts => 50
    #wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => "select fl.faceLogID,io.type as faceinouttype, vi.districtID, di.name districtName, vi.streetID, st.name streetName,
    vi.committeeID, comm.name committeeName, io.villageID, vi.name villageName, io.ioID as ioid, io.name, bid.deviceID,
    fl.personType as persontype,
    peo.peopleName as peoplename, peo.gender, peo.nation, peo.birthDate,
    peo.phoneNo as phoneno, peo.credentialNo as credentialno,
    peo.domiclleDetailAddress, peo.residenceDetailAddress,
    sl.name as peoplelabel, fl.updateTime as facecapturetime, fl.bkgUrl as bkgurl,
    fl.faceUrl as faceurl, peo.headPic as headpic, peo.livePic as livepic, peo.idPic as idpic ,
    peo.political, peo.education, peo.maritialStatus, peo.origin, fl.faceSimilarity*100 faceSimilarity, peo.peopleType
    from e_face_log fl
    left join b_in_out io on io.ioID = fl.ioID
    left join p_people peo on peo.credentialNo = fl.credentialNo
    left join p_people_label pl on pl.peopleID = peo.peopleID
    left join s_label sl on sl.labelID = pl.labelID
    left join b_village vi on vi.villageID = io.villageID
    left join b_inout_device bid on bid.ioID = io.ioID
    left join b_district di on di.districtID = vi.districtID
    left join b_street st on st.streetID = vi.streetID
    left join b_committee comm on comm.committeeID = vi.committeeID
    where fl.updateTime >= :sql_last_value
    #and fl.faceSource = 0
    "
    #whether to record the last run; if true, the last value read from the tracking_column field is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/last_record/logstash_wkface_last_time"
    type => "wkface"
    # whether to lowercase column names
    lowercase_column_names => false
    }
    jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "111"
    #validate the connection before using it
    jdbc_validate_connection => true
    #how often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    #maximum number of retries after a connection failure
    connection_retry_attempts => 50
    #wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => "select pr.parkingReserveID, vi.districtID, di.name districtName, vi.streetID, st.name streetName, vi.committeeID,
    comm.name committeeName, pr.villageID, vi.name villageName, io.ioID as inioid,
    io.ioID as outioid, pr.inParkingLogID as inparkinglogid,
    pr.outParkingLogID as outparkinglogid, pr.carBrand as cartype,
    pr.plateNo as plateno, peo.peopleName as peoplename, peo.phoneNo as phoneno,
    peo.credentialNo as credentialno, pr.insertTime as intime, pr.updateTime as outtime,
    peo.headPic as headpic,
    peo.livePic as livepic, peo.idPic as idpic, inlog.platePic as inplatepic,
    outlog.platePic as outplatepic, inlog.minPlatePic as inplatepic,
    outlog.minPlatePic as outplatepic, pr.isRegister
    from e_parking_reserve pr
    left join e_parking_channel pc on pc.parkingID = pr.parkingID
    left join b_in_out io on io.ioID = pc.ioID
    left join e_parking_car ec on ec.plateNo = pr.plateNo
    left join p_people peo on peo.peopleID = ec.peopleID
    left join e_parking_log inlog on inlog.parkingLogID = pr.inParkingLogID
    left join e_parking_log outlog on outlog.parkingLogID = pr.outParkingLogID
    left join b_village vi on vi.villageID = io.villageID
    left join b_district di on di.districtID = vi.districtID
    left join b_street st on st.streetID = vi.streetID
    left join b_committee comm on comm.committeeID = vi.committeeID
    where pr.updateTime >= :sql_last_value"
    #whether to record the last run; if true, the last value read from the tracking_column field is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/last_record/logstash_wkcar_last_time"
    type => "wkcar"
    # whether to lowercase column names
    lowercase_column_names => false
    }

    }
    output {
    if [type] == "alarm"{
    elasticsearch {
    # ES host and port
    hosts => ["192.168.66.34:9200"]
    user => "elastic"
    password => "111"
    # index name, customizable
    index => "alarmlogindex"
    # the database row's id column is used as the document _id
    document_id => "%{alarmID}"
    document_type => "alarm"
    }
    }
    if [type] == "device"{
    elasticsearch {
    # ES host and port
    hosts => ["192.168.66.34:9200"]
    user => "elastic"
    password => "111"
    # index name, customizable
    index => "deviceindex"
    # the database row's id column is used as the document _id
    document_id => "%{deviceID}"
    document_type => "device"
    }
    }
    if [type] == "accessLog"{
    elasticsearch {
    # ES host and port
    hosts => ["192.168.66.34:9200"]
    user => "elastic"
    password => "111"
    # index name, customizable
    index => "accesslogindex"
    # the database row's id column is used as the document _id
    document_id => "%{accessLogID}"
    document_type => "accessLog"
    }
    }
    if [type] == "wkface"{
    elasticsearch {
    # ES host and port
    hosts => ["192.168.66.34:9200"]
    user => "elastic"
    password => "111"
    # index name, customizable
    index => "facelogindex"
    # the database row's id column is used as the document _id
    document_id => "%{faceLogID}"
    document_type => "wkface"
    }
    }
    if [type] == "wkcar"{
    elasticsearch {
    # ES host and port
    hosts => ["192.168.66.34:9200"]
    user => "elastic"
    password => "111"
    # index name, customizable
    index => "parkingreservelogindex"
    # the database row's id column is used as the document _id
    document_id => "%{parkingReserveID}"
    document_type => "wkcar"
    }
    }
    stdout {
    # output in JSON lines format
    codec => json_lines
    }
    }
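    Before relying on the schedule, the pipeline syntax can be validated inside the container (a sketch; --path.data points at a scratch directory so the check does not clash with the running instance's data-directory lock):

    docker exec -it logstash bin/logstash -f /usr/share/logstash/pipeline/ --config.test_and_exit --path.data /tmp/logstash-config-test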



    The ELK setup in docker-compose.yml:

    Copy the three ELK config directories into the corresponding host directories beforehand.

    Because the driver jar has to be copied into the container, logstash needs a custom image.

    Dockerfile contents:

    FROM logstash:7.2.0

    MAINTAINER kf

    # copy the driver jar into the image's classpath directory
    ADD ./mysql/*****.jar /usr/share/logstash/logstash-core/lib/jars/

    # create the last_record directory inside the image (relative to the current workdir)
    RUN mkdir last_record

    The source path here must be relative to the build context; an absolute path will not work.
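    Build it from the directory containing the Dockerfile and the mysql/ folder; tagging it logstash:7 matches the image name referenced in the compose file below:

    docker build -t logstash:7 .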

    docker-compose.yml contents:

    version: "3"
    services:

    elasticsearch:
    image: elasticsearch:7.2.0
    container_name: elastic
    ports:
    - 9200:9200
    - 9300:9300
    environment:
    ELASTIC_PASSWORD: Root@2018
    KIBANA_PASSWORD: Kibana@2018
    LOGSTASH_PASSWORD: Logstash@2018
    discovery_type: single-node
    volumes:
    - /root/data/elastic/config:/usr/share/elasticsearch/config
    restart: always

    kibana:
    image: kibana:7.2.0
    container_name: kibana
    ports:
    - 5601:5601
    volumes:
    - /root/data/kibana/config:/usr/share/kibana/config
    restart: always

    logstash:
    image: logstash:7       自定义的镜像
    container_name: logstash
    ports:
    - 5044:5044
    - 9600:9600
    volumes:
    - /root/data/logstash/config:/usr/share/logstash/config
    - /root/data/logstash/pipeline:/usr/share/logstash/pipeline
    restart: always
    networks:
    default:
    external:
    name: ELS


    Running docker-compose up -d first fails with an error saying the network needs to be created; create it as the prompt suggests and run the command again.
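    Creating the external network looks like this (ELS is the network name used by the run commands and the compose file):

    docker network create ELS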
    I then found that the ES container started via docker-compose kept erroring out.
    At first I fell back to starting it with docker run, without finding the cause.
    Follow-up: after asking someone, the cause turned up. When started by docker-compose, ES comes up in cluster mode; the single-node environment variable (discovery_type: single-node) is configured but does not take effect.
    The setting has to be added to the mounted elasticsearch.yml instead. The complete file:

    cluster.name: "docker-cluster"
    discovery.type: "single-node"
    network.host: 0.0.0.0
    http.port: 9200
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
    xpack.security.enabled: true
    #xpack.security.transport.ssl.enabled: true

    The final docker-compose.yml:

    version: "3.7"
    services:

    elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.2
    container_name: elastic
    ports:
    - 9200:9200
    - 9300:9300
    environment:
    #- discovery_type=single-node
    - ELASTIC_PASSWORD=Root@
    - KIBANA_PASSWORD=Kibana@
    - LOGSTASH_PASSWORD=Logstash@
    volumes:
    - ./elastic/config:/usr/share/elasticsearch/config
    restart: always

    kibana:
    image: kibana:7.3.2
    container_name: kibana
    ports:
    - 5601:5601
    volumes:
    - /data/elk/kibana/config:/usr/share/kibana/config
    depends_on:
    - elasticsearch
    restart: always

    logstash:
    image: logstash:7
    container_name: logstash
    ports:
    - 5044:5044
    - 9600:9600
    volumes:
    - /data/elk/logstash/config:/usr/share/logstash/config
    - /data/elk/logstash/pipeline:/usr/share/logstash/pipeline
    depends_on:
    - elasticsearch
    restart: always
    networks:
    default:
    external:
    name: ELS

    Now ELK can be brought up with docker-compose up -d --build.

    At this point the data written into ES is off by 8 hours because of the timezone. Adding - TZ=Asia/Shanghai to the ES service's environment variables in docker-compose makes the inserted time fields match the database.

    However the time is still stored in UTC format, and once the front end converts the format it adds another 8 hours by default. So it is best to convert the time to a plain YYYY-MM-DD HH:mm:ss string when writing it into ES.
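    A quick way to confirm the TZ variable took effect (elastic is the container_name from the compose file):

    docker exec -it elastic date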

    One option is MySQL's DATE_FORMAT function: DATE_FORMAT(sal.alarmTime,'%Y-%m-%d %T'), storing the field in ES with type text.

    There is another approach I saw elsewhere, which I have not tested:

    It adds 8 hours to the MySQL mytime value inside Logstash:

    filter {
      ruby {
        code => "event.set('mytime', event.get('mytime').time.localtime + 8*60*60)"
      }
    }

    After converting the time field to text, queries against it fail with:

    Fielddata is disabled on text fields by default. Set fielddata=true on [gender] in order to load fielddata in memory by uninverting the inverted index. Note that this can however use significant memory.

    To fix this, run the following in Kibana:

    PUT facelogindex/_mapping
    {
      "properties": {
        "facecapturetime": {
          "type": "text",
          "fielddata": true
        }
      }
    }

    Here facelogindex is the index and facecapturetime is the field name.
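    To check that the mapping change took effect, the field mapping can be read back (a sketch; use whichever password was set for the elastic user):

    curl -u elastic:Tsl@2018 "http://192.168.66.34:9200/facelogindex/_mapping/field/facecapturetime?pretty"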





