zoukankan      html  css  js  c++  java
  • 网络编程之python zeromq学习系列之一

        简介:
    
          zeromq中间件,他是一个轻量级的消息中间件,传说是世界上最快的消息中间件,为什么这么说呢?
        因为一般的消息中间件都需要启动消息服务器,但是zeromq这厮尽然没有消息服务器,他压根没有消息中间件的架子,但是这并不能掩盖他的强大。
        通过和activemq,rabbitmq对比,显然功能上没有前两者这么强大,他不支持消息的持久化,但是有消息copy功能,他也不支持崩溃恢复,而且由于他太快了,可能客户端还没启动,服务端的消息就已经发出去了,这个就容易丢消息了,但是zeromq自由他的办法,就先说这么多了。先来看看怎么在python中引入这个强大的利器。
        我自己之所以,学习体会一下,主要原因,是想在练习过程中体会其中的应用原理及逻辑,最好是能感知到其中的设计思想,为以后,自己做东西积攒点经验.
        另外最近也比较关注自动化运维的一些东西.网上说saltstack本身就用的zeromq做消息队列.所以更引起了我的兴趣.
        安装:
        我的操作系统是ubuntu 14.04的 python zeromq 环境安装参考这里的官网
    
        下面测试:
    
        一,C/S模式:
        server 端代码:
            #!/usr/bin/env python
            # coding:utf8
            #author: wangqiankun@lashou-inc.com
    
    
            import zmq
            #调用zmq相关类方法,邦定端口
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind('tcp://*:10001')
    
    
    
            while True:
                #循环接受客户端发来的消息
                msg = socket.recv()
                print "Msg info:%s" %msg
                #向客户端服务器发端需要执行的命令
                cmd_info = raw_input("client cmd info:").strip()
                socket.send(cmd_info)
    
            socket.close()
    
        client 端代码:
          import zmq
            import time
            import commands
    
            context = zmq.Context()
            socket = context.socket(zmq.REQ)
            socket.connect('tcp://127.0.0.1:10001')
    
    
            def execut_cmd(cmd):
                s,v = commands.getstatusoutput(cmd)
                return v
    
    
    
            while True:
                #获取当前时间
                now_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
    
                socket.send("now time info:[%s] request execution command:'
    ',%s"%(now_time,result))
                recov_msg = socket.recv()
                #调用execut_cmd函数,执行服务器发过来的命令
                result = execut_cmd(recov_msg)
                print recov_msg,'
    ',result,
                time.sleep(1)
                #print "now time info:%s cmd status:[%s],result:[%s]" %(now_time,s,v)
                continue
    
            socket.close()
          注意:此模式是经典的应答模式,不能同时send多个数据,
            这种模式说是主要用于远程调用和任务分配,但我愚笨,还是理解不透.后面有时间,再回过来好好看看,
    
            测试:
            req端
            # python zmq-server-cs-v01.py
            rep端
            # python  zmq-client-cs-v01.py
          
        二,发布订阅模式(pub/sub)
    
            pub 发布端代码如下:
    
            #!/usr/bin/env python
            # coding:utf8
            #author: wangqiankun@lashou-inc.com
    
            import itertools
            import sys,time,zmq
    
    
            def main():
                if len(sys.argv) != 2:
                    print 'Usage: publisher'
                    sys.exit(1)
                bind_to = sys.argv[1]
                all_topics = ['sports.general','sports.football','sports.basketball','stocks.general','stocks.GOOG','stocks.AAPL','weather']
    
                ctx = zmq.Context()
                s = ctx.socket(zmq.PUB)
                s.bind(bind_to)
    
                print "Starting broadcast on topics:"
                print "%s" %all_topics
                print "Hit Ctrl-c to stop broadcasting."
                print "waiting so subscriber sockets can connect...."
    
                print
                time.sleep(1)
                msg_counter = itertools.count()
    
                try:
                    for topic in itertools.cycle(all_topics):
                    msg_body = str(msg_counter.next())
                    #print msg_body,
                    print 'Topic:%s,msg:%s' %(topic,msg_body)
                    s.send_multipart([topic,msg_body])
                    #s.send_pyobj([topic,msg_body])
                    time.sleep(0.1)
                except KeyboardInterrupt:
    
                    pass
    
    
                print "Wating for message queues to flush"
    
                time.sleep(0.5)
                s.close()
                print "Done"
    
            if __name__ == "__main__":
            main()
    
            sub  端代码:
    
                #!/usr/bin/env python
                # coding:utf8
                #author: wangqiankun@lashou-inc.com
    
                import zmq
                import time,sys
    
    
                def main():
    
                if len(sys.argv) < 2:
                    print "Usage: subscriber [topic topic]"
                    sys.exit(1)
    
                connect_to = sys.argv[1]
                topics = sys.argv[2:]
    
                ctx = zmq.Context()
                s = ctx.socket(zmq.SUB)
                s.connect(connect_to)
    
                #manage subscriptions
    
                if not topics:
                    print "Receiving messages on ALL topics...."
                    s.setsockopt(zmq.SUBSCRIBE,'')
                else:
                    print "Receiving messages on topics: %s..." %topics
    
                    for t in topics:
                    s.setsockopt(zmq.SUBSCRIBE,t)
    
                    print
                try:
                    while True:
                    topics,msg = s.recv_multipart()
                    print 'Topic:%s,msg:%s' %(topics,msg)
                except KeyboardInterrupt:
                    pass
                print "Done...."
    
    
                if __name__ == "__main__":
                main()
    
    
    
    
    
    
    
         注意:
         这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,官网还提供了一种可能出现的问题:当订阅者消费慢于发布,
         此时就会出现数据的堆积,而且还是在发布端的堆积(有朋友指出是堆积在消费端,或许是新版本改进,需要读者的尝试和反馈,thx!),显然,
         这是不可以被接受的。至于解决方案,或许后面的"分而治之"就是吧
    
         测试:
         pub端: 发布端 
         #python zmq-server-pubsub-v02.py  tcp://127.0.0.1:10001
         sub端:订阅端
         #python zmq-server-cs-v01.py  tcp://127.0.0.1:10001 sports.football
         
         三,push/pull 分而治之模式.
         
         任务发布端代码
         
         #!/usr/bin/env python
            # coding:utf8
            #author: wangqiankun@lashou-inc.com
    
    
    
            import zmq
            import random
            import time
    
            context = zmq.Context()
            #socket to send messages on
            sender = context.socket(zmq.PUSH)
            sender.bind('tcp://*:5557')
    
    
            print 'Press Enter when the workers are ready:'
            _ = raw_input()
            print "Sending tasks to workers...."
    
            #The first messages is "0" and signals start to batch
    
            sender.send('0')
    
            #Initialize random mumber generator
    
            random.seed()
    
            #send 100 tasks
    
            total_msec = 0
            for task_nbr in range(100):
                #Random workload from 1 to 100 msecs
                #print task_nbr,
                workload = random.randint(1,100)
                total_msec += workload
                sender.send(str(workload))
                print "Total expected cost:%s msec:%s workload:%s" %(total_msec,task_nbr,workload)
    
    
    
            work端代码如下:
    
            #!/usr/bin/env python
            # coding:utf8
            #author: wangqiankun@lashou-inc.com
    
            import sys,time,zmq
            import commands
    
    
            context = zmq.Context()
            #socket to receive messages on
    
            receiver = context.socket(zmq.PULL)
            receiver.connect('tcp://127.0.0.1:5557')
    
            #Socket to send messages to
    
            sender = context.socket(zmq.PUSH)
            sender.connect("tcp://127.0.0.1:5558")
    
            #Process tasks forever
    
            while True:
                s = receiver.recv()
    
                #Simple progress indicator for the viewer
                print s,
                sys.stdout.write("%s '	' "%s)
                sys.stdout.flush()
    
                #Do the work
                time.sleep(int(s)*0.001)
                #Send results to sink
                sender.send(s)
    
        pull端代码如下:
                #!/usr/bin/env python
                # coding:utf8
                #author: wangqiankun@lashou-inc.com
    
                import sys
                import time
                import zmq
    
                context = zmq.Context()
    
                #Socket to receive messages on
    
                receiver = context.socket(zmq.PULL)
                receiver.bind("tcp://*:5558")
    
                #Wait for start of batch
    
                s = receiver.recv()
    
                #Start our clock now
                tstart = time.time()
    
                #Process 100 confirmations
                total_msec = 0
    
                for task_nbr in range(100):
                s = receiver.recv()
    
                if task_nbr % 10 == 0:
                    print task_nbr,
                    print s,
                    sys.stdout.write(':')
    
                else:
                    print s,
                    #print task_nbr,
                    sys.stdout.write('.')
    
                #Calculate and report duration of batch
                tend = time.time()
                print "Total elapsed time:%d msec "%((tend-tstart)*1000)
    
        注意点:
        这种模式与pub/sub模式一样都是单向的,区别有两点:
        1,该模式下在没有消费者的情况下,发布者的信息是不会消耗的(由发布者进程维护)
        2,多个消费者消费的是同一列信息,假设A得到了一条信息,则B将不再得到
        这种模式主要针对在消费者能力不够的情况下,提供的多消费者并行消费解决方案(也算是之前的pub/sub模式的
        那个"堵塞问题"的一个解决策略吧)
    
        其实所谓的分就是pull端去抢push端发出来的任务.谁抢着算谁的.
    
        测试:
         #python zmq-server-pushpull-v03.py
         #python zmq-work-pushpull-v03.py
         #python zmq-client-pushpull-v03.py
         
  • 相关阅读:
    [翻译]在Windows版或MacOS版的Microsoft Edge上安装一个谷歌浏览器拓展
    《C#并发编程经典实例》学习笔记—2.6 任务完成时的处理
    《C#并发编程经典实例》学习笔记—2.5 等待任意一个任务完成 Task.WhenAny
    Visual Studio 2019 发布活动
    创建索引CreateIndex
    Windows 10 安装ElasticSearch(2)- MSI安装ElasticSearch和安装Kibana
    jQuery框架二
    jQuery框架
    JavaScript——二
    作业 5/17
  • 原文地址:https://www.cnblogs.com/shantu/p/4598933.html
Copyright © 2011-2022 走看看