zoukankan      html  css  js  c++  java
  • python写的文件同步服务器

    服务端使用asyncore, 收到文件后保存到本地。

    客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。

    重点:

    1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。

    2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。

    上代码:

    服务端:

    查看代码
    ############################################################################################
    #                                                                                          #
    # receive file from client and store them into file use asyncore.                          #
    # author: roy.lieu@gmail.com                                                               #
    # vascloud@2012                                                                            #
    #                                                                                          #
    ###########################################################################################
    #/usr/bin/python
    #coding: utf-8
    import asyncore
    import socket
    from socket import errno
    import logging
    import time
    import sys
    import struct
    import os
    import fcntl
    import threading
    
    from rrd_graph import MakeGraph
    
    
    try:
        import rrdtool
    except (ImportError, ImportWarnning):
        print "Hope this information can help you:"
        print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu."
        sys.exit(1)
    
    
    
    class RequestHandler(asyncore.dispatcher):
        def __init__(self, sock, map=None, chunk_size=1024):
            self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname())))
            self.chunk_size = chunk_size
            asyncore.dispatcher.__init__(self,sock,map)
            self.data_to_write = list()
        
        def readable(self):
            #self.logger.debug("readable() called.")
            return True
            
        def writable(self):
            response = (not self.connected) or len(self.data_to_write)
            #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write)))
            return response
            
        def handle_write(self):
            data = self.data_to_write.pop()
            #self.logger.debug("handle_write()->%s size: %s",data.rstrip('\r\n'),len(data))
            sent = self.send(data[:self.chunk_size])
            if sent < len(data):
                remaining = data[sent:]
                self.data_to_write.append(remaining)
            
        def handle_read(self):
            self.writen_size = 0
            nagios_perfdata = '../perfdata'
            head_packet_format = "!LL128s128sL"
            head_packet_size = struct.calcsize(head_packet_format)
            data = self.recv(head_packet_size)
            if not data:
                return
            filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data)
            filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])
            filename = filename[:filename_len]
            self.logger.debug("update file: %s" % filepath + '/' + filename)
            try:
                if not os.path.exists(filepath):
                    os.makedirs(filepath)
            except OSError:
                pass
            
            self.fd = open(os.path.join(filepath,filename), 'w')
            #self.fd = open(filename,'w')
            
            if filesize > self.chunk_size:
                times = filesize / self.chunk_size
                first_part_size = times * self.chunk_size
                second_part_size = filesize % self.chunk_size
                
                while 1:
                    try:
                        data = self.recv(self.chunk_size)
                        #self.logger.debug("handle_read()->%s size.",len(data))
                    except socket.error,e:
                        if e.args[0] == errno.EWOULDBLOCK:
                            print "EWOULDBLOCK"
                            time.sleep(1)
                        else:
                            #self.logger.debug("Error happend while receive data: %s" % e)
                            break
                    else:
                        self.fd.write(data)
                        self.fd.flush()
                        self.writen_size += len(data)
                        if self.writen_size == first_part_size:
                            break
                
                #receive the packet at last
                while 1:
                    try:
                        data = self.recv(second_part_size)
                        #self.logger.debug("handle_read()->%s size.",len(data))
                    except socket.error,e:
                        if e.args[0] == errno.EWOULDBLOCK:
                            print "EWOULDBLOCK"
                            time.sleep(1)
                        else:
                            #self.logger.debug("Error happend while receive data: %s" % e)
                            break
                    else:
                        self.fd.write(data)
                        self.fd.flush()
                        self.writen_size += len(data)
                        if len(data) == second_part_size:
                            break
                        
            elif filesize <= self.chunk_size:
                while 1:
                    try:
                        data = self.recv(filesize)
                        #self.logger.debug("handle_read()->%s size.",len(data))
                    except socket.error,e:
                        if e.args[0] == errno.EWOULDBLOCK:
                            print "EWOULDBLOCK"
                            time.sleep(1)
                        else:
                            #self.logger.debug("Error happend while receive data: %s" % e)
                            break
                    else:
                        self.fd.write(data)
                        self.fd.flush()
                        self.writen_size += len(data)
                        if len(data) == filesize:
                            break
                        
            self.logger.debug("File size: %s" % self.writen_size)
        
    
    class SyncServer(asyncore.dispatcher):
        def __init__(self,host,port):
            asyncore.dispatcher.__init__(self)
            self.debug = True
            self.logger = logging.getLogger(self.__class__.__name__)
            self.create_socket(socket.AF_INET,socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind((host,port))
            self.listen(2000)
            
        def handle_accept(self):
            client_socket = self.accept()
            if client_socket is None:
                pass
            else:
                sock, addr = client_socket
                #self.logger.debug("Incoming connection from %s" % repr(addr))
                handler = RequestHandler(sock=sock)
    
    class RunServer(threading.Thread):
        def __init__(self):
            super(RunServer,self).__init__()
            self.daemon = False
        
        def run(self):
            server = SyncServer('',9999)
            asyncore.loop(use_poll=True)
    
    
    
    def StartServer():
        logging.basicConfig(level=logging.DEBUG,
                            format='%(name)s: %(message)s',
                            )
        RunServer().start()
        #MakeGraph().start()
    
    if __name__ == '__main__':
        StartServer()

    客户端:

    查看代码
      1 ############################################################################################
      2 #                                                                                          #
      3 # monitor path with inotify(python module), and send them to remote server.                #
      4 # use sendfile(2) instead of send function in socket, if we have python-sendfile installed.#
      5 # author: roy.lieu@gmail.com                                                               #
      6 # vascloud@2012                                                                            #
      7 #                                                                                          #
      8 ###########################################################################################
      9 import socket
     10 import time
     11 import os
     12 import sys
     13 import struct
     14 import threading
     15 import Queue
     16 
     17 try:
     18     import pyinotify
     19 except (ImportError, ImportWarnning):
     20     print "Hope this information can help you:"
     21     print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu."
     22     sys.exit(1)
     23 
     24 try:
     25     from sendfile import sendfile
     26 except (ImportError,ImportWarnning):
     27     pass
     28 
     29 filetype_filter = [".rrd",".xml"]
     30 
     31 def check_filetype(pathname):
     32     for suffix_name in filetype_filter:
     33         if pathname[-4:] == suffix_name:
     34             return True
     35     try:
     36         end_string = pathname.rsplit('.')[-1:][0]
     37         end_int = int(end_string)
     38     except:
     39         pass
     40     else:
     41         # means pathname endwith digit
     42         return False
     43 
     44 
     45 class sync_file(threading.Thread):
     46     def __init__(self, addr, events_queue):
     47         super(sync_file,self).__init__()
     48         self.daemon = False
     49         self.queue = events_queue
     50         self.addr = addr
     51         self.chunk_size = 1024
     52 
     53     def run(self):
     54         while 1:
     55             event = self.queue.get()
     56             if check_filetype(event.pathname):
     57                 print time.asctime(),event.maskname, event.pathname
     58                 filepath = event.path.split('/')[-1:][0]
     59                 filename = event.name
     60                 filesize = os.stat(os.path.join(event.path, filename)).st_size
     61                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     62                 filepath_len = len(filepath)
     63                 filename_len = len(filename)
     64                 sock.connect(self.addr)
     65                 offset = 0
     66                 
     67                 data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize)
     68                 fd = open(event.pathname,'rb')
     69                 sock.sendall(data)
     70                 
     71                 if "sendfile" in sys.modules:
     72                     # print "use sendfile(2)"
     73                     while 1:
     74                         sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size)
     75                         if sent == 0:
     76                             break
     77                         offset += sent
     78                 else:
     79                     # print "use original send function"
     80                     while 1:
     81                         data = fd.read(self.chunk_size)
     82                         if not data: break
     83                         sock.send(data)
     84                     
     85                 sock.close()
     86                 fd.close()
     87 
     88 
     89 class EventHandler(pyinotify.ProcessEvent):
     90     def __init__(self, events_queue):
     91         super(EventHandler,self).__init__()
     92         self.events_queue = events_queue
     93     
     94     def my_init(self):
     95         pass
     96     
     97     def process_IN_CLOSE_WRITE(self,event):
     98         self.events_queue.put(event)
     99     
    100     def process_IN_MOVED_TO(self,event):
    101         self.events_queue.put(event)
    102     
    103 
    104 
    105 def start_notify(path, mask, sync_server):
    106     events_queue = Queue.Queue()
    107     sync_thread_pool = list()
    108     for i in range(500):
    109         sync_thread_pool.append(sync_file(sync_server, events_queue))
    110     for i in sync_thread_pool:
    111         i.start()
    112     
    113     wm = pyinotify.WatchManager()
    114     notifier = pyinotify.Notifier(wm,EventHandler(events_queue))
    115     wdd = wm.add_watch(path,mask,rec=True)
    116     notifier.loop()
    117 
    118 
    119 def do_notify():
    120     perfdata_path = '/var/lib/pnp4nagios/perfdata'
    121     mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO
    122     sync_server = ('127.0.0.1',9999)
    123     start_notify(perfdata_path,mask,sync_server)
    124 
    125 
    126 if __name__ == '__main__':
    127     do_notify()
    128     
  • 相关阅读:
    如何控制input框!
    火车头采集器破解版
    记Angular与Django REST框架的一次合作(2):前端组件化——Angular
    拉勾网一些“震惊”的结论
    一个知乎重度用户眼中的知乎
    anthelion编译
    搜索引擎爬虫蜘蛛的USERAGENT大全
    Netty系列之Netty高性能之道
    python正则表达式
    Cookie的使用,详解,获取,无法互通、客户端获取Cookie、深入解析cookie
  • 原文地址:https://www.cnblogs.com/huazi/p/2621477.html
Copyright © 2011-2022 走看看