zoukankan      html  css  js  c++  java
  • 简单的文件上传下载服务器

    #!/usr/bin/env python
    
    __version__ = "0.1"
    
    import os
    import posixpath
    import BaseHTTPServer
    import urllib
    import cgi
    import shutil
    import mimetypes
    import re
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO
    
    
    class SimpleHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
        server_version = "SimpleHTTPWithUpload/" + __version__
    
        def do_GET(self):
            """Serve a GET request."""
            f = self.send_head()
            if f:
                self.copyfile(f, self.wfile)
                f.close()
    
        def do_HEAD(self):
            """Serve a HEAD request."""
            f = self.send_head()
            if f:
                f.close()
    
        def do_POST(self):
            """Serve a POST request."""
            r, info = self.deal_post_data()
            print r, info, "by: ", self.client_address
            f = StringIO()
            f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
            f.write("<html>
    <title>Upload Result Page</title>
    ")
            f.write("<body>
    <h2>Upload Result Page</h2>
    ")
            f.write("<hr>
    ")
            if r:
                f.write("<strong>Success:</strong>")
            else:
                f.write("<strong>Failed:</strong>")
            f.write(info)
            f.write("<br><a href="%s">back</a>" % self.headers['referer'])
            f.write("<hr><small>Powered By: bones7456, check new version at ")
            f.write("<a href="http://luy.li/?s=SimpleHTTPServerWithUpload">")
            f.write("here</a>.</small></body>
    </html>
    ")
            length = f.tell()
            f.seek(0)
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Content-Length", str(length))
            self.end_headers()
            if f:
                self.copyfile(f, self.wfile)
                f.close()
    
        def deal_post_data(self):
            boundary = self.headers.plisttext.split("=")[1]
            remainbytes = int(self.headers['content-length'])
            line = self.rfile.readline()
            remainbytes -= len(line)
            if not boundary in line:
                return (False, "Content NOT begin with boundary")
            line = self.rfile.readline()
            remainbytes -= len(line)
            fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line)
            if not fn:
                return (False, "Can't find out file name...")
            path = self.translate_path(self.path)
            fn = os.path.join(path, fn[0])
            while os.path.exists(fn):
                fn += "_"
            line = self.rfile.readline()
            remainbytes -= len(line)
            line = self.rfile.readline()
            remainbytes -= len(line)
            try:
                out = open(fn, 'wb')
            except IOError:
                return (False, "Can't create file to write, do you have permission to write?")
    
            preline = self.rfile.readline()
            remainbytes -= len(preline)
            while remainbytes > 0:
                line = self.rfile.readline()
                remainbytes -= len(line)
                if boundary in line:
                    preline = preline[0:-1]
                    if preline.endswith('
    '):
                        preline = preline[0:-1]
                    out.write(preline)
                    out.close()
                    return (True, "File '%s' upload success!" % fn)
                else:
                    out.write(preline)
                    preline = line
            return (False, "Unexpect Ends of data.")
    
        def send_head(self):
            path = self.translate_path(self.path)
            f = None
            if os.path.isdir(path):
                if not self.path.endswith('/'):
                    # redirect browser - doing basically what apache does
                    self.send_response(301)
                    self.send_header("Location", self.path + "/")
                    self.end_headers()
                    return None
                for index in "index.html", "index.htm":
                    index = os.path.join(path, index)
                    if os.path.exists(index):
                        path = index
                        break
                else:
                    return self.list_directory(path)
            ctype = self.guess_type(path)
            try:
                # Always read in binary mode. Opening files in text mode may cause
                # newline translations, making the actual size of the content
                # transmitted *less* than the content-length!
                f = open(path, 'rb')
            except IOError:
                self.send_error(404, "File not found")
                return None
            self.send_response(200)
            self.send_header("Content-type", ctype)
            fs = os.fstat(f.fileno())
            self.send_header("Content-Length", str(fs[6]))
            self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
            self.end_headers()
            return f
    
        def list_directory(self, path):
            try:
                list = os.listdir(path)
            except os.error:
                self.send_error(404, "No permission to list directory")
                return None
            list.sort(key=lambda a: a.lower())
            f = StringIO()
            displaypath = cgi.escape(urllib.unquote(self.path))
            f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
            f.write("<html>
    <title>Directory listing for %s</title>
    " % displaypath)
            f.write("<body>
    <h2>Directory listing for %s</h2>
    " % displaypath)
            f.write("<hr>
    ")
            f.write("<form ENCTYPE="multipart/form-data" method="post">")
            f.write("<input name="file" type="file"/>")
            f.write("<input type="submit" value="upload"/></form>
    ")
            f.write("<hr>
    <ul>
    ")
            for name in list:
                fullname = os.path.join(path, name)
                displayname = linkname = name
                # Append / for directories or @ for symbolic links
                if os.path.isdir(fullname):
                    displayname = name + "/"
                    linkname = name + "/"
                if os.path.islink(fullname):
                    displayname = name + "@"
                    # Note: a link to a directory displays with @ and links with /
                f.write('<li><a href="%s">%s</a>
    '
                        % (urllib.quote(linkname), cgi.escape(displayname)))
            f.write("</ul>
    <hr>
    </body>
    </html>
    ")
            length = f.tell()
            f.seek(0)
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.send_header("Content-Length", str(length))
            self.end_headers()
            return f
    
        def translate_path(self, path):
            path = path.split('?',1)[0]
            path = path.split('#',1)[0]
            path = posixpath.normpath(urllib.unquote(path))
            words = path.split('/')
            words = filter(None, words)
            path = os.getcwd()
            for word in words:
                drive, word = os.path.splitdrive(word)
                head, word = os.path.split(word)
                if word in (os.curdir, os.pardir): continue
                path = os.path.join(path, word)
            return path
    
        def copyfile(self, source, outputfile):
            shutil.copyfileobj(source, outputfile)
    
        def guess_type(self, path):
            base, ext = posixpath.splitext(path)
            if ext in self.extensions_map:
                return self.extensions_map[ext]
            ext = ext.lower()
            if ext in self.extensions_map:
                return self.extensions_map[ext]
            else:
                return self.extensions_map['']
    
        if not mimetypes.inited:
            mimetypes.init() # try to read system mime.types
        extensions_map = mimetypes.types_map.copy()
        extensions_map.update({
            '': 'application/octet-stream', # Default
            '.py': 'text/plain',
            '.c': 'text/plain',
            '.h': 'text/plain',
            })
    
    
    def test(HandlerClass = SimpleHTTPRequestHandler,
             ServerClass = BaseHTTPServer.HTTPServer):
            server_address = ('192.168.4.231', 80)
            httpd = ServerClass(server_address, HandlerClass)
            print  'it is   very GOOD '
            httpd.serve_forever()
    
    # Start the server only when this file is run as a script, not on import.
    if __name__ == '__main__':
            test()
  • 相关阅读:
    C++实现base64编码
    php实现base64编码
    美团2016研发工程师笔试题(绑鞋带问题)
    URL encode 与 URL decode 的C语言实现
    常用排序算法集合-C实现
    用文件实现计算器要求多进程同时写
    vim操作命令-笔记
    Can't connect to local MySQL server through socket
    小程序页面跳转数据丢失
    Vue路由 --登录状态的判断
  • 原文地址:https://www.cnblogs.com/yubenliu/p/6207711.html
Copyright © 2011-2022 走看看