zoukankan      html  css  js  c++  java
  • zeromq-python使用

    官方使用文档:
    中文翻译文档:
    应答模式:
    server端:
    # Hello World server.
    #
    # Binds a REP socket to tcp://*:5555. For each request it receives it
    # prints the payload, sleeps for one second, and replies with b"World".
    import time

    import zmq

    ctx = zmq.Context()
    responder = ctx.socket(zmq.REP)
    responder.bind("tcp://*:5555")

    while True:
        # Block until the next request arrives from a client.
        request = responder.recv()
        print("Received request: %s" % request)

        # Stand-in for real work.
        time.sleep(1)

        # Answer the client.
        responder.send(b"World")
    server端
    client端:
    # Hello World client.
    #
    # Connects a REQ socket to tcp://localhost:5555, sends b"Hello" ten
    # times, and prints the reply received after each send.
    import zmq

    ctx = zmq.Context()

    print("Connecting to hello world server…")
    requester = ctx.socket(zmq.REQ)
    requester.connect("tcp://localhost:5555")

    # Ten round trips; every send is followed by a blocking receive.
    for seq in range(10):
        print("Sending request %s …" % seq)
        requester.send(b"Hello")

        reply = requester.recv()
        print("Received reply %s [ %s ]" % (seq, reply))
    client端
    发布订阅模式:
    server端:
    # Weather update server.
    #
    # Binds a PUB socket to tcp://*:5556 and publishes an endless stream of
    # randomly generated "<zipcode> <temperature> <relhumidity>" strings.
    from random import randrange

    import zmq

    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5556")

    while True:
        # randrange excludes its upper bound.
        reading = (
            randrange(1, 100000),  # zipcode
            randrange(-80, 135),   # temperature
            randrange(10, 60),     # relative humidity
        )
        publisher.send_string("%i %i %i" % reading)
    server端
    client端:
    # Weather update client.
    #
    # Connects a SUB socket to tcp://localhost:5556, subscribes to the zipcode
    # given as the first command-line argument (default "10001"), reads five
    # updates, and prints their average temperature.
    import sys

    import zmq

    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)

    print("Collecting updates from weather server…")
    subscriber.connect("tcp://localhost:5556")

    # Zipcode to subscribe to; falls back to 10001 when no argument is given.
    if len(sys.argv) > 1:
        zip_filter = sys.argv[1]
    else:
        zip_filter = "10001"

    # setsockopt_string is handed text, so a bytes argument (the Python 2
    # case) is decoded first.
    if isinstance(zip_filter, bytes):
        zip_filter = zip_filter.decode('ascii')
    subscriber.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

    # Sum the temperature field of five consecutive updates.
    total_temp = 0
    for update_nbr in range(5):
        fields = subscriber.recv_string().split()
        zipcode, temperature, relhumidity = fields
        total_temp += int(temperature)

    # update_nbr holds the last loop index, so +1 gives the update count.
    average = total_temp / (update_nbr + 1)
    print("Average temperature for zipcode '%s' was %dF" % (zip_filter, average))
    client端
    推拉模式:
    server端:
    # Task ventilator.
    #
    # Binds a PUSH socket to tcp://*:5557 and, once the user presses Enter,
    # sends a batch of 100 random workloads through it. A second PUSH socket
    # connected to tcp://localhost:5558 signals the start of the batch.
    #
    # Author: Lev Givon <lev(at)columbia(dot)edu>
    import random
    import time

    import zmq

    # Python 3 has no raw_input; alias it to input there.
    try:
        raw_input
    except NameError:
        raw_input = input

    ctx = zmq.Context()

    # Socket the tasks are sent on.
    sender = ctx.socket(zmq.PUSH)
    sender.bind("tcp://*:5557")

    # Direct connection to the sink, used to synchronize the start of batch.
    sink = ctx.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")

    print("Press Enter when the workers are ready: ")
    raw_input()
    print("Sending tasks to workers…")

    # The first message is "0" and signals the start of the batch.
    sink.send(b'0')

    random.seed()

    # Send 100 tasks, each a random cost between 1 and 100 msec inclusive,
    # and keep a running total of the expected cost.
    total_msec = 0
    for _ in range(100):
        cost = random.randint(1, 100)
        total_msec += cost
        sender.send_string(u'%i' % cost)

    print("Total expected cost: %s msec" % total_msec)

    # Give 0MQ time to deliver before the script ends.
    time.sleep(1)
    server端
    中间work端(可有可无):
    # Task worker.
    #
    # Pulls workloads from tcp://localhost:5557, sleeps for the duration each
    # one carries, and pushes an empty message to tcp://localhost:5558 after
    # finishing each task.
    #
    # Author: Lev Givon <lev(at)columbia(dot)edu>
    import sys
    import time

    import zmq

    ctx = zmq.Context()

    # Incoming tasks.
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Outgoing results.
    sender = ctx.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    # Process tasks forever.
    while True:
        workload = receiver.recv()

        # One dot per task as a progress indicator for the viewer.
        sys.stdout.write('.')
        sys.stdout.flush()

        # The payload is treated as milliseconds; time.sleep takes seconds.
        delay_sec = int(workload) * 0.001
        time.sleep(delay_sec)

        # Report completion with an empty message.
        sender.send(b'')
    中间work端(可有可无)
    client端:
    # Task sink.
    #
    # Binds a PULL socket to tcp://*:5558, waits for one start-of-batch
    # message, then receives 100 confirmations and prints the elapsed time.
    #
    # Author: Lev Givon <lev(at)columbia(dot)edu>
    import sys
    import time

    import zmq

    ctx = zmq.Context()

    # Socket the confirmations arrive on.
    receiver = ctx.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # The first message only marks the start of the batch; its content is
    # not used.
    receiver.recv()

    # Start the clock now.
    tstart = time.time()

    # Process 100 confirmations.
    for task_nbr in range(100):
        receiver.recv()
        # Every tenth confirmation is shown as ':' instead of '.'.
        marker = ':' if task_nbr % 10 == 0 else '.'
        sys.stdout.write(marker)
        sys.stdout.flush()

    # Report the duration of the batch in milliseconds.
    elapsed_msec = (time.time() - tstart) * 1000
    print("Total elapsed time: %d msec" % elapsed_msec)
    client端
    非阻塞模式的两种实现:
    原始方式中使用的 zmq.DONTWAIT 适用于 ZeroMQ 3.0 及之后的版本;更早的版本请使用 zmq.NOBLOCK。
    # Reads from two sockets with non-blocking receives.
    #
    # Each pass drains the PULL socket first, then the SUB socket, and then
    # sleeps for one millisecond.
    import time

    import zmq

    ctx = zmq.Context()

    # Connect to task ventilator
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Connect to weather server
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

    # Traffic from the task ventilator is handled before weather updates.
    while True:

        # Drain waiting tasks; zmq.Again ends the inner loop.
        while True:
            try:
                task = receiver.recv(zmq.DONTWAIT)
            except zmq.Again:
                break
            # process task

        # Drain waiting weather updates the same way.
        while True:
            try:
                update = subscriber.recv(zmq.DONTWAIT)
            except zmq.Again:
                break
            # process weather update

        # Both sockets drained, so sleep for 1 msec
        time.sleep(0.001)
    原始方式(不推荐)
    # Reads from two sockets using zmq.Poller.
    #
    # Registers a PULL socket and a SUB socket for POLLIN and receives from
    # whichever ones the poll result contains. Ctrl-C during the poll ends
    # the loop.
    import zmq

    ctx = zmq.Context()

    # Connect to task ventilator
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Connect to weather server
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

    # Build the poll set: both sockets are watched for incoming messages.
    poller = zmq.Poller()
    for sock in (receiver, subscriber):
        poller.register(sock, zmq.POLLIN)

    while True:
        try:
            ready = dict(poller.poll())
        except KeyboardInterrupt:
            break

        if receiver in ready:
            task = receiver.recv()
            # process task

        if subscriber in ready:
            update = subscriber.recv()
            # process weather update
    poll方式
     
     
     
     
  • 相关阅读:
    预备作业02:体会做中学(Learning By Doing)
    寒假作业01
    20210418第 237 场周赛(一)
    机器学习第七堂课20210415
    云计算与信息安全第七节课20210413
    操作系统第七堂课2021年0412内存管理基础
    机器学习第六堂课20210408
    云计算与信息安全第六节课20210406
    机器学习第五节课20210401
    云计算与信息安全第五堂课20210330
  • 原文地址:https://www.cnblogs.com/goldd/p/5896257.html
Copyright © 2011-2022 走看看