  • Singer: modifying tap-s3-csv to support MinIO connections

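    The Singer team maintains an official tap-s3-csv tap, but it is awkward to use if you are not on AWS, so I made a small change to
    its source so that it can process CSV files from any S3-compatible storage such as MinIO, and published the result to PyPI for anyone to use.
    Below is a short description of the code changes, followed by how the pip package is published.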

    Changes

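    The changes are all about connecting to S3. tap-s3-csv uses boto3, so what needs to change is the part where boto3 connects to S3:
    the client must also receive aws_access_key_id, aws_secret_access_key and endpoint_url.
    A custom S3 connection is created like this: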

     
    import boto3

    s3_client = boto3.session.Session().client(
        service_name='s3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        endpoint_url=endpoint_url,
    )
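    Since every function that touches S3 needs the same three values, one option is to assemble the client arguments from the Singer config in a single place. The helper below is a hypothetical sketch (s3_client_kwargs is not part of tap-s3-csv or tap-minio-csv); it only builds the keyword arguments, which could then be passed as boto3.session.Session().client(**kwargs). The MinIO address and credentials in the usage part are placeholder values.

```python
from typing import Any, Dict

# Config keys this sketch forwards to boto3 (the connection keys added in this post).
CONNECTION_KEYS = ("aws_access_key_id", "aws_secret_access_key", "endpoint_url")


def s3_client_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: build boto3 client arguments from a Singer config dict.

    Raises KeyError naming the first missing connection key.
    """
    kwargs: Dict[str, Any] = {"service_name": "s3"}
    for key in CONNECTION_KEYS:
        if key not in config:
            raise KeyError(f"missing config key: {key}")
        kwargs[key] = config[key]
    return kwargs


if __name__ == "__main__":
    # Placeholder values for a local MinIO server (assumed address and credentials).
    config = {
        "bucket": "demo",
        "aws_access_key_id": "minioadmin",
        "aws_secret_access_key": "minioadmin",
        "endpoint_url": "http://localhost:9000",
    }
    print(s3_client_kwargs(config))
    # With boto3 installed, the client would be created as:
    # s3_client = boto3.session.Session().client(**s3_client_kwargs(config))
```

    Keeping the key list in one tuple means a new connection option only has to be added once.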

    The parts that need to change:

    • s3.py
      get_input_files_for_table: this part mainly passes the new parameters through.
      Modified as follows:
     
    def get_input_files_for_table(config, table_spec, modified_since=None):
        bucket = config['bucket']
        aws_access_key_id = config['aws_access_key_id']
        aws_secret_access_key = config['aws_secret_access_key']
        endpoint_url = config['endpoint_url']
        to_return = []
        pattern = table_spec['search_pattern']
        try:
            matcher = re.compile(pattern)
        except re.error as e:
            raise ValueError(
                ("search_pattern for table `{}` is not a valid regular "
                 "expression. See "
                 "https://docs.python.org/3.5/library/re.html#regular-expression-syntax").format(table_spec['table_name']),
                pattern) from e
        LOGGER.info(
            'Checking bucket "%s" for keys matching "%s"', bucket, pattern)
        matched_files_count = 0
        unmatched_files_count = 0
        max_files_before_log = 30000
        for s3_object in list_files_in_bucket(bucket, aws_access_key_id, aws_secret_access_key, endpoint_url, table_spec.get('search_prefix')):
            key = s3_object['Key']
            last_modified = s3_object['LastModified']
            LOGGER.debug('Found key "%s" (last modified %s)', key, last_modified)
            if s3_object['Size'] == 0:
                LOGGER.info('Skipping matched file "%s" as it is empty', key)
                unmatched_files_count += 1
                continue
            if matcher.search(key):
                matched_files_count += 1
                if modified_since is None or modified_since < last_modified:
                    LOGGER.info('Will download key "%s" as it was last modified %s',
                                key,
                                last_modified)
                    yield {'key': key, 'last_modified': last_modified}
            else:
                unmatched_files_count += 1
            if (unmatched_files_count + matched_files_count) % max_files_before_log == 0:
                # Are we skipping greater than 50% of the files?
                if 0.5 < (unmatched_files_count / (matched_files_count + unmatched_files_count)):
                LOGGER.warning(("Found %s matching files and %s non-matching files. "
                                "You should consider adding a `search_prefix` to the config "
                                "or removing non-matching files from the bucket."),
                               matched_files_count, unmatched_files_count)
                else:
                    LOGGER.info("Found %s matching files and %s non-matching files",
                                matched_files_count, unmatched_files_count)
        if 0 == matched_files_count:
            raise Exception("No files found matching pattern {}".format(pattern))
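    The per-object decision in get_input_files_for_table (skip empty objects, apply search_pattern, keep only objects newer than modified_since) can be tried out without any S3 server. The sketch below re-implements just that filter over plain dicts shaped like list_objects_v2 entries (Key, Size, LastModified); select_keys is a hypothetical name, and the logging and matched/unmatched warnings of the real function are left out.

```python
import re
from datetime import datetime, timezone
from typing import Dict, Iterable, Iterator, Optional


def select_keys(objects: Iterable[Dict], pattern: str,
                modified_since: Optional[datetime] = None) -> Iterator[Dict]:
    """Hypothetical sketch of the filtering done by get_input_files_for_table."""
    matcher = re.compile(pattern)
    for obj in objects:
        if obj["Size"] == 0:
            continue  # empty objects are skipped, as in the tap
        if not matcher.search(obj["Key"]):
            continue  # key does not match search_pattern
        last_modified = obj["LastModified"]
        # Only objects modified strictly after the bookmark are selected.
        if modified_since is None or modified_since < last_modified:
            yield {"key": obj["Key"], "last_modified": last_modified}


if __name__ == "__main__":
    t1 = datetime(2019, 8, 1, tzinfo=timezone.utc)
    t2 = datetime(2019, 8, 20, tzinfo=timezone.utc)
    fake_listing = [
        {"Key": "exports/users.csv", "Size": 120, "LastModified": t1},
        {"Key": "exports/orders.csv", "Size": 0, "LastModified": t2},
        {"Key": "exports/orders_2.csv", "Size": 80, "LastModified": t2},
        {"Key": "notes.txt", "Size": 10, "LastModified": t2},
    ]
    for item in select_keys(fake_listing, r"exports/.*\.csv", modified_since=t1):
        print(item["key"])  # prints exports/orders_2.csv
```

    Note that the comparison is strict, so an object whose LastModified equals the bookmark is not downloaded again.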

    list_files_in_bucket is the core change: this is where the S3 connection is made.
    Modified as follows:

     
    @retry_pattern()
    def list_files_in_bucket(bucket, aws_access_key_id, aws_secret_access_key, endpoint_url, search_prefix=None):
        s3_client = boto3.session.Session().client(
            service_name='s3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            endpoint_url=endpoint_url,
        )
        s3_object_count = 0
        max_results = 1000
        args = {
            'Bucket': bucket,
            'MaxKeys': max_results,
        }
        if search_prefix is not None:
            args['Prefix'] = search_prefix
        paginator = s3_client.get_paginator('list_objects_v2')
        pages = 0
        for page in paginator.paginate(**args):
            pages += 1
            LOGGER.debug("On page %s", pages)
            # A page for an empty bucket or unmatched prefix has no 'Contents' key.
            contents = page.get('Contents', [])
            s3_object_count += len(contents)
            yield from contents
        if 0 < s3_object_count:
            LOGGER.info("Found %s files.", s3_object_count)
        else:
            LOGGER.warning('Found no files for bucket "%s" that match prefix "%s"', bucket, search_prefix)
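    The paginator yields one response dict per page, and when the bucket is empty or nothing matches the prefix, that response carries no Contents key at all. The sketch below isolates the flattening loop of list_files_in_bucket and runs it over hand-made dicts standing in for real list_objects_v2 responses; iter_objects is a hypothetical name. Using page.get('Contents', []) lets an empty listing end quietly, so the "found no files" warning can be reached instead of a KeyError.

```python
from typing import Dict, Iterable, Iterator, List


def iter_objects(pages: Iterable[Dict]) -> Iterator[Dict]:
    """Hypothetical sketch: flatten paginated list_objects_v2 responses into objects."""
    for page in pages:
        # A page for an empty bucket/prefix has no 'Contents' key, so default to [].
        yield from page.get("Contents", [])


if __name__ == "__main__":
    fake_pages: List[Dict] = [
        {"Contents": [{"Key": "a.csv"}, {"Key": "b.csv"}], "KeyCount": 2},
        {"Contents": [{"Key": "c.csv"}], "KeyCount": 1},
    ]
    print([obj["Key"] for obj in iter_objects(fake_pages)])  # ['a.csv', 'b.csv', 'c.csv']
    print(list(iter_objects([{"KeyCount": 0}])))              # []
```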

    get_file_handle fetches the content of an S3 object:

    @retry_pattern()
    def get_file_handle(config, s3_path):
        bucket = config['bucket']
        aws_access_key_id = config['aws_access_key_id']
        aws_secret_access_key = config['aws_secret_access_key']
        endpoint_url = config['endpoint_url']
        s3_resource = boto3.resource(
            service_name="s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            endpoint_url=endpoint_url)
        s3_bucket = s3_resource.Bucket(bucket)
        s3_object = s3_bucket.Object(s3_path)
        # get() returns a dict whose 'Body' is a streaming, file-like object
        return s3_object.get()['Body']
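    The handle returned by get_file_handle is a binary stream, so it has to be decoded before it can be parsed as CSV. The tap itself lists singer-encodings in install_requires for that job; the sketch below is only a standard-library illustration of the idea, with a hypothetical read_csv_rows helper, io.BytesIO standing in for the S3 body, and UTF-8 assumed as the file encoding.

```python
import codecs
import csv
import io
from typing import BinaryIO, Dict, Iterator


def read_csv_rows(body: BinaryIO, encoding: str = "utf-8") -> Iterator[Dict[str, str]]:
    """Hypothetical sketch: decode a binary stream (e.g. an S3 object's Body) as CSV.

    codecs.getreader only needs a .read() method on the stream, so it can wrap
    objects that are not full io.IOBase implementations.
    """
    text_stream = codecs.getreader(encoding)(body)
    # The first row is used as the header; each later row becomes a dict.
    yield from csv.DictReader(text_stream)


if __name__ == "__main__":
    fake_body = io.BytesIO(b"id,name\n1,alice\n2,bob\n")  # stands in for s3_object.get()['Body']
    for row in read_csv_rows(fake_body):
        print(row["id"], row["name"])  # prints "1 alice", then "2 bob"
```

    Decoding incrementally keeps memory flat for large objects, because rows are produced while the stream is still being read.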
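    • __init__.py
      This module handles the tap commands, such as schema discovery and running the sync, and validates the config.
      The required config keys are changed to the ones our connection needs: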
    REQUIRED_CONFIG_KEYS = ["start_date", "bucket", "aws_access_key_id", "aws_secret_access_key", "endpoint_url"]
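    With these keys required, a config for a MinIO server needs at least the entries below. This is a sketch with placeholder values (bucket name, credentials, start date and the http://localhost:9000 address are assumptions for a local MinIO instance). The tables entry that validate_table_config reads is not shown, since its format is inherited unchanged from tap-s3-csv. The missing_keys helper is hypothetical and only mirrors the kind of check singer.utils.parse_args performs with REQUIRED_CONFIG_KEYS.

```python
import json
from typing import Dict, List

REQUIRED_CONFIG_KEYS = ["start_date", "bucket", "aws_access_key_id",
                        "aws_secret_access_key", "endpoint_url"]


def missing_keys(config: Dict, required: List[str] = REQUIRED_CONFIG_KEYS) -> List[str]:
    """Hypothetical helper: list required keys absent from a config dict, in order."""
    return [key for key in required if key not in config]


if __name__ == "__main__":
    # Placeholder values for a local MinIO server; replace with your own.
    config = {
        "start_date": "2019-01-01T00:00:00Z",
        "bucket": "csv-data",
        "aws_access_key_id": "minioadmin",
        "aws_secret_access_key": "minioadmin",
        "endpoint_url": "http://localhost:9000",
    }
    print(missing_keys(config))  # []
    # Content for the file passed to the tap with --config:
    print(json.dumps(config, indent=2))
```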

    The main function:

    @singer.utils.handle_top_exception(LOGGER)
    def main():
        args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
        config = args.config
        bucket = config['bucket']
        aws_access_key_id = config['aws_access_key_id']
        aws_secret_access_key = config['aws_secret_access_key']
        endpoint_url = config['endpoint_url']
        config['tables'] = validate_table_config(config)
        try:
            # Only a connectivity check: stop after the first listed object.
            for _ in s3.list_files_in_bucket(bucket, aws_access_key_id, aws_secret_access_key, endpoint_url):
                break
            LOGGER.info("Connected to bucket %s at %s", bucket, endpoint_url)
        except Exception:
            LOGGER.exception("can't connect to s3 storage")
        if args.discover:
            do_discover(args.config)
        elif args.properties:
            do_sync(config, args.properties, args.state)
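    main only needs to know whether the endpoint answers, which is why it breaks out of the loop after the first listed object. The same probe can be written with next(); the sketch below is a hypothetical can_list_bucket helper that accepts any listing function, so it is demonstrated with fake generators instead of a live MinIO server. An empty bucket also counts as reachable here, because the listing finishes without raising.

```python
import logging
from typing import Callable, Iterable

LOGGER = logging.getLogger(__name__)


def can_list_bucket(list_files: Callable[..., Iterable], *args) -> bool:
    """Hypothetical sketch of the connectivity probe in main().

    Pulls at most one object from the listing; any exception is treated as
    "storage not reachable" (bad endpoint, rejected credentials, ...).
    """
    try:
        next(iter(list_files(*args)), None)  # consume at most one item
        return True
    except Exception:
        LOGGER.exception("can't connect to s3 storage")
        return False


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def ok_listing(bucket):
        yield {"Key": f"{bucket}/a.csv"}

    def broken_listing(bucket):
        raise ConnectionError("endpoint unreachable")
        yield  # makes this a generator, so the error surfaces on the first next()

    print(can_list_bucket(ok_listing, "demo"))      # True
    print(can_list_bucket(broken_listing, "demo"))  # False, after logging the traceback
```

    Because list_files_in_bucket is a generator, nothing is sent to S3 until the first item is requested, which is why the probe must actually iterate rather than just call the function.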
    • pip package conventions
      To avoid clashing with the official package, the tap is published under a new name.
      setup.py:
     
    #!/usr/bin/env python
    from setuptools import setup
    setup(name='tap-minio-csv',
          version='1.2.2',
          description='Singer.io tap for extracting CSV files from minio',
          author='rongfengliang',
          url='https://github.com/rongfengliang/tap-minio-csv',
          classifiers=['Programming Language :: Python :: 3 :: Only'],
          install_requires=[
              'backoff==1.3.2',
              'boto3==1.9.57',
              'singer-encodings==0.0.3',
              'singer-python==5.1.5',
              'voluptuous==0.10.5'
          ],
          extras_require={
              'dev': [
                  'ipdb==0.11'
              ]
          },
          entry_points='''
              [console_scripts]
              tap-minio-csv=tap_minio_csv:main
          ''',
          packages=['tap_minio_csv'])
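    • Project package name: the source package directory is tap_minio_csv, matching packages and the console_scripts entry in setup.py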

    Publishing the pip package

    • Install the tools
    python3 -m pip install --user --upgrade setuptools wheel twine
    • Build the distribution files
    python3 setup.py sdist bdist_wheel
    • Upload
      Register a PyPI account first, then run the following command and enter your account details when prompted:
     
    twine upload dist/*
    • The published package can then be installed with pip install tap-minio-csv
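    After installing, the console_scripts entry point from setup.py should expose a tap-minio-csv command. One way to confirm that from Python is to look the entry point up with importlib.metadata (standard library since Python 3.8). The helper name is hypothetical, and the branch on select() covers both the newer API (3.10+) and the older dict-style return value.

```python
from importlib.metadata import entry_points
from typing import Optional


def find_console_script(name: str) -> Optional[str]:
    """Hypothetical helper: return 'module:attr' for an installed console script, or None."""
    eps = entry_points()
    if hasattr(eps, "select"):  # Python 3.10+
        candidates = eps.select(group="console_scripts")
    else:  # Python 3.8/3.9 return a dict of group -> entry points
        candidates = eps.get("console_scripts", [])
    for ep in candidates:
        if ep.name == name:
            return ep.value
    return None


if __name__ == "__main__":
    # Expected to be 'tap_minio_csv:main' once the package is installed, otherwise None.
    print(find_console_script("tap-minio-csv"))
```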

    Notes

    The above is only a brief overview; the complete code is at https://github.com/rongfengliang/tap-minio-csv

    References

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    https://github.com/singer-io/tap-s3-csv
    https://github.com/rongfengliang/tap-minio-csv

  • Original article: https://www.cnblogs.com/rongfengliang/p/11396174.html