zoukankan      html  css  js  c++  java
  • Python 常用外部模块详解

    RabbitMQ

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统,他遵循Mozilla Public License开源协议,MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过队列来通信,队列的使用除去了接收和发送应用程序同时执行的要求,说的笼统点是queue+socket实现.

    ◆MQ的基础应用◆

    如果启动了多个消费者,那么他们之间是串行获取数据的,也就是说如果1号消费者收不到数据,那么MQ将默认发送给2号消费者,以此类推,如果全部消费者不在线,那么MQ会默认存储这个消息,直到有消费者上线,MQ就会将消息发送给指定的消费者.

    生产者:

    import pika
    
    conn = pika.BlockingConnection(pika.ConnectionParameters
                                   (host="192.168.1.5",port="5672")    #指定连接地址
                )
    print("链接消息:",conn)
    
    channel = conn.channel()
    channel.queue_declare(queue="lyshark")
    
    while True:
        temp =input("发送数据:").strip()
        channel.basic_publish(exchange="",routing_key="lyshark",body=temp)
    
    conn.close()
    

    消费者:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters
                                        (host='192.168.1.5',port="5672")
                )
    
    channel = connection.channel()
    channel.queue_declare(queue='lyshark')
    
    def callback(ch,method,properties,body):
        print("接收的数据: %r" %body)
    
    channel.basic_consume(callback,         #消息来到后,调用callback回调函数.
                          queue='lyshark',  #指定消息队列名称
                          no_ack=True)      # 如果=True,则消息发送中间中断后会自动保存下来.
                                            # 下一次客户端上线后会自动的接受消息
    
    print("==========准备接收消息==========")
    channel.start_consuming()              #循环接收消息
    

    ◆消息的持久化◆

    如果服务器端被强制关闭了,我们的消息就丢失了,那就需要我们对服务器端的数据做一个持久化处理.

    在每次声明队列的时候加上durable=True 队列持久化,delivery_mode =2 消息持久化
    也就是开启持久化的意思,必须客户端服务端都要写上.

    生产者:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.5'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello', durable=True)
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!',
                          properties=pika.BasicProperties(delivery_mode=2, ))  # 发布时设置delivery_mode=2,数据持久化
    print(" [x] Sent 'Hello World!'")
    connection.close()
    

    消费者:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.5'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello', durable=True)
    
    def callback(ch, method, properties, body):
        print("返回数据: %r" % body)
        import time
        #time.sleep(10)
        print("完成...")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

    ◆消息发布订阅◆

    如上的配置方式,MQ只能将消息发送给一个消费者手里,有时候我们想给所有的消费者发送消息,那就需要使用广播的方式给所有的客户端发送消息的分发,MQ支持消息的公平分发,之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息.

    发布者(fanout广播模式): 指定发布者为广播模式,所有bind到此exchange的queue都可以接收到服务端发送的消息.

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.5"))
    channel = connection.channel()
    
    channel.exchange_declare(exchange="logs",
                             exchange_type="fanout"          #指定使用广播模式
                             )
    
    message = "info:hello lyshark"   #指定发送的消息
    
    channel.basic_publish(exchange="logs",
                          routing_key="",         #不绑定队列,因为是广播模式
                          body = message          #要发送的消息
                          )
    
    print("发送消息: %r"%message)
    connection.close()
    

    订阅者(fanout广播模式): 订阅者修改让其随机生成队列名称,你可以启动多个订阅者来看其执行效果.

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.5"))
    channel = connection.channel()
    
    channel.exchange_declare(exchange="logs",exchange_type="fanout")   #指定为广播模式
    
    result = channel.queue_declare(exclusive=True)        #不指定queue名字,rabbit会随机分配一个名字
    queue_name = result.method.queue                      #返回这个随机生成的名字.
    channel.queue_bind(exchange="logs",queue=queue_name)  #绑定随机生成的名字
    
    print("==========接收数据==========")
    def callback(ch, method, properties, body):
        print("收到的数据: %r" %body)
    
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()
    

    ◆选择发布订阅◆

    RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判定应该将数据发送至指定队列,direct模式通过routingKey和exchange决定的那个唯一的queue可以接收消息.

    发布者(direct模式):

    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
     
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    

    发布者(direct模式):

    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
     
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
     
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
     
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
     
    print(' [*] Waiting for logs. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
     
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
     
    channel.start_consuming()
    

    ## MariaDB

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可MariaDB的目的是完全兼容MySQL,包括API和命令行,使之能轻松成为MySQL的代替品,MariaDB由MySQL的创始人Michael Widenius主导开发,他早前曾以10亿美元的价格,将自己创建的公司MySQL AB卖给了SUN,本小结内容将介绍如何使用pymysql模块在程序中使用数据库应用.

    安装环境: 先安装MariaDB数据库,并配置好以下环境.

    [root@localhost ~]# yum install -y mariadb mariadb-server
    [root@localhost ~]# systemctl restart mariadb
    [root@localhost ~]# mysql -uroot -p123123
    
    MariaDB> CREATE DATABASE IF NOT EXISTS testdb DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
    
    MariaDB> use testdb;
    
    MariaDB> CREATE TABLE hosts (id int not null auto_increment primary key,ip varchar(20) not null,port int not null);
    
    MariaDB> INSERT INTO hosts (ip,port) values 
           ('1.1.1.1',22),('1.1.1.2',22),
           ('1.1.1.3',22),('1.1.1.4',22),
           ('1.1.1.5',22);
    
    MariaDB> grant all on *.* to root@'%' identified by '123123';
    

    连接数据库:

    import pymysql
    
    conn = pymysql.connect(
                host="192.168.1.5",    #连接IP地址
                port=3306,             #数据库端口
                user="root",           #数据库用户名
                passwd="123123",       #数据库密码
                db="testdb",           #操作的数据库
                charset="utf8"         #指定编码格式
                )
    cursor = conn.cursor()             #创建游标
    
    print("返回连接对象: ",conn)
    print("返回游标对象: ",cursor)
    

    ## Paramiko

    paramiko 是一个用于做远程SSH控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实,其实它的底层是对ssh的上层代码的一个封装,值得注意的是,由于paramiko模块内部依赖pycrypto,所以先下载安装pycrypto模块.

    ◆基于密码认证◆

    SSHClient:

    import paramiko
        
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='192.168.1.5',port=22,username='root',password='1233')
    # 执行命令
    stdin,stdout,stderr = ssh.exec_command('ls -lh')
    # 获取命令结果
    result = stdout.read()
    # 关闭连接
    ssh.close()
    

    Transport:

    import paramiko
     
    transport = paramiko.Transport(('192.168.1.5',22))
    transport.connect(username='root',password='1233')
     
    ssh = paramiko.SSHClient()
    ssh._transport = transport
     
    stdin, stdout, stderr = ssh.exec_command('ls -lh')
    print stdout.read()
     
    transport.close()
    

    ◆基于公钥认证◆

    SSHClient:

    import paramiko
    
    private_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
    
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='192.168.1.5',port=22,username='root',key=private_key)
    # 执行命令
    stdin,stdout,stderr = ssh.exec_command('ls -lh')
    # 获取命令结果
    result = stdout.read()
    # 关闭连接
    ssh.close()
    

    Transport:

    import paramiko
    
    private_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
    
    transport = paramiko.Transport(('192.168.1.5',22))
    transport.connect(username='root',pkey=private_key)
    
    ssh = paramiko.SSHClient()
    ssh._transport = transport
    
    stdin,stdout,stderr = ssh.exec_command('ls -lh')
    transport.close()
    

    ◆远程传输文件◆

    SFTPClient:

    import paramiko
    
    transport = paramiko.Transport(('192.168.1.5',22))
    transport.connect(username='root',password='1233')
    
    sftp = paramiko.SFTPClient.from_transport(transport)
    
    # 将目录下的location.py 上传至服务器 /tmp/lyshark.py
    sftp.put('./location.py', '/tmp/lyshark.py')
     
    # 将remove_path 下载到本地 local_path
    sftp.get('remove_path','local_path')
       
    transport.close()
    

    SFTPTransport:

    import paramiko
    
    private_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
    
    transport = paramiko.Transport(('192.168.1.5', 22))
    transport.connect(username='root', pkey=private_key )
    
    sftp = paramiko.SFTPClient.from_transport(transport)
    
    # 将location.py 上传至服务器 /tmp/test.py
    sftp.put('/tmp/location.py', '/tmp/test.py')
    
    # 将remove_path 下载到本地 local_path
    sftp.get('remove_path', 'local_path')
    transport.close()
    

    SQLAchemy , pyecharts

  • 相关阅读:
    【mysql】mac上基于tar.gz包安装mysql服务
    【maven】在idea上创建maven多模块项目
    关于Class.getResource和ClassLoader.getResource的路径问题
    【maven】Maven打包后为何文件大小改变了
    git常用命令
    第一章 第一个spring boot程序
    第二章 eclipse中m2e插件问题
    第一章 mac下开发环境的配置
    第一章 开发中遇到的错误列表
    第十一章 企业项目开发--消息队列activemq
  • 原文地址:https://www.cnblogs.com/LyShark/p/11317157.html
Copyright © 2011-2022 走看看