python 监听文件夹下的文件,将文本内容写入kafka,支持断电续传 (docker 发布)
1.代码
2.docker 部署
1.代码:
python 监听文件夹下的文件,将文本内容写入kafka,支持断电续传 (docker 发布) 1.代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Watch a directory and mark every file in it as processed.

On start-up the files already present in ``Constant.FILE_PATH`` are handled;
afterwards a watchdog observer handles each newly created file. A handled
file is renamed with a ``.COMPLETED`` suffix, so it is skipped after a
restart (this is what makes the job resumable).

@Creation Date : Aug 3, 2021 11:35:23 AM
@Author : Sea
"""
import json
import logging
import os
import sys
import time
from concurrent.futures.thread import ThreadPoolExecutor
from urllib.parse import quote_plus

import jsonpath
from watchdog.events import *  # noqa: F401,F403 - kept from the original script
from watchdog.observers import Observer

import Constant
import MongoData
import LogConfig

# Suffix appended to a file name once the file has been handled.
COMPLETED_SUFFIX = ".COMPLETED"

# Command-line keys that may override attributes of the Constant module.
_CLI_SETTINGS = ("USERNAME", "PASSWORD", "MONGO_IP", "FILE_PATH")


def get_file_name(file_dir):
    """Return the names of the files directly inside *file_dir*.

    Sub-directories are not descended into. An empty list is returned when
    *file_dir* does not exist or cannot be walked, so callers can always
    iterate over the result.
    """
    # os.walk yields the top directory first; only that entry is needed.
    _root, _dirs, files = next(os.walk(file_dir), (file_dir, [], []))
    return files


def hander_files(dirs):
    """Handle each file name in *dirs* and rename it as completed.

    :param dirs: iterable of file names relative to ``Constant.FILE_PATH``;
        ``None`` is treated as empty.

    Names that already end with ``.COMPLETED`` are skipped. A file that can
    not be renamed (for example because it vanished in the meantime) is
    logged and does not stop the remaining files from being handled.
    """
    print("start to parse files")
    logging.info("start to parse files")
    for name in dirs or ():
        if name.endswith(COMPLETED_SUFFIX):
            continue
        # os.path.join works whether or not FILE_PATH ends with a separator.
        source = os.path.join(Constant.FILE_PATH, name)
        # 1. Read the file content and process it here, e.g.:
        #        with open(source, "r") as f:
        #            f_str = f.read()
        #        ... do something with f_str ...
        try:
            os.rename(source, source + COMPLETED_SUFFIX)
        except OSError:
            logging.exception("failed to mark %s as completed", source)


def hander_path():
    """Handle every file currently present in ``Constant.FILE_PATH``."""
    hander_files(get_file_name(Constant.FILE_PATH))


class FileEventHandler(FileSystemEventHandler):
    """Watchdog handler that hands newly created files to a worker pool."""

    def __init__(self):
        super().__init__()
        self.pools = ThreadPoolExecutor(2)

    def on_created(self, event):
        """Queue the created file for handling; directories are ignored."""
        if event.is_directory:
            return
        filename = os.path.basename(str(event.src_path))
        print(filename)
        self.pools.submit(self._handle_when_settled, [filename])

    @staticmethod
    def _handle_when_settled(filenames):
        """Wait briefly, then handle *filenames*.

        The one-second pause of the original code is kept, but it now runs
        in the worker thread instead of blocking the observer thread.
        """
        time.sleep(1)
        hander_files(filenames)


def _apply_cli_args(args):
    """Copy ``--KEY=VALUE`` arguments into the Constant module.

    Only the keys in ``_CLI_SETTINGS`` are honoured. Arguments without ``=``
    or with an empty value are ignored. The value is everything after the
    first ``=``, so values that contain ``=`` themselves are preserved.
    """
    for arg in args:
        key, separator, value = arg.partition("=")
        if not separator or not value:
            continue
        key = key.replace("-", "")
        if key in _CLI_SETTINGS:
            setattr(Constant, key, value)


def main():
    """Configure the job from the command line, then watch the directory."""
    # NOTE(review): this prints the password argument as well - consider
    # redacting it before shipping logs anywhere.
    print("input args is :" + str(sys.argv))
    _apply_cli_args(sys.argv[1:])
    # Credentials are percent-escaped so characters such as '@', ':' or '/'
    # do not corrupt the connection URI.
    Constant.MONGO_CLIENT = (
        f"mongodb://{quote_plus(Constant.USERNAME)}:"
        f"{quote_plus(Constant.PASSWORD)}@{Constant.MONGO_IP}/"
    )

    # Handle what is already in the directory before watching for new files.
    hander_path()

    observer = Observer()
    # Not recursive: files are renamed relative to FILE_PATH itself, so an
    # event from a sub-directory could never be resolved to the right file.
    observer.schedule(FileEventHandler(), Constant.FILE_PATH, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


# Example:
# --USERNAME=mongodb --PASSWORD=ACahlofh --MONGO_IP=192.168.18.129 --FILE_PATH=/home/sea/Desktop/flume/XXX/
if __name__ == '__main__':
    main()
代码中的环境变量也可以直接这么获取,这样在执行代码的时候,就不用显式地传值:
import os


def os_environ(evn_name, default=None):
    """Return the value of environment variable *evn_name*.

    :param evn_name: name of the environment variable to read.
    :param default: value returned when the variable is not set.
    :return: the variable's value (possibly an empty string), or *default*.
    """
    return os.environ.get(evn_name, default)


if __name__ == '__main__':
    environ = os_environ("JAVA_HOME1")
    print(environ)
    # Fall back to a literal when the variable is unset or empty.
    c = environ if environ else "default"
    print(c)
    print(os_environ("JAVA_HOME2", "xxx"))
docker 发布:
Dockerfile: FROM python:3.7.11-alpine3.13 43M
FROM python:3.7.11-slim

# MAINTAINER is deprecated; the equivalent metadata is a label.
LABEL maintainer="Sea <lshan523@163.com>"

VOLUME /tmp

# Create the application directory and open up its permissions.
RUN mkdir -p /opt/app/ \
    && chmod -R 777 /opt/app/

RUN pip3 install --default-timeout=1000 --no-cache-dir --upgrade pip setuptools pymongo jsonpath watchdog

COPY docker-entrypoint.sh /opt/app/
# Add the Python code
COPY lazada_move/ /opt/app/

WORKDIR /opt

# Runtime settings. No value is defined at build time, so these default to
# empty strings; supply the real values with `docker run -e NAME=value`.
ENV USERNAME="" \
    PASSWORD="" \
    MONGO_IP="" \
    FILE_PATH=""

ENTRYPOINT ["sh","/opt/app/docker-entrypoint.sh"]
docker-entrypoint.sh:
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Container entry point: starts the file watcher with the settings taken
# from the USERNAME, PASSWORD, MONGO_IP and FILE_PATH environment variables.

#if [ x"$AP_ENV" = x ]
#then
#  echo "AP_ENV IS NULL , USE DEFAULT DEV AS DEFAULT !"
#  echo "dev=$AP_ENV"
#else
#  echo "AP_ENV IS $AP_ENV !"
#fi
#if [ x"$AGENT_SERVICE_NAME" = x ]
#then
#  echo "No MS Trace Agent $AGENT_COLLECTOR_ADDRESS Setting, @@ NOT USE MS TRACE !"
#  java -Denv=$AP_ENV -jar /opt/app.jar --apollo.meta=$CONFIG_SERVERS
#else
#  echo " USE MS TRACE Agent to start the service !"

# The expansions are quoted so values containing spaces or shell
# metacharacters reach the script as a single argument. `exec` replaces the
# shell with the Python process so it receives the container's signals.
exec python3 /opt/app/PackageData.py \
    --USERNAME="$USERNAME" \
    --PASSWORD="$PASSWORD" \
    --MONGO_IP="$MONGO_IP" \
    --FILE_PATH="$FILE_PATH"
条件参考:
# Start the component selected by FE_ROLE; any other value starts the broker.
# Written as a POSIX `case` so it also works when the script is run by `sh`
# (the `[[ ... ]]` test is a bash extension).
case "$FE_ROLE" in
    fe-leader)
        /home/doris/fe/bin/start_fe.sh
        ;;
    be)
        /home/doris/be/bin/start_be.sh
        ;;
    fe-follower)
        # A follower is pointed at the leader given in FE_LEADER.
        /home/doris/fe/bin/start_fe.sh --helper "$FE_LEADER"
        ;;
    *)
        /home/doris/fs_broker/bin/start_broker.sh
        ;;
esac
build : sudo docker build -t xxx .
run :
/home/sea/Desktop/flume/XXX/ 需要监听的文件path
sudo docker run -it --privileged -e 'USERNAME=mongodb' -e 'PASSWORD=sea' -e 'MONGO_IP=192.168.18.129' -e 'FILE_PATH=/home/sea/Desktop/flume/XXX/' \
  -v /home/sea/Desktop/flume/XXX/:/home/sea/Desktop/flume/XXX/ \
  -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone xxx