zoukankan      html  css  js  c++  java
  • 异步IO数据库队列缓存

    SelectPollEpoll异步IO 

    select 多并发socket 例子

     1 import select
     2 import socket
     3 import sys
     4 import queue
     5 
     6 
     7 server = socket.socket()
     8 server.setblocking(0)
     9 
    10 server_addr = ('localhost',10000)
    11 
    12 print('starting up on %s port %s' % server_addr)
    13 server.bind(server_addr)
    14 
    15 server.listen(5)
    16 
    17 
    18 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
    19 outputs = []
    20 
    21 message_queues = {}
    22 
    23 while True:
    24     print("waiting for next event...")
    25 
    26     readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里
    27 
    28     for s in readable: #每个s就是一个socket
    29 
    30         if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
    31             #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
    32             #新连接进来了,接受这个连接
    33             conn, client_addr = s.accept()
    34             print("new connection from",client_addr)
    35             conn.setblocking(0)
    36             inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
    37             #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
    38             #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的
    39 
    40             message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送
    41 
    42         else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
    43             #客户端的数据过来了,在这接收
    44             data = s.recv(1024)
    45             if data:
    46                 print("收到来自[%s]的数据:" % s.getpeername()[0], data)
    47                 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
    48                 if s not  in outputs:
    49                     outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
    50 
    51 
    52             else:#如果收不到data代表什么呢? 代表客户端断开了呀
    53                 print("客户端断开了",s)
    54 
    55                 if s in outputs:
    56                     outputs.remove(s) #清理已断开的连接
    57 
    58                 inputs.remove(s) #清理已断开的连接
    59 
    60                 del message_queues[s] ##清理已断开的连接
    61 
    62 
    63     for s in writeable:
    64         try :
    65             next_msg = message_queues[s].get_nowait()
    66 
    67         except queue.Empty:
    68             print("client [%s]" %s.getpeername()[0], "queue is empty..")
    69             outputs.remove(s)
    70 
    71         else:
    72             print("sending msg to [%s]"%s.getpeername()[0], next_msg)
    73             s.send(next_msg.upper())
    74 
    75 
    76     for s in exeptional:
    77         print("handling exception for ",s.getpeername())
    78         inputs.remove(s)
    79         if s in outputs:
    80             outputs.remove(s)
    81         s.close()
    82 
    83         del message_queues[s]
    server
     1 import socket
     2 import sys
     3 
     4 messages = [ b'This is the message. ',
     5              b'It will be sent ',
     6              b'in parts.',
     7              ]
     8 server_address = ('localhost', 10000)
     9 
    10 # Create a TCP/IP socket
    11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    12           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    13           ]
    14 
    15 # Connect the socket to the port where the server is listening
    16 print('connecting to %s port %s' % server_address)
    17 for s in socks:
    18     s.connect(server_address)
    19 
    20 for message in messages:
    21 
    22     # Send messages on both sockets
    23     for s in socks:
    24         print('%s: sending "%s"' % (s.getsockname(), message) )
    25         s.send(message)
    26 
    27     # Read responses on both sockets
    28     for s in socks:
    29         data = s.recv(1024)
    30         print( '%s: received "%s"' % (s.getsockname(), data) )
    31         if not data:
    32             print(sys.stderr, 'closing socket', s.getsockname() )
    client

    selectors模块

     1 import selectors
     2 import socket
     3  
     4 sel = selectors.DefaultSelector()
     5  
     6 def accept(sock, mask):
     7     conn, addr = sock.accept()  # Should be ready
     8     print('accepted', conn, 'from', addr)
     9     conn.setblocking(False)
    10     sel.register(conn, selectors.EVENT_READ, read)
    11  
    12 def read(conn, mask):
    13     data = conn.recv(1000)  # Should be ready
    14     if data:
    15         print('echoing', repr(data), 'to', conn)
    16         conn.send(data)  # Hope it won't block
    17     else:
    18         print('closing', conn)
    19         sel.unregister(conn)
    20         conn.close()
    21  
    22 sock = socket.socket()
    23 sock.bind(('localhost', 10000))
    24 sock.listen(100)
    25 sock.setblocking(False)
    26 sel.register(sock, selectors.EVENT_READ, accept)
    27  
    28 while True:
    29     events = sel.select()
    30     for key, mask in events:
    31         callback = key.data
    32         callback(key.fileobj, mask)
    selectors

    堡垒机前戏

    开发堡垒机之前,先来学习Python的paramiko模块,该模块机遇SSH用于连接远程服务器并执行相关操作

    SSHClient

    用于连接远程服务器并执行基本命令

    基于用户名密码连接:

     1 import paramiko
     2   
     3 # 创建SSH对象
     4 ssh = paramiko.SSHClient()
     5 # 允许连接不在know_hosts文件中的主机
     6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     7 # 连接服务器
     8 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')
     9   
    10 # 执行命令
    11 stdin, stdout, stderr = ssh.exec_command('df')
    12 # 获取命令结果
    13 result = stdout.read()
    14   
    15 # 关闭连接
    16 ssh.close()
     1 import paramiko
     2 
     3 transport = paramiko.Transport(('hostname', 22))
     4 transport.connect(username='wupeiqi', password='123')
     5 
     6 ssh = paramiko.SSHClient()
     7 ssh._transport = transport
     8 
     9 stdin, stdout, stderr = ssh.exec_command('df')
    10 print stdout.read()
    11 
    12 transport.close()

    基于公钥密钥连接:

     1 import paramiko
     2  
     3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
     4  
     5 # 创建SSH对象
     6 ssh = paramiko.SSHClient()
     7 # 允许连接不在know_hosts文件中的主机
     8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     9 # 连接服务器
    10 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
    11  
    12 # 执行命令
    13 stdin, stdout, stderr = ssh.exec_command('df')
    14 # 获取命令结果
    15 result = stdout.read()
    16  
    17 # 关闭连接
    18 ssh.close()
     1 import paramiko
     2 
     3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
     4 
     5 transport = paramiko.Transport(('hostname', 22))
     6 transport.connect(username='wupeiqi', pkey=private_key)
     7 
     8 ssh = paramiko.SSHClient()
     9 ssh._transport = transport
    10 
    11 stdin, stdout, stderr = ssh.exec_command('df')
    12 
    13 transport.close()
    SSHClient 封装 Transport
     1 import paramiko
     2 from io import StringIO
     3 
     4 key_str = """-----BEGIN RSA PRIVATE KEY-----
     5 MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8
     6 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans
     7 H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e
     8 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC
     9 tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP
    10 c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A
    11 ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+
    12 c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh
    13 IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8
    14 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz
    15 zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6
    16 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC
    17 OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl
    18 HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq
    19 UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65
    20 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA
    21 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM
    22 WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH
    23 C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8
    24 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg
    25 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/
    26 pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj
    27 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw
    28 DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI
    29 +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0=
    30 -----END RSA PRIVATE KEY-----"""
    31 
    32 private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
    33 transport = paramiko.Transport(('10.0.1.40', 22))
    34 transport.connect(username='wupeiqi', pkey=private_key)
    35 
    36 ssh = paramiko.SSHClient()
    37 ssh._transport = transport
    38 
    39 stdin, stdout, stderr = ssh.exec_command('df')
    40 result = stdout.read()
    41 
    42 transport.close()
    43 
    44 print(result)
    基于私钥字符串进行连接

    SFTPClient

    用于连接远程服务器并执行上传下载

    基于用户名密码上传下载

     1 import paramiko
     2  
     3 transport = paramiko.Transport(('hostname',22))
     4 transport.connect(username='wupeiqi',password='123')
     5  
     6 sftp = paramiko.SFTPClient.from_transport(transport)
     7 # 将location.py 上传至服务器 /tmp/test.py
     8 sftp.put('/tmp/location.py', '/tmp/test.py')
     9 # 将remove_path 下载到本地 local_path
    10 sftp.get('remove_path', 'local_path')
    11  
    12 transport.close()

    基于公钥密钥上传下载

     1 import paramiko
     2  
     3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
     4  
     5 transport = paramiko.Transport(('hostname', 22))
     6 transport.connect(username='wupeiqi', pkey=private_key )
     7  
     8 sftp = paramiko.SFTPClient.from_transport(transport)
     9 # 将location.py 上传至服务器 /tmp/test.py
    10 sftp.put('/tmp/location.py', '/tmp/test.py')
    11 # 将remove_path 下载到本地 local_path
    12 sftp.get('remove_path', 'local_path')
    13  
    14 transport.close()

    RabbitMQ队列  

    安装 rabbitMA

    http://www.cnblogs.com/ericli-ericli/p/5902270.html

    http://blog.csdn.net/zyz511919766/article/details/41946521

    Work Queues

     1 import pika
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(
     3         host='localhost'))
     4 channel = connection.channel()
     5  
     6 channel.queue_declare(queue='hello')
     7  
     8 channel.basic_publish(exchange='',
     9                       routing_key='hello',
    10                       body='Hello World!')
    11 print(" [x] Sent 'Hello World!'")
    12 connection.close()
    生产者
     1 import pika
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(
     3         host='localhost'))
     4 channel = connection.channel()
     5  
     6 channel.queue_declare(queue='hello')
     7  
     8 def callback(ch, method, properties, body):
     9     print(" [x] Received %r" % body)
    10  
    11 channel.basic_consume(callback,
    12                       queue='hello',
    13                       no_ack=True)
    14  
    15 print(' [*] Waiting for messages. To exit press CTRL+C')
    16 channel.start_consuming()
    消费者

    1、acknowledgment 消息不丢失

    no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

     1 import pika
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4         host='10.211.55.4'))
     5 channel = connection.channel()
     6 
     7 channel.queue_declare(queue='hello')
     8 
     9 def callback(ch, method, properties, body):
    10     print(" [x] Received %r" % body)
    11     import time
    12     time.sleep(10)
    13     print 'ok'
    14     ch.basic_ack(delivery_tag = method.delivery_tag)
    15 
    16 channel.basic_consume(callback,
    17                       queue='hello',
    18                       no_ack=False)
    19 
    20 print(' [*] Waiting for messages. To exit press CTRL+C')
    21 channel.start_consuming()
    消费者

    2、durable   消息不丢失

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                          ))
    print(" [x] Sent 'Hello World!'")
    connection.close()
    生产者
     1 import pika
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
     4 channel = connection.channel()
     5 
     6 # make message persistent
     7 channel.queue_declare(queue='hello', durable=True)
     8 
     9 
    10 def callback(ch, method, properties, body):
    11     print(" [x] Received %r" % body)
    12     import time
    13     time.sleep(10)
    14     print 'ok'
    15     ch.basic_ack(delivery_tag = method.delivery_tag)
    16 
    17 channel.basic_consume(callback,
    18                       queue='hello',
    19                       no_ack=False)
    20 
    21 print(' [*] Waiting for messages. To exit press CTRL+C')
    22 channel.start_consuming()
    消费者

    3、消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,但是在消费者端,配置prefetch_count=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    消费者

    发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

     exchange type = fanout

     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='logs',
     9                          type='fanout')
    10 
    11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    12 channel.basic_publish(exchange='logs',
    13                       routing_key='',
    14                       body=message)
    15 print(" [x] Sent %r" % message)
    16 connection.close()
    发布者
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             type='fanout')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    订阅者

    有选择的接收消息

     exchange type = direct

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='direct_logs',
     9                          type='direct')
    10 
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13 
    14 severities = sys.argv[1:]
    15 if not severities:
    16     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    17     sys.exit(1)
    18 
    19 for severity in severities:
    20     channel.queue_bind(exchange='direct_logs',
    21                        queue=queue_name,
    22                        routing_key=severity)
    23 
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25 
    26 def callback(ch, method, properties, body):
    27     print(" [x] %r:%r" % (method.routing_key, body))
    28 
    29 channel.basic_consume(callback,
    30                       queue=queue_name,
    31                       no_ack=True)
    32 
    33 channel.start_consuming()
    消费者
     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='direct_logs',
     9                          type='direct')
    10 
    11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    13 channel.basic_publish(exchange='direct_logs',
    14                       routing_key=severity,
    15                       body=message)
    16 print(" [x] Sent %r:%r" % (severity, message))
    17 connection.close()
    生产者

    模糊匹配

    exchange type = topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词
    • 1 发送者路由值              队列中
      2 old.boy.python          old.*  -- 不匹配
      3 old.boy.python          old.#  -- 匹配
       1 import pika
       2 import sys
       3 
       4 connection = pika.BlockingConnection(pika.ConnectionParameters(
       5         host='localhost'))
       6 channel = connection.channel()
       7 
       8 channel.exchange_declare(exchange='topic_logs',
       9                          type='topic')
      10 
      11 result = channel.queue_declare(exclusive=True)
      12 queue_name = result.method.queue
      13 
      14 binding_keys = sys.argv[1:]
      15 if not binding_keys:
      16     sys.stderr.write("Usage: %s [binding_key]...
      " % sys.argv[0])
      17     sys.exit(1)
      18 
      19 for binding_key in binding_keys:
      20     channel.queue_bind(exchange='topic_logs',
      21                        queue=queue_name,
      22                        routing_key=binding_key)
      23 
      24 print(' [*] Waiting for logs. To exit press CTRL+C')
      25 
      26 def callback(ch, method, properties, body):
      27     print(" [x] %r:%r" % (method.routing_key, body))
      28 
      29 channel.basic_consume(callback,
      30                       queue=queue_name,
      31                       no_ack=True)
      32 
      33 channel.start_consuming()
      消费者
      import pika
      import sys
      
      connection = pika.BlockingConnection(pika.ConnectionParameters(
              host='localhost'))
      channel = connection.channel()
      
      channel.exchange_declare(exchange='topic_logs',
                               type='topic')
      
      routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
      message = ' '.join(sys.argv[2:]) or 'Hello World!'
      channel.basic_publish(exchange='topic_logs',
                            routing_key=routing_key,
                            body=message)
      print(" [x] Sent %r:%r" % (routing_key, message))
      connection.close()
      生产者

      memcached 

      http://www.cnblogs.com/wupeiqi/articles/5132791.html  

      redis 使用

      http://www.cnblogs.com/alex3714/articles/6217453.html  

     

  • 相关阅读:
    使用CablleStatement调用存储过程
    权限问题
    全文检索lucene6.1的检索方式
    spring的JdbcTemplate
    spring使用注解开发
    IDEA的快捷键:
    IDEA里面的facets和artifacts的讲解
    Hibernate---criteria的具体使用列子
    关于操作日期函数及其取范围
    hibernate---crateria
  • 原文地址:https://www.cnblogs.com/nikitapp/p/6722783.html
Copyright © 2011-2022 走看看