  • Using zmq

    Three zmq messaging patterns

    1. Request-reply

    Server side

        import time
        import zmq

        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind("tcp://*:5555")

        while True:
            # A REP socket must receive a request before it can send a reply
            message = socket.recv_string()
            socket.send(b"word")
            print(message)
            time.sleep(10)

    Client side

        import zmq
        import time

        context = zmq.Context()

        #  Socket to talk to server
        print("Connecting to hello world server…")
        socket = context.socket(zmq.REQ)
        socket.connect("tcp://localhost:5555")

        #  Do 10 requests, waiting each time for a response
        s_time = time.time()
        for request in range(10):
            print("Sending request %s …" % request)
            socket.send(b"Hello")

            #  Get the reply.
            message = socket.recv()
            print("Received reply %s [ %s ]" % (request, message))

        #  Total elapsed time for the 10 round trips
        print(time.time() - s_time)
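
    The strict send/receive alternation of REQ and REP can be tried in a single process. The following is a minimal sketch, assuming pyzmq is installed; the inproc endpoint name, the run_demo helper, and the "re: " reply prefix are made up for illustration and do not appear in the scripts above.

```python
import threading
import zmq

ENDPOINT = "inproc://reqrep-demo"  # hypothetical in-process endpoint


def run_demo(n_requests=3):
    """Run a REP server thread and a REQ client in one process."""
    context = zmq.Context()
    bound = threading.Event()   # set once the server has bound
    done = threading.Event()    # set once the client has read all replies

    def server():
        rep = context.socket(zmq.REP)
        rep.bind(ENDPOINT)
        bound.set()
        for _ in range(n_requests):
            request = rep.recv_string()        # REP: receive first...
            rep.send_string("re: " + request)  # ...then send one reply
        done.wait()
        rep.close()

    thread = threading.Thread(target=server)
    thread.start()
    bound.wait()

    req = context.socket(zmq.REQ)
    req.connect(ENDPOINT)
    replies = []
    for i in range(n_requests):
        req.send_string("request %d" % i)  # REQ: send first...
        replies.append(req.recv_string())  # ...then wait for the reply
    req.close()

    done.set()
    thread.join()
    context.term()
    return replies


if __name__ == "__main__":
    print(run_demo())
```

    Each request is answered before the next one is sent, which is why the time.sleep(10) in the server above delays the client's following request.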

    2. Publish-subscribe: the client filters messages by specifying the prefix (leading characters) they must start with

    Server side

        import zmq
        from random import randrange

        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.bind("tcp://*:5556")

        while True:
            zipcode = randrange(1, 100000)
            temperature = randrange(-80, 135)
            relhumidity = randrange(10, 60)
            print(zipcode)

            # The zipcode comes first so that subscribers can filter on it
            socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

    Client side

        import sys
        import zmq

        #  Socket to talk to server
        context = zmq.Context()
        socket = context.socket(zmq.SUB)

        print("Collecting updates from weather server…")
        socket.connect("tcp://localhost:5556")

        # Subscribe to zipcode, default is NYC, 10001
        zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"

        # Python 2 - ascii bytes to unicode str
        if isinstance(zip_filter, bytes):
            zip_filter = zip_filter.decode('ascii')
        socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

        # Process 5 updates
        total_temp = 0
        for update_nbr in range(5):
            string = socket.recv_string()
            zipcode, temperature, relhumidity = string.split()
            total_temp += int(temperature)
            print(zipcode)

        print("Average temperature for zipcode '%s' was %dF" % (
            zip_filter, total_temp / (update_nbr + 1)))
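
    A SUB socket compares only the leading bytes of each message against its subscriptions. The plain-Python sketch below imitates that prefix matching without using zmq at all; the matches helper and the sample updates are hypothetical and only illustrate the rule.

```python
def matches(message, subscriptions):
    """Return True if message (bytes) starts with any subscribed prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)


# Sample "zipcode temperature relhumidity" updates (made-up values)
updates = [b"10001 72 40", b"100010 65 33", b"20001 80 55"]

# Only the leading bytes are compared, so b"10001" also lets b"100010 ..." through
received = [m for m in updates if matches(m, [b"10001"])]

# Including the separator in the prefix restricts the match to that exact zipcode
exact = [m for m in updates if matches(m, [b"10001 "])]

# An empty prefix matches every message; no subscription at all matches none
everything = [m for m in updates if matches(m, [b""])]
nothing = [m for m in updates if matches(m, [])]
```

    With the zipcode range used by the server above, a filter such as "1000" would deliver zipcode 1000 as well as 10000 through 10009, so the filter should be as specific as the message format allows.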

    3. Pipeline (parallel) pattern: the server pushes messages, and several client receivers are started to process them in parallel

    Server side

        import zmq
        import time

        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.bind("tcp://*:5557")
        num = 0

        while True:
            num += 1
            # send_string encodes the text as UTF-8 before sending
            socket.send_string("test message " + str(num))
            print("test message " + str(num))
            # print("sent")
            time.sleep(1)

    Client side

        import time
        import zmq

        context = zmq.Context()

        recive = context.socket(zmq.PULL)
        recive.connect('tcp://127.0.0.1:5557')


        def test1():
            while True:
                # Blocks until the server pushes a message to this receiver
                data = recive.recv_string()
                print("Forwarding...")
                print(data)
                time.sleep(5)


        test1()
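
    To see how one PUSH socket spreads messages over several PULL receivers without opening multiple terminals, the pattern can be run inside a single process. This is a rough sketch, assuming pyzmq is installed; the inproc endpoint, the run_pipeline helper, and the 500 ms idle timeout used to stop the workers are illustrative choices, not part of the scripts above.

```python
import threading
import zmq

ENDPOINT = "inproc://pipeline-demo"  # hypothetical in-process endpoint


def run_pipeline(n_tasks=10, n_workers=2):
    """Push n_tasks messages to n_workers PULL sockets; return who got what."""
    context = zmq.Context()
    push = context.socket(zmq.PUSH)
    push.bind(ENDPOINT)

    results = {}  # worker id -> list of messages that worker received

    def worker(worker_id, connected):
        pull = context.socket(zmq.PULL)
        pull.connect(ENDPOINT)
        received = results[worker_id] = []
        connected.set()
        # Keep receiving until no message has arrived for 500 ms
        while pull.poll(timeout=500):
            received.append(pull.recv_string())
        pull.close()

    threads, events = [], []
    for worker_id in range(n_workers):
        connected = threading.Event()
        thread = threading.Thread(target=worker, args=(worker_id, connected))
        thread.start()
        threads.append(thread)
        events.append(connected)

    # Wait until every worker is connected before sending anything
    for connected in events:
        connected.wait()

    for n in range(n_tasks):
        push.send_string("task %d" % n)

    for thread in threads:
        thread.join()
    push.close()
    context.term()
    return results


if __name__ == "__main__":
    for worker_id, tasks in sorted(run_pipeline().items()):
        print(worker_id, tasks)
```

    Every message reaches exactly one receiver, so adding receivers divides the work instead of duplicating it; how the messages are split between receivers depends on when each one connects.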
  • Original article: https://www.cnblogs.com/xinyonde/p/10906351.html