#开进程的方法一:
# import time
# import random
# from multiprocessing import Process
# def piao(name):
# print('%s piaoing' %name)
# time.sleep(random.randrange(1,5))
# print('%s piao end' %name)
#
#
#
# p1=Process(target=piao,args=('egon',)) #必须加,号
# p2=Process(target=piao,args=('alex',))
# p3=Process(target=piao,args=('wupeqi',))
# p4=Process(target=piao,args=('yuanhao',))
#
# p1.start()
# p2.start()
# p3.start()
# p4.start()
# print('主线程')
#开进程的方法二:
# import time
# import random
# from multiprocessing import Process
#
#
# class Piao(Process):
# def __init__(self,name):
# super().__init__()
# self.name=name
# def run(self):
# print('%s piaoing' %self.name)
#
# time.sleep(random.randrange(1,5))
# print('%s piao end' %self.name)
#
# p1=Piao('egon')
# p2=Piao('alex')
# p3=Piao('wupeiqi')
# p4=Piao('yuanhao')
#
# p1.start() #start会自动调用run
# p2.start()
# p3.start()
# p4.start()
# print('主线程')
# import multiprocessing
#
# def worker(num):
# """thread worker function"""
# print('Worker:', num)
# return
#
# if __name__ == '__main__':
# jobs = []
# for i in range(5):
# p = multiprocessing.Process(target=worker, args=(i,))
# jobs.append(p)
# p.start()
# 创建线程
#
# Python提供两个模块进行多线程的操作,分别是thread和threading, 前者是比较低级的模块,用于更底层的操作,一般应用级别的开发不常用。
#
# 第一种方法是创建threading.Thread的子类,重写run方法。
# import time,threading
# class MyThread(threading.Thread):
# def run(self):
# for i in range(5):
# print('thread {},@number: {}'.format(self.name,i))
# time.sleep(1)
# def main():
# print('Start main threading')
# #创建三个线程
# threads = [MyThread() for i in range(3)]
# #启动三个线程
# for t in threads:
# t.start()
# print('End Main threading')
#
# if __name__ =='__main__':
# main()
# 线程合并(join方法)
#
# 主线程结束后,子线程还在运行,join方法使得主线程等到子线程结束 时才退出。
# import time,threading
# class MyThread(threading.Thread):
# def run(self):
# for i in range(5):
# print('thread {},@number: {}'.format(self.name,i))
# time.sleep(1)
# def main():
# print('Start main threading')
# #创建三个线程
# threads = [MyThread() for i in range(3)]
# #启动三个线程
# for t in threads:
# t.start()
# for t in threads:
# t.join()
# print('End Main threading')
# #一次让新创建的线程执行join
#
#
# if __name__ =='__main__':
# main()
# 线程同步与互斥锁
#
# 为了避免线程不同步造成是数据不同步,可以对资源进行加锁。 也就是访问资源的线程需要获得锁,才能访问。 threading模块正好提供了一个Lock功能
#
# mutex = threading.Lock()
# 在线程中获取锁
#
# mutex.acquire()
# 使用完后,释放锁
#
# mutex.release()
# 可重入锁
#
# 为了支持在同一线程中多次请求同一资源, python提供了可重入锁(RLock)。 RLock内部维护着一个Lock和一个counter变量, counter记录了acquire的次数,从而使得资源可以被多次require。 直到一个线程所有的acquire都被release,其他的线程才能获得资源。
#
#
# import time,threading
# # 创建RLock
# mutex = threading.RLock()
# class MyThread(threading.Thread):
# #线程内多次进入锁和释放锁
# def run(self):
# if mutex.acquire(2):
# print("thread {} get mutex".format(self.name))
# time.sleep(1)
# mutex.acquire()
# mutex.release()
# mutex.release()
# # for i in range(5):
# # print('thread {},@number: {}'.format(self.name,i))
# # time.sleep(1)
# def main():
# print('Start main threading')
# #创建三个线程
# threads = [MyThread() for i in range(3)]
# #启动三个线程
# for t in threads:
# t.start()
# for t in threads:
# t.join()
# print('End Main threading')
# #一次让新创建的线程执行join
#
#
# if __name__ =='__main__':
# main()
#
# 条件变量
#
# 实用锁可以达到线程同步,前面的互斥锁就是这种机制。更复杂的环境,需要针对锁进行一些条件判断。Python提供了Condition对象。它除了具有acquire和release方法之外,还提供了wait和notify方法。线程首先acquire一个条件变量锁。如果条件不足,则该线程wait,如果满足就执行线程,甚至可以notify其他线程。其他处于wait状态的线程接到通知后会重新判断条件。
#
# 条件变量可以看成不同的线程先后acquire获得锁,如果不满足条件,可以理解为被扔到一个(Lock或RLock)的waiting池。直达其他线程notify之后再重新判断条件。该模式常用于生成消费者模式:
# import time,threading,random
# queue = []
# con = threading.Condition()
#
# class Producer(threading.Thread):
# def run(self):
# while True:
# if con.acquire():
# if len(queue) > 100:
# con.wait()
# else:
# elem = random.randrange(100)
# queue.append(elem)
# print("Producer a elem {}, Now size is {}".format(elem,len(queue)))
# time.sleep(random.random())
# con.notify()
# con.release()
#
# class Consumer(threading.Thread):
# def run(self):
# while True:
# if con.acquire():
# if len(queue) < 0:
# con.wait()
# else:
# elem = queue.pop()
# print("Consumer a elem {},Now size is {}".format(elem,len(queue)))
# time.sleep(random.random())
# con.notify()
# con.release()
# def main():
# for i in range(3):
# Producer().start()
# for i in range(2):
# Consumer().start()
#
# if __name__ =='__main__':
# main()
# 队列
#
# 带锁的队列Queue。 创建10个元素的队列
# import time,threading,queue
# queue=queue.Queue(10)
#
# class MyThread(threading.Thread):
# def __init__(self,event):
# super(MyThread,self).__init__()
# self.event = event
#
# def run(self):
# print("thread {} is ready".format(self.name))
# self.event.wait()
# print("thread {} run".format(self.name))
# signal = threading.Event()
#
# def main():
# start = time.time()
# for i in range(3):
# t = MyThread(signal)
# t.start()
# time.sleep(3)
# print("after {}s".format(time.time()-start))
# signal.set()
#
# if __name__=="__main__":
# main()
#
# 后台线程
#
# 默认情况下,主线程退出之后,即使子线程没有join。那么主线程结束后, 子线程也依然会继续执行。如果希望主线程退出后, 其子线程也退出而不再执行,则需要设置子线程为后台线程。python提供了setDeamon方法。
#进程
# python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
#
# 类Process
#
# 创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]), target表示调用对象,args表示调用对象的位置参数元组。 kwargs表示调用对象的字典。name为别名。group实质上不使用。
#
# 方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
#
# 属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
#
# 例:创建函数并将其作为单个进程
# import multiprocessing,time
# def worker(interval):
# n = 5
# while n > 0:
# print("The time is {0}".format(time.time()))
# time.sleep(interval)
# n -= 1
# if __name__ == '__main__':
# p = multiprocessing.Process(target= worker, args=(3,),name='马大锤')
# p.start()
# print("p.pid",p.pid)
# print("p.name",p.name)
# #该函数返回当前线程是否存活状态,比较重要
# print("p.is_alive",p.is_alive())
# import multiprocessing,time
#
# def worker_1(interval):
# print("worker_1")
# time.sleep(interval)
# print("end worker_1")
#
# def worker_2(interval):
# print("worker_2")
# time.sleep(interval)
# print("end worker_2")
#
# def worker_3(interval):
# print("worker_3")
# time.sleep(interval)
# print("end worker_3")
#
# if __name__ == '__main__':
# p1 = multiprocessing.Process(target=worker_1,args=(2,))
# p2 = multiprocessing.Process(target=worker_2, args=(3,))
# p3 = multiprocessing.Process(target=worker_3, args=(4,))
#
# p1.start()
# p2.start()
# p3.start()
# #另外你还可以通过 cpu_count() 方法还有 active_children() 方法获取当前机器的 CPU 核心数量以及得到目前所有的运行的进程。
# print("The number of CPU is:" + str(multiprocessing.cpu_count()))
# for p in multiprocessing.active_children():
# print("child p.name:" + p.name + " p.id" + str(p.pid))
# print("END!!!!!!!!!!!!!!!!!!")
#例:将进程定义为类
# import multiprocessing,time
# class ClockProcess(multiprocessing.Process):
# def __init__(self,interval):
# multiprocessing.Process.__init__(self)
# self.interval = interval
#
# def run(self):
# n = 5
# while n > 0:
# print("the time is {0}".format(time.time()))
# time.sleep(self.interval)
# n -= 1
#
# if __name__ == '__main__':
# p = ClockProcess(3)
# p.start()
# 注:进程p调用start()时,自动调用run()
# import multiprocessing,time
# def worker(interval):
# print("work start:{0}".format(time.ctime()))
# time.sleep(interval)
# print('work end:{0}'.format(time.ctime()))
#
# if __name__ == '__main__':
# p = multiprocessing.Process(target= worker,args=(3,))
# p.daemon = True
# p.start()
# print('end!!!!!!!!1')
#注:因子进程设置了daemon属性,主进程结束,它们就随着结束了。
# #设置daemon执行完结束的方法,加join等待主进程执行结束再终止
# import multiprocessing,time
# def worker(interval):
# print("work start:{0}".format(time.ctime()))
# time.sleep(interval)
# print('work end:{0}'.format(time.ctime()))
#
# if __name__ == '__main__':
# p = multiprocessing.Process(target= worker,args=(3,))
# daemon默认设置为False 是意思不设置守护进程的意思
# p.daemon = True
# p.start()
# p.join()
#
# print('end!!!!!!!!')
#Lock
#当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
# 一、多线程同步
# 由于CPython的python解释器在单线程模式下执行,所以导致python的多线程在很多的时候并不能很好地发挥多核cpu的资源。大部分情况都推荐使用多进程。
# python的多线程的同步与其他语言基本相同,主要包含:
# Lock & RLock :用来确保多线程多共享资源的访问。
# Semaphore : 用来确保一定资源多线程访问时的上限,例如资源池。
# Event : 是最简单的线程间通信的方式,一个线程可以发送信号,其他的线程接收到信号后执行操作。
# 二、实例
# 1)Lock & RLock
# Lock对象的状态可以为locked和unlocked
# 使用acquire()设置为locked状态;
# 使用release()设置为unlocked状态。
# 如果当前的状态为unlocked,则acquire()会将状态改为locked然后立即返回。当状态为locked的时候,acquire()将被阻塞直到另一个线程中调用release()来将状态改为unlocked,然后acquire()才可以再次将状态置为locked。
# Lock.acquire(blocking=True, timeout=-1),blocking参数表示是否阻塞当前线程等待,timeout表示阻塞时的等待时间 。如果成功地获得lock,则acquire()函数返回True,否则返回False,timeout超时时如果还没有获得lock仍然返回False。
# import multiprocessing,sys
#
# def worker_with(lock,f):
# with lock:
# fs = open(f,'a+')
# n = 10
# while n > 1:
# fs.write("Lockd acquired via with
")
# n -= 1
# fs.close()
# def worker_no_with(lock,f):
# lock.acquire()
# try:
# fs = open(f,'a+')
# n = 10
# while n > 1:
# fs.write("Lockd acquired via with
")
# n -= 1
# fs.close()
# finally:
# lock.release()
#
# if __name__ == '__main__':
# lock = multiprocessing.Lock()
# f = "file.txt"
# w = multiprocessing.Process(target=worker_with,args=(lock,f))
# nw = multiprocessing.Process(target=worker_no_with,args=(lock,f))
# w.start()
# nw.start()
# print("end!!!!!!!!!!!!!!!!!!!!!!!")
#Semaphore
#Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。
# import multiprocessing,time
# def worker(s,i):
# s.acquire()
# print(multiprocessing.current_process().name + "acquire")
# time.sleep(i)
# print(multiprocessing.current_process().name + "release
")
# s.release()
#
# if __name__ == '__main__':
# s = multiprocessing.Semaphore(2)
# for i in range(5):
# p = multiprocessing.Process(target=worker,args=(s,i*2))
# p.start()
#Event
#Event用来实现进程间同步通信。
#Python线程event
# python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set
#
# 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为
# False,那么当程序执行
# event.wait
# 方法时就会阻塞,如果“Flag”值为True,那么event.wait
# 方法时便不再阻塞。
#
# clear:将“Flag”设置为False
# set:将“Flag”设置为True
#
# 用
# threading.Event
# 实现线程间通信
# 使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,
# Event默认内置了一个标志,初始值为False。
# 一旦该线程通过wait()
# 方法进入等待状态,直到另一个线程调用该Event的set()
# 方法将内置标志设置为True时,
# 该Event会通知所有等待状态的线程恢复运行。
# import multiprocessing,time
# def wait_for_event(e):
# print("wait_for_event: starting")
# e.wait()
# print("wait_for_event: e.is_set()->" + str(e.is_set()))
# e.clear()
# print("wait_for_event: ending")
# print("wait_for_event: e.is_set()->" + str(e.is_set()))
#
# def wait_for_event_timeout(e,t):
# print("wait_for_event_timeout:starting")
# e.wait()
# print("wait_for_event_timeout:e.is_set" + str(e.is_set()))
# e.clear()
# print("wait_for_event_timeout:ending")
# print("wait_for_event_timeout:e.is_set" + str(e.is_set()))
#
# if __name__ == '__main__':
# e = multiprocessing.Event()
# w1 = multiprocessing.Process(name="block",
# target=wait_for_event,
# args=(e,))
# w2 = multiprocessing.Process(name="non_block",
# target=wait_for_event_timeout,
# args=(e,2))
#
# w1.start()
# w2.start()
#
# time.sleep(3)
#
# e.set()
# print("manin: event is set")
#Queue
# Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
#
# get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:
# import multiprocessing
#
# def writer_proc(q):
# try:
# q.put(9,block = False)
# except:
# pass
# def reader_proc(q):
# try:
# print(q.get(block =False))
# except:
# pass
#
# if __name__ == '__main__':
# q = multiprocessing.Queue()
# writer = multiprocessing.Process(target=writer_proc,
# args=(q,))
# writer.start()
# reader = multiprocessing.Process(target=reader_proc,
# args=(q,))
# reader.start()
#
# reader.join()
# writer.join()
# Pipe
#
# Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
#
# send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
# import multiprocessing,time
# def proc1(pipe):
# while True:
# for i in range(10):
# print("send: %s" %(i))
# pipe.send(i)
# time.sleep(1)
# def proc2(pipe):
# while True:
# print("proc2 rev",pipe.recv())
# time.sleep(1)
#
# def proc3(pipe):
# while True:
# print("proc3 rev",pipe.recv())
# time.sleep(1)
#
# if __name__ == '__main__':
# pipe = multiprocessing.Pipe()
# P1 = multiprocessing.Process(target=proc1,args=(pipe[0],))
# P2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
# P3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
#
# P1.start()
# P2.start()
# P3.start()
#
# P1.join()
# P2.join()
# P3.join()
# Pool
#
# 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。 Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它
#例:使用进程池
# import multiprocessing,time
#
# def func(msg):
# print("msg:",msg)
# time.sleep(3)
# print("end")
#
# if __name__ == '__main__':
# pool = multiprocessing.Pool(processes=10)
# for i in range(4):
# msg = "hello %d" %(i)
# pool.apply_async(func,(msg,)) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
# #pool.apply(func, (msg,)) # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去(阻塞模式)
#
# print("Mark~ Mark~ -------------------------")
# pool.close()
# pool.join()
# print("Sub-process(es) done.")