zoukankan      html  css  js  c++  java
  • python day12


    import contextlib

    @contextlib.contextmanager
    def worker_state(state_list,worker_thread):
        """
         用于记录线程中正在等待的线程数
         """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

    free_list = []
    current_thread = 'alex'
    with worker_state(free_list,current_thread):
        print(123)
        print(456)
       
       
       
    # socket 使用contestlib自动关闭   
       
    import contextlib
    import socket

    @contextlib.contextmanager
    def context_socket(host,port):
        sk = socket.socket()
        sk.bind((host,port))
        sk.listen(5)
        try:
            yield sk
        finally:
            sk.close()
    with context_socket('127.0.0.1',8888) as sock:
        print(sock)
       
       
       
       
       
    1. REDIS订阅、发布、频道

    -----s3.py             #不用执行
    import redis

    class RedisHelper:
        def __init__(self):
            self.__conn = redis.Redis(host='127.0.0.1')
        def public(self,msg,chan):
            self.__conn.publish(chan,msg)
            return True
        def subscribe(self,chan):
            pub = self.__conn.pubsub()
            pub.subscribe(chan)
            pub.parse_response()
            return pub

       
       
       
       
       
    # 订阅,制定频道wmh.7
    -----s4.py

    import s3

    obj = s3.RedisHelper()
    data = obj.subscribe('wmh.7')
    print(data.parse_response())

    # 发布
    -----s5.py

    import s3

    obj = s3.RedisHelper()
    obj.public('alex db','wmh.7')      #发给频道“wmh.7”信息


    ------执行s4.py等待收,执行s5.py发送,s4收结果如下
    [b'message', b'wmh.7', b'alex db']


       
       
       
       
       
       
    2. rabbitmq 发收
    # 安装pika


    发     #   Queues
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.0.2'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='',routing_key='hello',body='Hello World')
    print(" [x] Sent 'Hello World!' ")
    connection.close()

    读  #多发几次     消费者Consumers
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
    channel = connection.channel()
    channel.queue_declare(queue='hello1')
    def callback(ch,method,properties,body):
        print(" [x] Received %r" % body)
    channel.basic_consume(callback,
                          queue='hello1',
                          no_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
       
       
       

    在收上查看信息结果
    [*] Waiting for messages. To exit press CTRL+C
    [x] Received b'Hello World!'
    [x] Received b'Hello World!'
       
       
       
       

    绑定多个队列,exchange调度,随机生成队列,
    往exchange发数据,只要是订阅者都能同时收到

    --------8.py 订阅者      运行两或多次,自动创建队列

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.0.2'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs_fanout',type='fanout')

    # 随机创建队列
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    # 绑定
    channel.queue_bind(exchange='logs_fanout',queue=queue_name)
    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] %r" % body)

    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()


    ---------9.py  发布者 

    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.0.2'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs_fanout',type='fanout')

    message = '456'
    channel.basic_publish(exchange='logs_fanout',routing_key='',body=message)
    print(" [x] Sent %r" % message)
    connection.close()

    关键字发送

    --------消费者
    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.0.2'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',type='direct')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    severities = ['error','info','warning']      #定义三个关键字
    # severities = sys.argv[1:]
    # if not severities:
    #     sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0])
    #     sys.exit(1)

    for severity in severities:
        channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()

    -------发布者1

    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.0.2'))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',type='direct')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    severities = ['error',]
    # severities = sys.argv[1:]
    # if not severities:
    #     sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0])
    #     sys.exit(1)

    for severity in severities:
        channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()


    -------发布者2

    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',type='direct')
    severity = 'error'                                     #一个关键字
    message='123'
    channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()

    操作MYSQL
    python   pymysql

    http://www.cnblogs.com/wupeiqi/articles/5699254.html


    create table tb1(
        nid int not null auto_increment primary key,
        num int null
    )

    create table tb1(
        nid int not null auto_increment,
        num int null,
        index(nid)
    )
    注意:1、对于自增列,必须是索引(含主键)。
         2、对于自增可以设置步长和起始值
             show session variables like 'auto_inc%';
             set session auto_increment_increment=2;
             set session auto_increment_offset=10;

             shwo global  variables like 'auto_inc%';
             set global auto_increment_increment=2;
             set global auto_increment_offset=10;


    自动创建数据库结构   sqlalchemy


    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine

    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s13", max_overflow=5)

    Base = declarative_base()

    # 创建单表
    class Users(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(32))
        extra = Column(String(16))

        __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),
            Index('ix_id_name', 'name', 'extra'),
        )

    # 一对多
    class Favor(Base):
        __tablename__ = 'favor'
        nid = Column(Integer, primary_key=True)
        caption = Column(String(50), default='red', unique=True)


    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        favor_id = Column(Integer, ForeignKey("favor.nid"))

    # 多对多
    class ServerToGroup(Base):
        __tablename__ = 'servertogroup'
        nid = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('server.id'))
        group_id = Column(Integer, ForeignKey('group.id'))

    class Group(Base):
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)


    class Server(Base):
        __tablename__ = 'server'

        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
        port = Column(Integer, default=22)

    Base.metadata.create_all(engine)      #创建数据库
    #Base.metadata.drop_all(engine)       #删除数据库

    SSH操作
    http://www.cnblogs.com/wupeiqi/articles/5095821.html

    # 用于连接远程服务器并执行基本命令

    import paramiko

    ssh = paramiko.SSHClient()  # 创建SSH对象
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())    # 允许连接不在know_hosts文件中的主机
    ssh.connect(hostname='172.16.0.2', port=22, username='root', password='pwd@123')   # 连接服务器
    stdin, stdout, stderr = ssh.exec_command('ifconfig')            # 执行命令
    out = stdout.readlines()
    for i in out:
        print(i)
    result = stdout.read()      # 获取命令结果
    ssh.close()                 # 关闭连接

    # SSHClient 封装 Transport
    import paramiko

    transport = paramiko.Transport(('172.16.0.2', 22))
    transport.connect(username='root', password='pwd@123')
    ssh = paramiko.SSHClient()
    ssh._transport = transport
    stdin, stdout, stderr = ssh.exec_command('df')
    out = stdout.readlines()
    for i in out:
        print(i)
    print(stdout.read())
    transport.close()


    规范化ssh远程
    import paramiko
    hostname="172.16.0.2"
    port=22
    username="root"
    password="pwd@123"
    if __name__=="__main__":
        s=paramiko.SSHClient()
        s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        s.connect(hostname,port,username,password)
        stdin,stdout,sterr=s.exec_command("df -h")
        for i in stdout:
            print(i)
        # print(stdout.read())
        s.close()

    # 基于公钥密钥连接
    import paramiko
     
    private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
    ssh = paramiko.SSHClient()  # 创建SSH对象
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())                            # 允许连接不在know_hosts文件中的主机
    ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)   # 连接服务器
    stdin, stdout, stderr = ssh.exec_command('df')                                        # 执行命令
    result = stdout.read()      # 获取命令结果
    ssh.close()                   # 关闭连接

    # 基于用户名密码上传下载
    import paramiko
     
    transport = paramiko.Transport(('hostname',22))
    transport.connect(username='wupeiqi',password='123')
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.put('/tmp/location.py', '/tmp/test.py')  # 将location.py 上传至服务器 /tmp/test.py
    sftp.get('remove_path', 'local_path')         # 将remove_path 下载到本地 local_path
    transport.close()

    # 基于公钥密钥上传下载
    import paramiko
    private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
    transport = paramiko.Transport(('hostname', 22))
    transport.connect(username='wupeiqi', pkey=private_key )
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.put('/tmp/location.py', '/tmp/test.py')      # 将location.py 上传至服务器 /tmp/test.py
    sftp.get('remove_path', 'local_path')             # 将remove_path 下载到本地 local_path
    transport.close()

    多线程综合ssh程序(多机、执行命令、上传、下载)

    import datetime
    import threading
    import paramiko

    def sshCmd(ip, username, passwd, cmds):
        try:
            ssh = paramiko.SSHClient()                                  # 创建SSH对象
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())   # 允许连接不在know_hosts文件中的主机
            ssh.connect(hostname=ip, port=22, username=username, password=passwd,timeout=5)  # 连接服务器
            for cmd in cmds:
                stdin, stdout, stderr = ssh.exec_command(cmd)              # 执行命令
                out = stdout.readlines()
                for i in out:
                    print(i,)
            print('%s 运行完毕 ' % (ip))
        except Exception as e:
            print('%s 运行失败,失败原因 %s' % (ip, e))
        finally:
            ssh.close()

    #上传文件
    def uploadFile(ip,username,passwd):
        try:
            t=paramiko.Transport((ip,22))
            t.connect(username=username,password=passwd)
            sftp=paramiko.SFTPClient.from_transport(t)
            remotepath='/opt/1.txt'
            localpath=r'D:Downloads1.txt'
            sftp.put(localpath,remotepath)
            print('上传文件成功')
        except Exception as e:
            print('%s 运行失败,失败原因 %s' % (ip, e))
        finally:
            t.close()

    #下载文件
    def downloadFile(ip,username,passwd):
        try:
            t=paramiko.Transport((ip,22))
            t.connect(username=username,password=passwd)
            sftp=paramiko.SFTPClient.from_transport(t)
            remotepath='/opt/1.py'
            localpath='1.py'
            sftp.get(remotepath,localpath)
            print('下载文件成功')
        except Exception as e:
            print('%s 运行失败,失败原因 %s' % (ip, e))
        finally:
            t.close()

    if __name__ == '__main__':
        # 需要执行的命令列表
        cmds = ['ls /root', 'ifconfig']
        # 需要进行远程监控的服务器列表
        servers = ['172.16.0.2']

        username = "root"
        passwd = "pwd@123"
        threads = []
        print("程序开始运行%s" % datetime.datetime.now())
        # 每一台服务器创建一个线程处理
        for server in servers:
            th = threading.Thread(target=sshCmd, args=(server, username, passwd, cmds))
            th.start()
            threads.append(th)

        # 等待线程运行完毕
        for th in threads:
            th.join()

        print("程序结束运行%s" % datetime.datetime.now())

        #测试文件的上传与下载
        uploadFile(servers[0],username,passwd)
        downloadFile(servers[0],username,passwd)


    # pexpect自动输入密码执行命令

    import pexpect
    def sshCmd(ip, passwd, cmd):
        ret = -1
        ssh = pexpect.spawn('ssh root@%s "%s"' % (ip, cmd))
        try:
            i = ssh.expect(['password:', 'continue connecting(yes/no)?'], timeout=5)
            if i == 0:
                ssh.sendline(passwd)
            elif i == 1:
                ssh.sendline('yes ')
                ssh.expect('password:')
                ssh.sendline(passwd)
            ssh.sendline(cmd)
            r = ssh.read()
            print(r)
            ret = 0
        except pexpect.EOF:
            print("EOF")
            ret = -1
        except pexpect.TIMEOUT:
            print("TIMEOUT")
            ret = -2
        finally:
            ssh.close()
        return ret

    sshCmd('172.16.0.2','pwd@123','ls /root')

    视频
    http://edu.51cto.com/course/course_id-5686.html


    ---------------------------------------------------------------------------------------------------------------------------------

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    # Author:Minghu Wang

    # 导入:
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine


    # 创建对象的基类:
    Base = declarative_base()

    # 定义User对象:
    class User(Base):
        # 表的名字:
        __tablename__ = 'hosts'

        # 表的结构:
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(20))
        ip = Column(String(32))
        password = Column(String(16))
        port = Column(String(16))
        business = Column(Integer, ForeignKey("favor.nid"))


    class Favor(Base):
        __tablename__ = 'favor'
        nid = Column(Integer, primary_key=True,autoincrement=True)
        caption = Column(String(50), default='red', unique=True)


    # 初始化数据库连接:
    engine = create_engine("mysql+pymysql://root:123456@192.168.1.103:3306/s13?charset=utf8", max_overflow=5)
    # 创建DBSession类型:
    DBSession = sessionmaker(bind=engine)
    # 创建session对象:
    session = DBSession()

    # # 创建新User对象:
    # new_user = User(name='DNS',ip='192.168.1.100',password='pwd@123',port='22',business='1')
    # # 添加到session:
    # session.add(new_user)
    # # 提交即保存到数据库:
    # session.commit()
    # # 关闭session:
    # session.close()

    # 插入数据
    obj = User(name='DNS',ip='192.168.1.100',password='pwd@123',port='22',business='1')
    session.add(obj)
    session.add_all([
        Favor(caption='kfjsf'),
        User(name='DNS',ip='192.168.1.100',password='pwd@123',port='22',business='1'),
        User(name='DNS',ip='192.168.1.100',password='pwd@123',port='22',business='1'),
    ])
    session.commit()

    # 查询
    ret = session.query(User.id, User.name,User.ip,User.password,User.port,User.business).all()
    for i in ret:
        # ret = session.query(Users).filter_by(name='alex1').all()
        # ret = session.query(Users).filter_by(name='alex1').first()
        print(i)


    # 改
    # session.query(Users).filter(Users.id > 2).update({"name": "099"})
    # session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
    # session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
    # session.commit()

    # 删除
    # session.query(Users).filter(Users.id > 2).delete()
    # session.commit()


    # 创建删除表
    # Base.metadata.create_all(engine)
    # Base.metadata.drop_all(engine)

  • 相关阅读:
    分别针对Customers表与Order表的通用查询操作
    类的继承
    kubernetes service 原理解析
    k8s生命周期-钩子函数
    深入理解Pod-初始化容器
    为 Pod 或容器配置安全性上下文
    Docker四种网络模式
    python中__new__方法详解及使用
    浅析python析构函数
    k8s中的网络
  • 原文地址:https://www.cnblogs.com/wangminghu/p/5720512.html
Copyright © 2011-2022 走看看