zoukankan      html  css  js  c++  java
  • 线程&进程&协程

    线程

    线程是应用程序中工作的最小单元,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。Threading用于线程相关操作

    使用多线程和单线程的运行速度对比

    import threading
    import time
    
    
    def run(n):
    	print("task", n)
    	time.sleep(2)
    
    
    t1 = threading.Thread(target=run, args=('t1',))
    t2 = threading.Thread(target=run, args=('t2',))
    # t1.start()
    # t2.start()
    run(1)
    run(2)
    
    • start 线程准备就绪,等待CPU调度
    • setName 为线程设置名称
    • getName 获取线程名称
    • setDaemon 设置为后台线程或前台线程(默认)如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
      如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
    • join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
    • run 线程被cpu调度后自动执行线程对象的run方法
      多线程例子
    import threading
    import time
    
    
    def run(n):
    	print("running task", n)
    	time.sleep(2)
    	print("task done", n)
    
    
    start_time = time.time()
    t_objs = []  # 存线程实例
    for i in range(50):
    	t = threading.Thread(target=run, args=("t-%s" % i,))
    	t_objs.append(t)
    	t.start()
    
    for t in t_objs:  # 循环线程实例列表,等待所有线程执行完毕
    	t.join()
    
    print("all thread has finished")
    print("cost:", time.time() - start_time)
    

    守护线程

    import threading
    import time
    
    
    def run(n):
    	print("running task", n)
    	time.sleep(2)
    	print("task done", n)
    
    
    start_time = time.time()
    t_objs = []
    for i in range(50):
    	t = threading.Thread(target=run, args=("t-%s" % i,))
    	t.setDaemon(True)  # 把当前线程设置为守护线程
    	t_objs.append(t)   # 为了不阻塞后面线程的启动,不在这里join
    	t.start()
    
    print("all thread has finished")
    print("cost:", time.time() - start_time)
    

    线程锁(互斥锁Mutex)

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁即:
    同一时刻允许一个线程执行操作。

    import time
    import threading
    
    
    def run(n):
            global num  # 在每个线程中都获取这个全局变
            time.sleep(0.5)
            num += 1  # 对此公共变量进行-1操作
            print("num:", num)
            
    
    num = 0  # 设定一个共享变量
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=run, args=("t-%s" % i,))
        t.start()
        thread_list.append(t)
    for t in thread_list:  # 等待所有线程执行完毕
        t.join()
    
    print('final num:', num)
    
    

    正常来讲,这个num结果应该是0,Ubuntu上结果有时不为0,在python2.7下。python3.0+上可能自动加上锁

    import time
    import threading
    
    
    def addNum():
    	global num  # 在每个线程中都获取这个全局变量
    	print('--get num:', num)
    	time.sleep(1)
    	lock.acquire()  # 修改数据前加锁
    	num -= 1  # 对此公共变量进行-1操作
    	lock.release()  # 修改后释放
    
    
    num = 100  # 设定一个共享变量
    thread_list = []
    lock = threading.Lock()  # 生成全局锁
    for i in range(num):
    	t = threading.Thread(target=addNum)
    	t.start()
    	thread_list.append(t)
    
    for t in thread_list:  # 等待所有线程执行完毕
    	t.join()
    
    print('final num:', num)
    

    RLOCK(递归锁)

    大锁加小锁

    import threading,time
     
    def run1():
        print("grab the first part data")
        lock.acquire()
        global num
        num +=1
        lock.release()
        return num
    def run2():
        print("grab the second part data")
        lock.acquire()
        global  num2
        num2+=1
        lock.release()
        return num2
    def run3():
        lock.acquire()
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
        lock.release()
        print(res,res2)
     
     
    if __name__ == '__main__':
     
        num,num2 = 0,0
        lock = threading.RLock()
        for i in range(10):
            t = threading.Thread(target=run3)
            t.start()
     
    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num,num2)
    

    信号量(Semaphone)

    互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

    port threading,time
     
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s
    " %n)
        semaphore.release()
     
    if __name__ == '__main__':
     
        num= 0
        semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
        for i in range(20):
            t = threading.Thread(target=run,args=(i,))
            t.start()
     
    while threading.active_count() != 1:
        pass #print threading.active_count()
    else:
        print('----all threads done---')
        print(num)
    
    

    Timer

    定时器,指定n秒后执行某操作

    from threading import Timer
     
    def hello():
        print("hello, world")
     
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed
    

    Events(事件)

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear:将“Flag”设置为False
    • set:将“Flag”设置为True
    import threading,time
    import random
    def light():
        if not event.isSet():
            event.set() #wait就不阻塞 #绿灯状态
        count = 0
        while True:
            if count < 10:
                print('33[42;1m--green light on---33[0m')
            elif count <13:
                print('33[43;1m--yellow light on---33[0m')
            elif count <20:
                if event.isSet():
                    event.clear()
                print('33[41;1m--red light on---33[0m')
            else:
                count = 0
                event.set() #打开绿灯
            time.sleep(1)
            count +=1
    def car(n):
        while 1:
            time.sleep(random.randrange(10))
            if  event.isSet(): #绿灯
                print("car [%s] is running.." % n)
            else:
                print("car [%s] is waiting for the red light.." %n)
    if __name__ == '__main__':
        event = threading.Event()
        Light = threading.Thread(target=light)
        Light.start()
        for i in range(3):
            t = threading.Thread(target=car,args=(i,))
            t.start()
    

    queue(队列)

    • queue.Queue 先入先出
    • queue.LifoQueue 后入先出
    • queue.PriorityQueue 存储数据时可设置优先级的队列
    q = queue.Queue()
    
    # q = queue.LifoQueue
    # q = queue.PriorityQueue
    q.put(2)
    print(q.get())
    # print(q.get())  # 卡住
    # print(q.get(timeout=1))  # 设置等待时间
    #  异常queue.Empty
    print(q.get(block=False))  # 不卡
    

    maxiside

    import queue
    
    q = queue.Queue(maxsize=3)
    q.put(1)
    q.put(2)
    q.put(2)
    q.put(3)  # 卡住
    
    

    queue.PriorityQueue

    import queue
    q = queue.PriorityQueue()
    
    q.put((-1, "chenronghua"))
    q.put((3, "hangyang"))
    q.put((10, "alex"))
    
    print(q.get())
    print(q.get())
    print(q.get())
    

    详细点击
    http://www.cnblogs.com/alex3714/articles/5230609.html

    队列的作用:
    解耦、提高效率

    队列和列表的区别:
    队列取出一块硬盘,队列里自动减少一块。列表相当于复制一块。

    子进程相对于父进程,两个进程相互独立,分别属于两块内存空间

    生产者消费者模型

    什么是生产者消费者模式?
    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    import time,random
    import queue,threading
    q = queue.Queue()
    def Producer(name):
      count = 0
      while count <20:
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' %(name, count))
        count +=1
    def Consumer(name):
      count = 0
      while count <20:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            print(data)
            print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
        else:
            print("-----no baozi anymore----")
        count +=1
    p1 = threading.Thread(target=Producer, args=('A',))
    c1 = threading.Thread(target=Consumer, args=('B',))
    p1.start()
    c1.start()
    

    线程与进程的区别

    • 线程是执行的指令集, 进程是资源的集合
    • 线程共享内存空间,进程的内存是独立的
    • 同一个进程的线程之间可以直接通信,两个进程想要通信,必须通过一个中间代理来实现
    • 创建新线程很简单(快), 创建新进程需要对其父进程进行一次克隆(慢)。
    • 一个线程可以控制和操作同一个进程里的其他线程,但是进程只能操作子进程
    • 对主线程的更改(取消、优先级变更等)可能会影响进程的其他线程的行为;对父进程的更改不会影响子进程。

    既生进程何生线程

    进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

    • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。

    • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

    多进程的应用场景

    • io 操作不占用cpu
    • 计算占用cpu

    python多线程,不适合cpu密集操作型的任务,适合io操作密集型的任务

    from multiprocessing import Process
    import os
    
    
    def info(title):
    	print(title)
    	print('module name:', __name__)
    	print('parent process:', os.getppid())
    	print('process id:', os.getpid())
    	# print("
    
    ")
    
    
    def f(name):
    	info('33[31;1m function f33[0m')
    	print('hello', name)
    
    
    if __name__ == '__main__':
    	info('33[32;1m main process line33[0m')
    	p = Process(target=f, args=('bob',))
    	p.start()
    	p.join()
    

    进程之间的通讯

    不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

    • Queues
    from multiprocessing import Process
    from multiprocessing import Queue
    import queue
    
    
    def f(q):
    	q.put([42, 'hello'])
    
    
    if __name__ == '__main__':
    	# q = queue.Queue()  # 线程queue
    	q = Queue()
    	p = Process(target=f, args=(q,))  # 复制了一份
    	p.start()
    	print(q.get())
    	p.join()
    
    • Pipes
    from multiprocessing import Process
    from multiprocessing import Pipe
    
    
    def f(conn):
    	conn.send([42, None, 'hello'])
    	print("from parents:", conn.recv())
    	conn.close()
    
    
    if __name__ == '__main__':
    	parent_conn, child_conn = Pipe()
    	p = Process(target=f, args=(child_conn,))
    	p.start()
    	print(parent_conn.recv())  # prints "[42, None, 'hello']"
    	parent_conn.send("hi son")
    	p.join()
    
    • Managers
    from multiprocessing import Process, Manager
    import os
    
    
    def f(d, list_1):
    	d[os.getpid()] = os.getpid()
    	list_1.append(os.getpid())
    	print(list_1)
    
    
    if __name__ == '__main__':
    	with Manager() as manager:
    		d = manager.dict()  # 生成一个字典,可在多个进程间共享和传递
    		list_1 = manager.list(range(3))  # 生成一个列表,可在多个进程间共享和传递
    		p_list = []
    		for i in range(10):
    			p = Process(target=f, args=(d, list_1))
    			p.start()
    			p_list.append(p)
    		for res in p_list:  # 等待结果
    			res.join()
    		print(d)
    		print(list_1)
    

    进程池

    进程池的作用:限制同一时间使用的进程数量。

    from multiprocessing import Process
    from multiprocessing import Pool
    import time
    import os
    
    
    def Foo(i):
    	time.sleep(2)
    	print("in process", os.getpid())
    	return i + 100
    
    
    def Bar(arg):
    	print('-->exec done:', arg, os.getpid())
    
    
    # print(__name__)
    if __name__ == '__main__': 
    	pool = Pool(processes=5)  # 允许进程池同时放入5个进程
    	print("主进程", os.getpid())
    	for i in range(10):   # 启动十个线程
    		pool.apply_async(func=Foo, args=(i,), callback=Bar)
    		# pool.apply(func=Foo, args=(i,))  # 串行
    		# pool.apply_async(func=Foo, args=(i,))  # 并行
    
    	pool.close() # 必须先close()后join()
    	pool.join()  # 进程池中进程执行完P毕后再关闭,如果注释,那么程序直接关闭。
    	print('end')
    

    进程同步&进程锁

    进程锁存在的意义是,因为多进程共享同一个屏幕,控制屏幕打印的时候不乱。

    from multiprocessing import Lock
    from multiprocessing import Process
    
    
    def f(l, i):
    	l.acquire()
    	print('hello world', i)
    	l.release()
    
    
    if __name__ == '__main__':
    	lock = Lock()
    	for num in range(10):
    		Process(target=f, args=(lock, num)).start()
    

    协程(Conroutine)

    协程是一种用户态的轻量级线程。可以在单线程下实现并发

    线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
    协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程。

    协程的好处

    • 无需线程上下文切换的开销
    • 无需原子操作锁定及同步的开销
    • "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
    • 方便切换控制流,简化编程模型
    • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理

    缺点:

    • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
    • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

    符合什么条件就能称之为协程?

    • 必须在只有一个单线程里实现并发
    • 修改共享数据不需加锁
    • 用户程序里自己保存多个控制流的上下文栈
    • 一个协程遇到IO操作自动切换到其它协程

    yeild协程例子

    def consumer(name):
    	print("--->starting eating baozi...")
    	while True:
    		new_baozi = yield
    		print("[%s] is eating baozi %s" % (name, new_baozi))
    
    
    def producer():
    	r = con.__next__()
    	r = con2.__next__()
    	n = 0
    	while n < 5:
    		n += 1
    		con.send(n)
    		con2.send(n)
    		print("33[32;1m[producer]33[0m is making baozi %s" % n)
    
    
    if __name__ == '__main__':
    	con = consumer("c1")
    	con2 = consumer("c2")
    	p = producer()
    

    Gevent

    import gevent
     
    def func1():
        print('33[31;1m李闯在跟海涛搞...33[0m')
        gevent.sleep(2)
        print('33[31;1m李闯又回去跟继续跟海涛搞...33[0m')
     
    def func2():
        print('33[32;1m李闯切换到了跟海龙搞...33[0m')
        gevent.sleep(1)
        print('33[32;1m李闯搞完了海涛,回来继续跟海龙搞...33[0m')
     
     
    gevent.joinall([
        gevent.spawn(func1),
        gevent.spawn(func2),
        #gevent.spawn(func3),
    ])
    

    Gevent版socket

    import sys
    import socket
    import time
    import gevent
    
    from gevent import socket, monkey
    
    monkey.patch_all()
    
    
    def server(port):
    	s = socket.socket()
    	s.bind(('0.0.0.0', port))
    	s.listen(500)
    	while True:
    		cli, addr = s.accept()
    		gevent.spawn(handle_request, cli)
    
    
    def handle_request(conn):
    	try:
    		while True:
    			data = conn.recv(1024)
    			print("recv:", data)
    			conn.send(data)
    			if not data:
    				conn.shutdown(socket.SHUT_WR)
    
    	except Exception as  ex:
    		print(ex)
    	finally:
    		conn.close()
    
    
    if __name__ == '__main__':
    	server(8001)
    

    RSA-非对称密钥验证

    • 公钥 public key
    • 私钥 private key
    import paramiko
    
    
    private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
    ssh = paramiko.SSHClient()
    # 允许连接不在.ssh/know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='192.168.31.102', port=22, username='root', pkey=private_key)
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    res = stdout.read()
    
    ssh.close()
    print(res.decode())
    
  • 相关阅读:
    hdu4472-Count
    Codeforces Beta Round #55 (Div. 2)
    优化素数表
    Codeforces Beta Round #49 (Div. 2)-D. Physical Education
    Codeforces Beta Round #49 (Div. 2)-C. Little Frog
    一些不知道的函数
    kmp算法详解
    STL之accumulate用法小结
    STL——list学习笔记
    maven打war包包含源文件-eclipse
  • 原文地址:https://www.cnblogs.com/Jason-lin/p/7887584.html
Copyright © 2011-2022 走看看