zoukankan      html  css  js  c++  java
  • Python学习day07 多线程多进程及主机管理

    #Created on 2016年12月30日

    第一课 课前废话 28minutes

    可以通过Queue来限制线程的数量,就像生产者和消费者

    第二课 上节内容回顾 20minutes
    上节回顾:
    socket文件传送
    多线程:
    计算密集型:多进程,需要CPU
    I/O密集型:多线程 ,不需CPU
    线程安全:线程锁 GLL
    event:线程间通信
    线程事件 event.set(),event.wait()
    两种写法:直接调用函数,另一种直接继承类
    生产者消费者模型:加快效率,松耦合
    paramiko
    ssh通信原理:认证加密,通信加密

    本节内容:
    多进程
    Paramiko
    审计开发
    Select异步模型

    第三课 多进程使用 14minutes
    Multiprocessing
    进程:由系统控制

    简单的多进程
    a = range(10)

    def main(n):
    time.sleep(1)
    print n*n
    return n*n

    if __name__ == '__main__':
    p = Pool(5)
    print p.map(main,a)


    第四课 子进程与父进程的关系 25minutes

    每一个程序都有主进程和子进程
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    import sys,time,re,os
    from multiprocessing import Pool,Process

    def aa(x):
    print x
    print 'ppid:',os.getppid()
    time.sleep(1)
    print 'pid:',os.getpid()

    aa('fffff')
    print '-----------'
    p = Process(target=aa,args=('sssss',))
    p.start()
    p.join()

    结果:
    [root@Rsyslog day7]# python multiprocess.py
    fffff
    ppid: 7730
    pid: 7760
    -----------
    sssss
    ppid: 7760
    pid: 7761
    [root@Rsyslog day7]#


    第五课 进程间的内存同步方法Queue 31minutes

    以下例子多进程无法通信的,但在Win下老死机,需要Linux测试
    def aa(info_list,n):
    info_list.append(n)
    print info_list

    info = []
    for i in range(10):
    #进程不共享内存,所以每次只出来一个
    p = Process(target=aa,args=(info,i))
    p.start()
    [root@Rsyslog day7]# python multiprocess-1.py
    [1]
    [0]
    [2]
    [3]
    [4]
    [5]
    [7]
    [6]
    [8]
    [9]
    [root@Rsyslog day7]#


    #如果用多线程,它因为是共享内存的,所以会出来很多
    p = threading.Thread(target=aa,args=(info,i))
    p.start()


    [root@Rsyslog day7]# python multiprocess-1.py
    [0]
    [0, 1]
    [0, 1, 2]
    [0, 1, 2, 3]
    [0, 1, 2, 3, 4]
    [0, 1, 2, 3, 4, 5]
    [0, 1, 2, 3, 4, 5, 6]
    [0, 1, 2, 3, 4, 5, 6, 7]
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    --------------------------------------------------
    问题:如果通过Queue解决上面的问题
    下面例子中,主进程和子进程的Q是不一样的结果,相互独立
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    import sys,time,re,os
    from multiprocessing import Pool,Process,Queue

    def f(q,n):
    q.put([n,'hello'])
    print q.get()

    q = Queue()
    for i in range(5):
    p = Process(target=f,args=(q,i))
    p.start()
    p.join()


    [root@Rsyslog day7]# python multi-2.py
    [0, 'hello']
    [1, 'hello']
    [2, 'hello']
    [3, 'hello']
    [4, 'hello']
    [root@Rsyslog day7]#
    -------------------------------------------------------

    第六课 进程间的内存同步方法manager 29minutes

    进程里的会直接Copy外部变量的数据,所以子进程内部和外面的就是结果是不一样

    共享内存方法:Value,Array-------需要练习
    只能进行数字和列表共享
    from multiprocessing import Array,Value

    def run(v,a):
    v.value = 3.1415926
    for i in range(len(a)):
    a[i] = -a[i]

    if __name__ == '__main__':
    v = Value('d',0.0)
    a = Array('i',range(10))

    p = Process(target=run,args=(v,a))
    p.start()
    p.join()

    print v.value
    print a[:]
    结果:
    3.1415926
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

    manager 支持更多的共享
    可使用Manager方式更换数据
    def run(d,l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

    if __name__ == '__main__':
    man= Manager()
    d = man.Dict()
    l = man.list(range(10))
    p = Process(target=run,args=(d,l))
    p.start()
    p.join()

    print d
    print l
    结果:
    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


    #Created on 2017年01月01日
    第七课 通过Pool产生多进程1 20minutes
    第8课 通过Pool产生多进程2 9minutes

    多进程数量和CPU核数相对即可,不可太多.

    def run(x):
    print x*x
    time.sleep(1)

    pool = Pool(processes=4)
    re_list = []
    # for i in range(10):
    # res = pool.apply_async(run, [i,]) #异步
    # #res = pool.apply(run, [i,]) #同步,基本不用
    # res.get()
    #上面For循环等同于以下命令
    print pool.map(run,range(10))
    -----------------------------------------------
    一个简单的多进程的池的练习
    [root@Rsyslog test_script]# cat process_test_ping.py
    #!/usr/bin/env python
    import multiprocessing
    import subprocess
    host_list = ['192.168.100.254','1.1.1.1','192.168.100.253','114.28.127.2','114.28.127.72','114.28.127.70','114.28.127.12','114.28.127.56','114.28.127.102']
    if len(host_list) > 30:
    process_number = 30
    else:
    process_number = len(host_list)
    def ping_host(ipaddr):
    if subprocess.call('ping -c1 -W 1 %s > /dev/null' % ipaddr, shell=True) == 0:
    print '%s is OK' % ipaddr
    else:
    print '%s is DOWN' % ipaddr
    pool = multiprocessing.Pool(processes=process_number)
    for ip in host_list:
    pool.apply_async(ping_host,(ip,))
    #pool.map(ping_host,host_list)
    pool.close()
    pool.join()
    [root@Rsyslog test_script]#

    #Created on 2017年01月03日
    第九课 开发审计堡垒机 56minutes

    主要用Paramiko源码下的demo.py文件

    第十课 开发审计堡垒机修改Paramiko源码记录操作 29minutes
    修改Demo下的一个文件

    第十一课 审计堡垒的安全控制 15minutes

    登录后自动启动脚本
    将脚本放到用户的环境变量中
    在/home/allan/.bashrc 用户环境变量
    将用户的脚本放到用户的环境变量中
    shellinabox -t 不用证书


    第十二课 select-Poll-epoll介绍 47minutes
    Select VS poll &epoll
    异步I/O模型
    Nginx可通过异步支持上万个并发

    ulimit -n 显示当前Shell终端同时最大打开多少个文件

    Select单进程多并发,文件描述符限制为1024
    Select目前基本不用,无太大并发可用

    Poll改进版:本质同Select,无文件描述符限制
    水平触发:以上两种将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用
    Select和poll的时候再次报告这些描述符,所以一般不会丢失,这就是水平触发。
    这种操作速度慢,且耗时,如果有一万个描述符,要轮循很久。

    Epoll:水平触发和边缘触发
    边缘触发:只告诉进程哪些文件描述符就绪,只说一次,如果没有采取行动,不会再次告知。

     

    #Created on 2017年01月03日
    第十三课 select代码实例分析 54minutes

    select异步例子,需要实例化操作。

    第十四课 审计作业 14minutes

    审计开发
    用户可登录和操作指定机器,机器组
    用户可通过不同的远程帐号登录远程主机:
    在选择主机后可选择收号登录,如Root或其他

    作业二:
    Select代码作业,加注释

    Server:

    #!/usr/bin/env python
    # encoding: utf-8
    
    #Created on 2017年1月11日
    #@author: Administrator
    
    #--------------select异步说明--------------------------
    import select
    import socket
    import Queue
    import sys
    
    def select_server():
        server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        #设定SOcket为非阻塞模式,当前允许连接,但会话需等待,不设定有连接时不允许连接
        server.setblocking(0)  
        address_port = ('127.0.0.1',9999)
        print >>sys.stderr,'starting up on %s port %s' % address_port
        server.bind(address_port)
        server.listen(5)
        
        '''
        select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,
        第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需
        要创建2个列表来包含输入和输出信息来传给select().
        '''
        inputs = [server]
        outputs = []
        '''
        所有客户端的进来的连接和数据将会被server的主循环程序放在上面的list中处理,我们现在的
        server端需要等待连接可写(writable)之后才能过来,然后接收数据并返回(因此不是在接收到
        数据之后就立刻返回),因为每个连接要把输入或输出的数据先缓存到queue里,然后再由select取
        出来再发出去。
        '''
        message_que = {}
        
        while inputs:
            #select将轮循三张表,将结果返回给前面的三个参数
            print '\nnow wait for connect.......'
            readable,writable,exception = select.select(inputs,outputs,inputs)
            
            
            for s in readable:  #轮循inputs表
                #第一种情况是S是Server本身,则开始接收客户端
                if s is server:  #如果目标是当前Server,就是socket对象
                    #接收连接
                    connection,address = s.accept()
                    #将连接对象放到表中
                    inputs.append(connection)
                    #将连接对象放到Queue中
                    message_que[connection] = Queue.Queue()
                #第二种情况:如果已连接,则接收数据
                else:  #如果已连接,则接收数据
                    data = s.recv(1024)
                    if data: #如果有数据
                        message_que[s].put(data) #将数据放到Queue中
                        if s not in outputs:   #如果s不在outputs中,则加入
                            outputs.append(s)
                    #第三种情况,客户端已断开,接收数据为空,则关闭客户端
                    else:
                        inputs.remove(s)  #删除inputs中的连接
                        if s in outputs:  #删除outputs中的连接
                            outputs.remove(s)
                        s.close()      #关闭连接
                        del message_que[s]   #删除Queue中的数据
            '''
        对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,
        就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环
        select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态
            '''            
        
            for s in writable:
                try:  #取出Queue中的数据
                    next_msg = message_que[s].get_nowait()
                except Queue.Empty: #如果数据为空,则出现错误
                    #print >>sys.stderr,'output queue for',s.getpeername(),'is empty'
                    outputs.remove(s)     #删除putputs中的连接       
                else:  #如果正常取出,则发送给客户端
                    #print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
                    s.send(next_msg.upper()+'  =====> from server')
                    
            for s in exception:  #如果出现错误
                inputs.remove(s)  #删除inputs表的项
                if s in outputs:
                    outputs.remove(s) #删除outputs表的项
                s.close()    #关闭连接
                del message_que[s]   #删除Queue中的项
                
    if __name__ == '__main__':
        select_server()    
    

     Client:

    #!/usr/bin/env python
    # encoding: utf-8
    
    #Created on 2017年1月11日
    #@author: Administrator
    
    
    import socket
    import sys
    
    #开两个Socket连接
    socks  = [socket.socket(socket.AF_INET,socket.SOCK_STREAM),
              socket.socket(socket.AF_INET,socket.SOCK_STREAM)]
    address_port = ('127.0.0.1',9999)
    #定义发送的信息
    message = ['helle,this is first data',
               'helle,this is second data',
               'helle,this is thrid data']
    
    #通过迭代方式打开两个Socket进程
    for s in socks:
        try:
            s.connect(address_port)
        except:
            print 'error'
    
    #迭代发送信息
    for msg in message:
        #发送
        for s in socks:
            s.send(msg)
            print >>sys.stderr,'%s sending "%s" '%(s.getsockname(),msg)
        #接收
        for s in socks:
            data = s.recv(1024)
            print >>sys.stderr,'%s recevid "%s"' % (s.getsockname(),data)
            if not data:
                print >>sys.stderr,'closeing socket',s.getsockname()
    

      

  • 相关阅读:
    C++ new 解析重载 .
    __cdecl,__fastcall, __stdcall 什么区别? .
    C++构造函数调用顺序
    用gdb调试core dump文件
    placement new(转)
    [精华] 跟我一起写 Makefile
    使用 GDB 调试多进程程序
    第37条:避免对函数中继承得来的默认参数值进行重定义
    程序只运行一个是实例 .
    南通SEO:单页的SEO元素
  • 原文地址:https://www.cnblogs.com/syother/p/6738665.html
Copyright © 2011-2022 走看看