  • Python: watch a folder for new files and write their text content to Kafka, with resume after a power loss (Docker release)


    1. Code

    2. Docker deployment

    1. Code:

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    import json
    import logging
    import os
    import sys
    import time
    from concurrent.futures import ThreadPoolExecutor

    import jsonpath
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    # Project-local modules (not shown in this post)
    import Constant
    import LogConfig
    import MongoData

    
    """
     @Creation Date   : Aug 3, 2021 11:35:23 AM
     
     @Author          :  Sea  
    """
    
    
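    def get_file_name(file_dir):
        """Return the names of the files directly inside file_dir (not recursive)."""
        for root, dirs, files in os.walk(file_dir):
            return files
        # os.walk yields nothing if file_dir does not exist
        return []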
    
    
    def hander_files(names):
        """Process each file name in names, then mark it with a .COMPLETED suffix."""
        print("start to parse files")
        logging.info("start to parse files")
        for name in names:
            # Files that were already processed carry the .COMPLETED suffix;
            # skipping them is what lets the job resume after a restart.
            if name.endswith(".COMPLETED"):
                continue
            path = os.path.join(Constant.FILE_PATH, name)
            # 1. Read the file content
            # with open(path, "r") as f:
            #     f_str = f.read()
            # do something with the content
            os.rename(path, path + ".COMPLETED")
    
    
    
    
    def hander_path():
        dirs = get_file_name(Constant.FILE_PATH)
        hander_files(dirs)
    
    
    
    
    class FileEventHandler(FileSystemEventHandler):
        def __init__(self):
            FileSystemEventHandler.__init__(self)
            self.pools = ThreadPoolExecutor(2)
    
        def on_created(self, event):
            if event.is_directory:
                return
            # Only the base name is passed on; hander_files joins it with Constant.FILE_PATH
            filename = os.path.basename(str(event.src_path))
            print(filename)
            # Give the writer a moment to finish before the file is processed
            time.sleep(1)
            self.pools.submit(hander_files, [filename])
    
    
    # --USERNAME=mongodb --PASSWORD=ACahlofh --MONGO_IP=192.168.18.129 --FILE_PATH=/home/sea/Desktop/flume/XXX/
    if __name__ == '__main__':
        iargs = sys.argv[1:]
        print("input args is :" + str(iargs))
        for ag in iargs:
            # Split on the first "=" only, so a value may itself contain "="
            k, _, v = ag.partition("=")
            k = k.lstrip("-")
            if not v:
                continue
            if k == "USERNAME":
                Constant.USERNAME = v
            if k == "PASSWORD":
                Constant.PASSWORD = v
            if k == "MONGO_IP":
                Constant.MONGO_IP = v
            if k == "FILE_PATH":
                Constant.FILE_PATH = v
        # Build the connection string once, after all arguments have been parsed
        Constant.MONGO_CLIENT = 'mongodb://' + Constant.USERNAME + ':' + Constant.PASSWORD + '@' + Constant.MONGO_IP + '/'
        # First process the files that are already in the folder (resume), then watch for new ones
        hander_path()
        observer = Observer()
        event_handler = FileEventHandler()
        # recursive=True also reports files in subdirectories, but hander_files
        # only resolves names directly under Constant.FILE_PATH
        observer.schedule(event_handler, Constant.FILE_PATH, recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
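
    The "do something" step in hander_files is left open in the listing above. The block below is only a minimal sketch of how that step and the .COMPLETED marker could fit together. It assumes a hypothetical `send` callable that takes one bytes payload (in a real deployment it might wrap a Kafka producer, which the original code does not show), and the names `process_file` and `process_pending` are made up for illustration.

```python
import os
import tempfile

DONE_SUFFIX = ".COMPLETED"  # same marker the script above appends


def process_file(path, send):
    """Hand every non-empty line of `path` to `send`, then mark the file as done.

    `send` is a hypothetical callable (an assumption, not part of the
    original code); it receives each line as UTF-8 encoded bytes.
    """
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.rstrip("\n")
            if line:
                send(line.encode("utf-8"))
    # Rename only after all lines were handed over. If the process stops
    # earlier, the file keeps its name and is picked up again on restart.
    os.rename(path, path + DONE_SUFFIX)


def process_pending(folder, send):
    """Process every regular file in `folder` that is not marked as done."""
    handled = []
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        if name.endswith(DONE_SUFFIX) or not os.path.isfile(path):
            continue
        process_file(path, send)
        handled.append(name)
    return handled


if __name__ == "__main__":
    sent = []
    with tempfile.TemporaryDirectory() as folder:
        with open(os.path.join(folder, "a.txt"), "w", encoding="utf-8") as f:
            f.write("first\nsecond\n")
        print(process_pending(folder, sent.append))  # the new file is handled
        print(process_pending(folder, sent.append))  # nothing left to do
        print(sent)
```

    With this scheme a file that was only partly sent before a crash is sent again in full after the restart, so the consumer side may see duplicate lines.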

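    The settings used in the code can also be read directly from environment variables like this, so the values do not have to be passed explicitly when the script is run: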

    import os


    def os_environ(env_name, default=None):
        """Return the value of an environment variable, or default if it is not set."""
        return os.environ.get(env_name, default)


    if __name__ == '__main__':
        environ = os_environ("JAVA_HOME1")
        print(environ)
        c = environ if environ else "default"
        print(c)
        print(os_environ("JAVA_HOME2", "xxx"))
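
    Applied to the four settings of the watcher script, the same idea might look like the sketch below. The helper names `load_settings` and `mongo_uri` and the fallback values are assumptions made for illustration. The credentials are percent-encoded with `urllib.parse.quote_plus`, so that a password containing characters such as `@` or `:` does not break the connection string.

```python
import os
from urllib.parse import quote_plus


def load_settings(environ=os.environ):
    """Collect the four settings of the script from a mapping of environment variables."""
    return {
        "USERNAME": environ.get("USERNAME", ""),
        "PASSWORD": environ.get("PASSWORD", ""),
        "MONGO_IP": environ.get("MONGO_IP", "127.0.0.1"),  # assumed fallback
        "FILE_PATH": environ.get("FILE_PATH", "/tmp/"),    # assumed fallback
    }


def mongo_uri(settings):
    """Build the MongoDB connection string with percent-encoded credentials."""
    user = quote_plus(settings["USERNAME"])
    password = quote_plus(settings["PASSWORD"])
    return "mongodb://%s:%s@%s/" % (user, password, settings["MONGO_IP"])


if __name__ == "__main__":
    settings = load_settings()
    print(settings["FILE_PATH"])
    print(mongo_uri(settings))
```

    Passing a plain dict instead of `os.environ` makes the function easy to try out without touching the real environment.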

    Docker release:

    Dockerfile (python:3.7.11-alpine3.13 is a smaller alternative base image, about 43 MB):

    FROM python:3.7.11-slim
    LABEL maintainer="Sea <lshan523@163.com>"
    VOLUME /tmp
    RUN mkdir -p /opt/app/ && chmod -R 777 /opt/app/
    RUN pip3 install --default-timeout=1000 --no-cache-dir --upgrade pip setuptools pymongo jsonpath watchdog
    COPY docker-entrypoint.sh /opt/app/
    # add python code
    COPY lazada_move/ /opt/app/
    WORKDIR /opt
    # set env: no ARG is declared, so these expand to empty strings at build time;
    # the real values are passed with `docker run -e ...`
    ENV USERNAME=$USERNAME
    ENV PASSWORD=$PASSWORD
    ENV MONGO_IP=$MONGO_IP
    ENV FILE_PATH=$FILE_PATH

    ENTRYPOINT ["sh","/opt/app/docker-entrypoint.sh"]

    docker-entrypoint.sh:

    #!/bin/sh
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    #if [ x"$AP_ENV" = x ]
    #then
    #   echo "AP_ENV IS NULL , USE DEFAULT DEV AS DEFAULT !"
    #   echo "dev=$AP_ENV"
    #else
    #   echo "AP_ENV IS $AP_ENV !"
    #fi
    #if [ x"$AGENT_SERVICE_NAME" = x ]
    #then
    #  echo "No MS Trace Agent $AGENT_COLLECTOR_ADDRESS Setting, @@ NOT USE MS TRACE  !"
    #  java -Denv=$AP_ENV -jar /opt/app.jar --apollo.meta=$CONFIG_SERVERS
    #else
    #  echo " USE MS TRACE Agent to start the service !"
    exec python3 /opt/app/PackageData.py "--USERNAME=$USERNAME" "--PASSWORD=$PASSWORD" "--MONGO_IP=$MONGO_IP" "--FILE_PATH=$FILE_PATH"

    Conditional reference (written with POSIX [ ] tests, since the entrypoint is run with sh):

    if [ "$FE_ROLE" = 'fe-leader' ]; then
        /home/doris/fe/bin/start_fe.sh
    elif [ "$FE_ROLE" = 'be' ]; then
        /home/doris/be/bin/start_be.sh
    elif [ "$FE_ROLE" = 'fe-follower' ]; then
        /home/doris/fe/bin/start_fe.sh --helper "$FE_LEADER"
    else
        /home/doris/fs_broker/bin/start_broker.sh
    fi

    build: sudo docker build -t xxx .

    run:

    /home/sea/Desktop/flume/XXX/ is the path of the folder to watch.

    sudo docker run -it --privileged -e 'USERNAME=mongodb' -e 'PASSWORD=sea' -e 'MONGO_IP=192.168.18.129' -e 'FILE_PATH=/home/sea/Desktop/flume/XXX/' \
      -v /home/sea/Desktop/flume/XXX/:/home/sea/Desktop/flume/XXX/ \
      -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone xxx
  • Original article: https://www.cnblogs.com/lshan/p/15093634.html