
    Airflow v2.0 分布式部署 elasticsearch日志解决方案

    Installation environment:

    • docker
    • airflow v2.0
    • elasticsearch 7+
    • filebeat 7+

    Development dependencies:

    pip install 'apache-airflow-providers-elasticsearch'

    Logging solution

    graph LR
      AF[(Airflow)] -."write".-> LOG[[JSON-format log files]]
      LOG -."read".-> FB[/Filebeat: parse and normalize logs/]
      FB -."store".-> ES[(ElasticSearch)]

    Logstash can be inserted between Filebeat and Elasticsearch in the diagram above for additional processing, depending on your own design.

    Airflow configuration

    Enable remote logging and configure the Elasticsearch connection so that the webserver can reach Elasticsearch. Remote logs are retrieved by searching on log_id, so make sure the log output contains a log_id field that matches the format configured in log_id_template.

    [logging]
    # Enable remote logging
    remote_logging = True
    
    # Elasticsearch connection info for the webserver
    [elasticsearch]
    host = your_host:your_port
    # log_id template: the id used to search for logs
    log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
    # Whether to write logs to stdout; depends on your logging design.
    # Here Filebeat reads the log files and ships them to Elasticsearch, so it is False.
    write_stdout = False
    # Whether to emit logs in JSON format
    json_format = True
    
    # Elasticsearch TLS settings; configure as needed
    [elasticsearch_configs]
    use_ssl = False
    verify_certs = False
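    To see what the webserver actually searches for, the log_id_template above can be rendered with example values (the dag/task names below are hypothetical, not from the original post):

```python
# Sketch: how log_id_template is rendered into the log_id that the
# webserver searches Elasticsearch for. Values are hypothetical examples.
log_id_template = "{dag_id}-{task_id}-{execution_date}-{try_number}"

log_id = log_id_template.format(
    dag_id="example_dag",
    task_id="extract",
    execution_date="2021-01-27T00:00:00+00:00",
    try_number=1,
)
print(log_id)  # example_dag-extract-2021-01-27T00:00:00+00:00-1
```

    Every log record shipped to Elasticsearch must carry a log_id field in exactly this shape, or the webserver's search will return nothing.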
    

    Filebeat configuration

    Filebeat reads the logs produced by task execution on the worker nodes, normalizes their structure, and sends them to Elasticsearch for storage.

    Notes:

    • The host field must be a hashable type, or absent

    The configuration below includes only the settings that need to be changed.

    # Read log files
    filebeat.inputs:
    
    # Each - is an input. Most options can be set at the input level, so
    # you can use different inputs for various configurations.
    # Below are the input specific configurations.
    
    - type: log
      enabled: true
      paths: 
        - /app/logs/**/*.log
      exclude_files: 
        - '.py.log$'
    
    setup.template:
      # name and pattern are required for the Elasticsearch index setup
      name: "airflow_log_template"
      pattern: "airflow-log*"
    
    # Disable index lifecycle management; otherwise the custom index name is ignored
    setup.ilm.enabled: false
    
    output.elasticsearch:
      # Array of hosts to connect to.
      hosts: '${FILEBEAT_OUTPUT_ELASTICSEARCH_HOSTS:"127.0.0.1:9200"}'
    
      # Protocol - either `http` (default) or `https`.
      protocol: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PROTOCOL:"http"}'
    
      # Authentication credentials - either API key or username/password.
      #api_key: "id:api_key"
      username: '${FILEBEAT_OUTPUT_ELASTICSEARCH_USERNAME:""}'
      password: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PASSWORD:""}'
      index: "airflow-log-%{+yyyy.MM}"
    
    # Log processing settings
    processors:
      # Disable host metadata output
      # - add_host_metadata:
            # when.not.contains.tags: forwarded
    
      # Parse JSON-formatted log lines
      - decode_json_fields:
          fields: ["message"]
          process_array: false
          max_depth: 1
          target: ""
          overwrite_keys: true
          add_error_key: true
    
      # Remove the host field
      - drop_fields:
          fields: ["host"]
          ignore_missing: true
    

    After the configuration changes are complete, run filebeat -e to test the setup (with -e, Filebeat logs to stderr so configuration errors are visible immediately).
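    The effect of the decode_json_fields and drop_fields processors above can be sketched in plain Python (this mirrors the processor settings but is not Filebeat's actual internals; the sample log line is hypothetical):

```python
import json

# One Filebeat event as it arrives from the log input: the raw JSON line
# produced by Airflow's json_format logging sits in "message".
event = {
    "message": '{"log_id": "example_dag-task1-2021-01-27T00:00:00+00:00-1", '
               '"message": "task started", "host": {"name": "worker-1"}}',
}

# decode_json_fields: parse the JSON in "message" into top-level fields
# (target: ""), overwriting existing keys (overwrite_keys: true).
decoded = json.loads(event["message"])
event.update(decoded)

# drop_fields: remove "host" so the webserver's grouping code sees an
# absent (hashable) key instead of an unhashable dict.
event.pop("host", None)

print(event["log_id"])  # example_dag-task1-2021-01-27T00:00:00+00:00-1
print("host" in event)  # False
```

    After these steps the stored document has top-level log_id and message fields, which is exactly what the webserver's log_id search expects.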

    Reference

    Error when the Airflow webserver fetches logs

    [2021-01-27 15:12:57,006] {app.py:1891} ERROR - Exception on /get_logs_with_metadata [GET]
    Traceback (most recent call last):
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
        response = self.full_dispatch_request()
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
        rv = self.handle_user_exception(e)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
        reraise(exc_type, exc_value, tb)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
        raise value
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
        rv = self.dispatch_request()
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
        return self.view_functions[rule.endpoint](**req.view_args)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
        return func(*args, **kwargs)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
        return f(*args, **kwargs)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
        return func(*args, session=session, **kwargs)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/views.py", line 1054, in get_logs_with_metadata
        logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/log_reader.py", line 58, in read_log_chunks
        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 217, in read
        log, metadata = self._read(task_instance, try_number_element, metadata)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 163, in _read
        logs_by_host = self._group_logs_by_host(logs)
      File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 132, in _group_logs_by_host
        grouped_logs[key].append(log)
    TypeError: unhashable type: 'AttrDict'
    

    The code corresponding to the error above:

        @staticmethod
        def _group_logs_by_host(logs):
            grouped_logs = defaultdict(list)
            for log in logs:
            key = getattr(log, 'host', 'default_host')  # the host value is used as the grouping key
                grouped_logs[key].append(log)
    
            # return items sorted by timestamp.
            result = sorted(grouped_logs.items(), key=lambda kv: getattr(kv[1][0], 'message', '_'))
    
            return result
    

    As this code shows, the host field of each log record is used as a dictionary key, so its value must be hashable; it must not be a mutable type such as a list or dict. Deleting the host field from the logs, or replacing it with a hashable value such as a plain string, resolves the error.
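    A minimal reproduction of the failure mode, showing why a dict-valued host (the object shape Filebeat emits by default) breaks the grouping while a string works:

```python
from collections import defaultdict

grouped = defaultdict(list)

# Filebeat's default host field is an object, e.g. {"name": "worker-1"}.
# A dict is unhashable, so using it as a dictionary key raises TypeError,
# just like the AttrDict in the traceback above.
try:
    grouped[{"name": "worker-1"}].append("log line")
except TypeError as exc:
    print(type(exc).__name__)  # TypeError

# A plain string (or an absent field falling back to 'default_host')
# is hashable and groups without error.
grouped["worker-1"].append("log line")
print(dict(grouped))  # {'worker-1': ['log line']}
```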

    Original source: https://www.cnblogs.com/li1234yun/p/14336209.html