zoukankan      html  css  js  c++  java
  • 关于gevent的一些理解(二)

    3 实际应用

    1 zeromq和gevent:

    zeromq的介绍请参看:http://www.infoq.com/cn/news/2010/09/introduction-zero-mq

    假设你已经安装了zeromq,gevent_zeromq(https://github.com/traviscline/gevent-zeromq.git)和pyzmq

    一个很基础的例子:

    import gevent
    from gevent_zeromq import zmq
    
    # Global Context
    context = zmq.Context() #它是GreenContext的一个简写,确保greenlet化socket
    
    def server():
        server_socket = context.socket(zmq.REQ) #创建一个socket,使用mq类型模式REQ/REP(请求/回复,服务器是请求),还有PUB/SUB(发布/订阅),push/pull等
        server_socket.bind("tcp://127.0.0.1:5000") #绑定socket
    
        for request in range(1,10):
            server_socket.send("Hello")
            print('Switched to Server for ', request)
            server_socket.recv()  #这里发生上下文切换
    
    def client():
        client_socket = context.socket(zmq.REP)  (客户端是回复)
        client_socket.connect("tcp://127.0.0.1:5000")  #连接server的socket端口
    
        for request in range(1,10):
    
            client_socket.recv()
            print('Switched to Client for ', request)
            client_socket.send("World")
    
    publisher = gevent.spawn(server)
    client    = gevent.spawn(client)
    
    gevent.joinall([publisher, client])

    执行结果:

    [root@248_STAT ~]# python test.py
    (‘Switched to Server for ‘, 1)
    (‘Switched to Client for ‘, 1)
    (‘Switched to Server for ‘, 2)
    (‘Switched to Client for ‘, 2)
    (‘Switched to Server for ‘, 3)
    (‘Switched to Client for ‘, 3)
    (‘Switched to Server for ‘, 4)
    (‘Switched to Client for ‘, 4)
    (‘Switched to Server for ‘, 5)
    (‘Switched to Client for ‘, 5)
    (‘Switched to Server for ‘, 6)
    (‘Switched to Client for ‘, 6)
    (‘Switched to Server for ‘, 7)
    (‘Switched to Client for ‘, 7)
    (‘Switched to Server for ‘, 8)
    (‘Switched to Client for ‘, 8)
    (‘Switched to Server for ‘, 9)
    (‘Switched to Client for ‘, 9)

    2 telnet 服务器

    from gevent.server import StreamServer #StreamServer是一个通用的TCP服务器
    
    def handle(socket, address):
        socket.send("Hello from a telnet!n")
        for i in range(5):
            socket.send(str(i) + 'n') #给socket客户端发送数据
        socket.close() #关闭客户端连接
    
    server = StreamServer(('127.0.0.1', 5000), handle) #当出现连接调用定义的方法handle
    server.serve_forever()
     

    执行结果:

    dongwm@localhost ~ $ nc 127.0.0.1 5000
    Hello from a telnet!
    0
    1
    2
    3
    4
    dongwm@localhost ~ $ telnet 127.0.0.1 5000
    Trying 127.0.0.1…
    Connected to 127.0.0.1.
    Escape character is ‘^]’.
    Hello from a telnet!
    0
    1
    2
    3
    4
    Connection closed by foreign host.
    3 wsgi服务器

    from gevent.wsgi import WSGIServer
    
    def application(environ, start_response):
        status = '200 OK' #页面状态指定为200 ok
        body = '<p>Hello World</p>'
    
        headers = [
            ('Content-Type', 'text/html')
        ]
    
        start_response(status, headers)
        return [body]
    
    WSGIServer(('', 8000), application).serve_forever() #启动一个占用8000端口的wsgi服务器
    
    from gevent.pywsgi import WSGIServer #使用pywsgi可以我们自己定义产生结果的处理引擎
    def application(environ, start_response):
        status = '200 OK'
    
        headers = [
            ('Content-Type', 'text/html')
        ]
    
        start_response(status, headers)
        yield "<p>Hello" #yield出数据
        yield "World</p>"
    
    WSGIServer(('', 8000), application).serve_forever()

    我们看一个用ab(Apache Benchmark)的性能测试(更多信息请查看http://nichol.as/benchmark-of-python-web-servers),我这里只

    对比了gevent和paste的性能比(没做系统优化,只是在同样条件下看性能差距):

    paste的wsgi程序:

    from gevent.wsgi import WSGIServer
    
    def application(environ, start_response):
        status = '200 OK'
        body = '<p>Hello World</p>'
    
        headers = [
            ('Content-Type', 'text/html')
        ]
    
        start_response(status, headers)
        return [body]
    
    #WSGIServer(('', 8000), application).serve_forever()
    from paste import httpserver
    httpserver.serve(application, '0.0.0.0', request_queue_size=500)
     

    dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8000/ #gevent的性能,条件是:并发100,请求1W
    This is ApacheBench, Version 2.3 <$Revision: 655654 $>
    Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
    Licensed to The Apache Software Foundation, http://www.apache.org/

    Benchmarking 127.0.0.1 (be patient)
    Completed 1000 requests
    Completed 2000 requests
    Completed 3000 requests
    Completed 4000 requests
    Completed 5000 requests
    Completed 6000 requests
    Completed 7000 requests
    Completed 8000 requests
    Completed 9000 requests
    Completed 10000 requests
    Finished 10000 requests

    Server Software:
    Server Hostname: 127.0.0.1
    Server Port: 8000

    Document Path: /
    Document Length: 18 bytes

    Concurrency Level: 100
    Time taken for tests: 2.805 seconds
    Complete requests: 10000
    Failed requests: 0
    Write errors: 0
    Total transferred: 1380000 bytes
    HTML transferred: 180000 bytes
    Requests per second: 3564.90 [#/sec] (mean)
    Time per request: 28.051 [ms] (mean)
    Time per request: 0.281 [ms] (mean, across all concurrent requests)
    Transfer rate: 480.43 [Kbytes/sec] received

    Connection Times (ms)
    min mean[+/-sd] median max
    Connect: 0 0 0.2 0 2
    Processing: 2 28 15.1 27 69
    Waiting: 1 28 15.1 27 69
    Total: 2 28 15.1 27 69

    Percentage of the requests served within a certain time (ms)
    50% 27
    66% 35
    75% 40
    80% 42
    90% 48
    95% 54
    98% 59
    99% 62
    100% 69 (longest request)

    dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8080/ #paste的性能
    This is ApacheBench, Version 2.3 <$Revision: 655654 $>
    Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
    Licensed to The Apache Software Foundation, http://www.apache.org/

    Benchmarking 127.0.0.1 (be patient)
    Completed 1000 requests
    Completed 2000 requests
    Completed 3000 requests
    Completed 4000 requests
    Completed 5000 requests
    Completed 6000 requests
    Completed 7000 requests
    Completed 8000 requests
    Completed 9000 requests
    Completed 10000 requests
    Finished 10000 requests

    Server Software: PasteWSGIServer/0.5
    Server Hostname: 127.0.0.1
    Server Port: 8080

    Document Path: /
    Document Length: 18 bytes

    Concurrency Level: 100
    Time taken for tests: 4.119 seconds
    Complete requests: 10000
    Failed requests: 0
    Write errors: 0
    Total transferred: 1600000 bytes
    HTML transferred: 180000 bytes
    Requests per second: 2427.52 [#/sec] (mean)
    Time per request: 41.194 [ms] (mean)
    Time per request: 0.412 [ms] (mean, across all concurrent requests)
    Transfer rate: 379.30 [Kbytes/sec] received

    Connection Times (ms)
    min mean[+/-sd] median max
    Connect: 0 0 0.2 0 2
    Processing: 2 41 5.4 41 107
    Waiting: 1 41 5.2 40 97
    Total: 4 41 5.3 41 107

    Percentage of the requests served within a certain time (ms)
    50% 41
    66% 41
    75% 42
    80% 43
    90% 46
    95% 50
    98% 56
    99% 59
    100% 107 (longest request)

    很不好理解吧,那我把数据直接整理下:

    1 测试用时:

    Time taken for tests: 2.805 seconds #gevent

    Time taken for tests: 4.119 seconds #paste 花费时间更长
    2 每秒请求数:

    Requests per second: 3564.90 [#/sec] (mean) #gevent的嘛,每秒请求数大的多
    Requests per second: 2427.52 [#/sec] (mean) #paste

    3 每请求数耗时:

    Time per request: 28.051 [ms] (mean) #gevent耗时少
    Time per request: 0.281 [ms] (mean, across all concurrent requests) #gevent并发请求时耗时少
    Time per request: 41.194 [ms] (mean) #paste
    Time per request: 0.412 [ms] (mean, across all concurrent requests) #paste

    4 传输效率:

    Transfer rate: 448.26 [Kbytes/sec] received #gevent的效率更高
    Transfer rate: 379.30 [Kbytes/sec] received #paste

    5 连接消耗的时间的分解:

    Connection Times (ms)
    min mean[+/-sd] median max
    Connect: 0 0 0.2 0 2
    Processing: 2 28 15.1 27 69
    Waiting: 1 28 15.1 27 69
    Total: 2 28 15.1 27 69

    Connection Times (ms) #paste
    min mean[+/-sd] median max
    Connect: 0 0 0.2 0 2
    Processing: 2 41 5.4 41 107
    Waiting: 1 41 5.2 40 97
    Total: 4 41 5.3 41 107 #明显其中最大用时107/97都大于gevent的69ms,最小用时gevent略强

    6 整个场景中所有请求的响应情况。在场景中每个请求都有一个响应时间

    Percentage of the requests served within a certain time (ms) #gevent
    50% 29
    66% 31
    75% 34
    80% 34
    90% 36
    95% 38
    98% 42
    99% 44
    100% 71 (longest request)

    可以这样理解:50%用户效应小于29ms,60%用户响应小于31ms,最长的访问响应为71ms
    Percentage of the requests served within a certain time (ms) #paste
    50% 41
    66% 41
    75% 42
    80% 43
    90% 46
    95% 50
    98% 56
    99% 59
    100% 107 (longest request) #很明显,无论那个区间,paste性能都略差

    4 长轮询

    import gevent
    from gevent.queue import Queue, Empty
    from gevent.pywsgi import WSGIServer
    import json
    
    data_source = Queue()
    
    def producer():
        while True:
            data_source.put_nowait('Hello World') #往队列非阻塞的放入数据
            gevent.sleep(1)
    
    def ajax_endpoint(environ, start_response):
        status = '200 OK'
        headers = [
            ('Content-Type', 'application/json') #设定网络文件的类型是json
        ]
        try:
            datum = data_source.get(timeout=5)
        except Empty:
            datum = [] #假如gevent.sleep的时间设置的长一些(比如5s),在不停刷新过程中会获得空列表
    
        start_response(status, headers)
        return json.dumps(datum) #返回数据,打印出来的数据是一个带引号的字符串
    
    gevent.spawn(producer)
    
    WSGIServer(('', 8000), ajax_endpoint).serve_forever()
     

    4 聊天室(源码在这里https://github.com/sdiehl/minichat.git):

    from gevent import monkey

    monkey.patch_all() #给模块打包
    from flask import Flask, render_template, request, json #作者在这里使用了flask框架,当然你也可以用其它比如django.tornado,bottle等
    
    from gevent import queue
    from gevent.pywsgi import WSGIServer
    
    app = Flask(__name__)
    app.debug = True
    
    class Room(object):
    
        def __init__(self):
            self.users = set()
            self.messages = []
    
        def backlog(self, size=25):
            return self.messages[-size:]
    
        def subscribe(self, user):
            self.users.add(user)
    
        def add(self, message):
            for user in self.users:
                print user
                user.queue.put_nowait(message)
            self.messages.append(message)
    
    class User(object):
    
        def __init__(self):
            self.queue = queue.Queue()
    
    rooms = {
        'python': Room(),
        'django': Room(),
    }
    
    users = {}
    
    @app.route('/') #flask指定url的处理使用路由的方式,访问页面地址根目录就会执行choose_name
    def choose_name():
        return render_template('choose.html') #然后调用模板choose.html,这个html文件最后使用了GET方法提交了一个uid页面(/<uid>)
    
    @app.route('/<uid>') #请求被转到了这里
    def main(uid):
        return render_template('main.html', #调用模板提供几个room的连接
            uid=uid,
            rooms=rooms.keys() #格局选择的连接,通过GET方法转到那个相应url:/<room>/<uid>
        )
    
    @app.route('/<room>/<uid>') #请求被转到了这里
    def join(room, uid):
        user = users.get(uid, None)
    
        if not user:
            users[uid] = user = User()
    
        active_room = rooms[room]
        active_room.subscribe(user)
        print 'subscribe', active_room, user
    
        messages = active_room.backlog()
    
        return render_template('room.html', #room.html包含一个POST提交方式,把你的聊天数据提交,并且更新页面(通过jquery的ajax调用url/poll/<uid>)
            room=room, uid=uid, messages=messages)
    
    @app.route("/put/<room>/<uid>", methods=["POST"]) #通过这个url
    def put(room, uid):
        user = users[uid]
        room = rooms[room]
    
        message = request.form['message']
        room.add(':'.join([uid, message]))
    
        return ''
    
    @app.route("/poll/<uid>", methods=["POST"])
    def poll(uid):
        try:
            msg = users[uid].queue.get(timeout=10)
        except queue.Empty:
            msg = []
        return json.dumps(msg) #返回队列中包含的聊天记录
    
    if __name__ == "__main__":
        http = WSGIServer(('', 5000), app)
        http.serve_forever()

    来一个更复杂带有前台后端的模型(例子来自http://blog.pythonisito.com/2011/07/gevent-zeromq-websockets-and-flot-ftw.html):

    源码在:http://dl.dropbox.com/u/24086834/blog/20110723/zmq_websocket.tar.gz

    其中需要修改graph.js第二行:

    var ws = new WebSocket(“ws://localhost:9999/test”);

    为:

    var ws = new MozWebSocket(“ws://localhost:9999/test”); #因为我的火狐用的websocket不同

    这个demo.py,我来解析下:

    import os

    import time
    import math
    import json
    import webbrowser
    
    import paste.urlparser #paste是一个WSGI工具包,在WSGI的基础上包装了几层,让应用管理和实现变得方便
    
    import gevent
    from gevent_zeromq import zmq
    from geventwebsocket.handler import WebSocketHandler #基于gevent的pywsgi的WebSocket的处理程序
    
    def main(): #主方法
        context = zmq.Context()
        gevent.spawn(zmq_server, context) #上个例子使用joinall,这个例子是spawn+start,context是参数,也就是实例化的GreenContext
        ws_server = gevent.pywsgi.WSGIServer(
            ('', 9999), WebSocketApp(context),
            handler_class=WebSocketHandler)
        http_server = gevent.pywsgi.WSGIServer(
            ('', 8000),
            paste.urlparser.StaticURLParser(os.path.dirname(__file__))) # paste.urlparser用来处理url和静态文件
        http_server.start()  #启动grennlet实例
        ws_server.start()
        webbrowser.open('http://localhost:8000/graph.html') #启动浏览器看这个页面,当正常启动后js会画图
        zmq_producer(context)
    
    def zmq_server(context):
        sock_incoming = context.socket(zmq.SUB)
        sock_outgoing = context.socket(zmq.PUB)
        sock_incoming.bind('tcp://*:5000') #发布绑定
        sock_outgoing.bind('inproc://queue') #订阅绑定,本地(通过内存)进程(线程间)通信传输
        sock_incoming.setsockopt(zmq.SUBSCRIBE, "") #这里表示对发布的所有信息都订阅
        while True:
            msg = sock_incoming.recv()
            sock_outgoing.send(msg)
    
    class WebSocketApp(object):
    
        def __init__(self, context):
            self.context = context
    
        def __call__(self, environ, start_response):
            ws = environ['wsgi.websocket']
            sock = self.context.socket(zmq.SUB)
            sock.setsockopt(zmq.SUBSCRIBE, "") #订阅所有信息
            sock.connect('inproc://queue') #websocket连接到订阅的地址
            while True:
                msg = sock.recv()
                ws.send(msg)
    
    def zmq_producer(context):  #发布的方法
        socket = context.socket(zmq.PUB)
        socket.connect('tcp://127.0.0.1:5000') #绑定到发布的socket
    
        while True:
            x = time.time() * 1000
            y = 2.5 * (1 + math.sin(x / 500))
            socket.send(json.dumps(dict(x=x, y=y))) #往发布socket发送数据,这样,数据会被inproc://queue订阅,而被websocket获取,根据数据展示
            gevent.sleep(0.05)
    
    if __name__ == '__main__':
        main()
    
    from:http://www.dongwm.com/archives/guanyugeventdeyixielijieer/
  • 相关阅读:
    排序算法 之 冒泡排序 插入排序 希尔排序 堆排序
    DataStructure之线性表以及其实现
    使用可重入函数进行更安全的信号处理
    内存经济学
    电脑通用技能
    循环套餐的逻辑
    占用了多少内存
    索引的用法
    电脑的眼缘
    字符串积木
  • 原文地址:https://www.cnblogs.com/buxizhizhoum/p/7448647.html
Copyright © 2011-2022 走看看