文件写入
def storFile(data, fileName, method='a', encoding=None):
    """Write the text *data* to *fileName* (append mode by default).

    Args:
        data: Text to write.
        fileName: Path of the target file.
        method: Mode passed to ``open`` ('a' appends, 'w' truncates).
        encoding: Text encoding. ``None`` (default) keeps the platform
            default as before; pass e.g. ``'utf-8'`` for portable files.
    """
    # newline='' disables newline translation, so data is written verbatim.
    with open(fileName, method, newline='', encoding=encoding) as f:
        f.write(data)


storFile('123', '1.txt')
文件读取
# Read the whole of 1.txt back and echo it to stdout.
with open('1.txt', mode='r') as reader:
    text = reader.read()
print(text)
序列化操作
把内存中的数据转换为可保存、可共享的格式(序列化),从而实现状态保存。cPickle由C语言编写,效率高,在Python 2中优先使用;如果不存在(Python 3中已移除cPickle,标准库pickle会自动使用C加速实现)则使用pickle。pickle使用dump(写入文件对象)和dumps(返回bytes)实现序列化。
try:
    import cPickle as pickle  # Python 2 only: C implementation of pickle
except ImportError:
    # Python 3: cPickle no longer exists; the standard pickle module is used.
    import pickle

d = dict(url='index.html', title='1', content='2')

# Serialize to a file. `with` guarantees the file is closed even if dump() fails.
with open('2.txt', 'wb') as f:
    pickle.dump(d, f)

# dumps() returns the serialized bytes instead of writing them to a file.
print(pickle.dumps(d))
反序列化操作
使用load(从文件对象读取)或loads(从bytes读取)实现反序列化。注意:不要反序列化不可信来源的数据,pickle在加载时可能执行任意代码。
try:
    import cPickle as pickle  # Python 2 only
except ImportError:
    import pickle

# SECURITY: pickle.load can execute arbitrary code while loading. Only load
# files you produced yourself (here, the 2.txt written by the dump example).
with open('2.txt', 'rb') as f:
    d = pickle.load(f)
print(d)
多进程创建
os.fork()复制出一个与当前进程完全相同的子进程:调用一次、返回两次,对子进程返回0,对父进程返回子进程的pid;创建失败时抛出OSError。只能在Linux/Unix中使用(Windows没有os.fork)。
import os

if __name__ == '__main__':
    # os.fork() exists only on Unix-like systems. It returns 0 in the child
    # and the child's pid in the parent; on failure Python raises OSError
    # (it never returns a negative value).
    try:
        pid = os.fork()
    except OSError as err:
        print('error pid', err)
    else:
        if pid == 0:
            print('child ,parent pid', os.getpid(), os.getppid())
        else:
            # os.getpid must be called; printing it bare shows the function object.
            print('parent pid,create child ', os.getpid(), pid)
使用multiprocessing模块创建进程,使用start启动进程,使用join等待进程结束(同步)。
import os
from multiprocessing import Process


def run_proc(name):
    """Child-process body: print the task name and this process's pid."""
    print('name ,child pid running', name, os.getpid())


if __name__ == '__main__':
    print('parent pid', os.getpid())
    procs = []
    for i in range(5):
        p = Process(target=run_proc, args=(str(i),))
        print('Process will start')
        p.start()
        procs.append(p)
    # Start all children first, then wait for every one of them. This avoids
    # both serializing them and waiting only for the last one, so 'end' is
    # printed after all have finished.
    for p in procs:
        p.join()
    print('end')
使用multiprocessing模块中的Pool限定进程数量
import os
from multiprocessing import Process, Pool
import random
import time


def run_proc(name, max_delay=10):
    """Pool worker: announce start, sleep a random time, announce end.

    Args:
        name: Task identifier printed in the messages.
        max_delay: Upper bound in seconds of the simulated work (default 10).
    """
    print('name ,child pid running ', name, os.getpid())
    time.sleep(random.random() * max_delay)
    print('name ,child pid running end', name, os.getpid())


if __name__ == '__main__':
    print('parent pid', os.getpid())
    p = Pool(processes=3)  # at most 3 worker processes at a time
    results = [p.apply_async(run_proc, args=(i,)) for i in range(10)]
    print('wait')
    p.close()  # no more tasks may be submitted; required before join()
    p.join()
    # apply_async keeps worker exceptions inside the AsyncResult until .get()
    # is called. Fetch every result so failures are reported, not lost.
    for r in results:
        r.get()
    print('end')
进程间通信
Queue通信
适用于多进程间通信,通过put写入数据、get读取数据。
import os
from multiprocessing import Process, Queue
import time
import random


def write_proc(q, urls, max_delay=1.0):
    """Producer: put each url on *q*, pausing up to *max_delay* s after each."""
    print('w processing ', os.getpid(), 'is running')
    for u in urls:
        q.put(u)
        print('put :', u)
        time.sleep(random.random() * max_delay)


def read_proc(q):
    """Consumer: print items from *q* until the ``None`` sentinel arrives."""
    print('r processing ', os.getpid(), 'is running')
    while True:
        u = q.get(True)  # block until an item is available
        if u is None:  # sentinel from the parent: all producers are done
            break
        print('get:', u)


if __name__ == '__main__':
    q = Queue()
    w1 = Process(target=write_proc, args=(q, ['u1', 'u2', 'u3']))
    w2 = Process(target=write_proc, args=(q, ['u4', 'u5', 'u6']))
    r1 = Process(target=read_proc, args=(q,))
    w1.start()
    w2.start()
    r1.start()
    w1.join()
    w2.join()
    # Terminating the reader here could kill it before it consumed the last
    # items. Instead, send a sentinel and let it finish cleanly.
    q.put(None)
    r1.join()
Pipe通信
Pipe方法返回一对连接对象(conn1, conn2),通过send发送、recv接收数据。duplex参数控制方向:duplex=True(默认)时为全双工,两端均可收发;duplex=False时conn1只能接收、conn2只能发送。
import os
from multiprocessing import Process, Pipe
import time
import random


def send_proc(p, urls, max_delay=1.0):
    """Sender: send each url through connection *p*, pausing randomly."""
    print('s processing ', os.getpid(), 'is running')
    for u in urls:
        p.send(u)
        print('send :', u)
        time.sleep(random.random() * max_delay)


def receive_proc(p):
    """Receiver: print messages from *p* until the ``None`` sentinel arrives."""
    print('r processing ', os.getpid(), 'is running')
    while True:
        u = p.recv()  # blocks until a message is available
        if u is None:
            break
        print('receive:', u)


if __name__ == '__main__':
    # Pipe() returns two connection objects; by default (duplex=True) both
    # ends can send and receive.
    p = Pipe()
    p1 = Process(target=send_proc, args=(p[0], ['u1', 'u2', 'u3']))
    p2 = Process(target=receive_proc, args=(p[1],))
    p1.start()
    p2.start()
    p1.join()
    # The parent still holds p[0]. Use it to send the stop sentinel instead of
    # terminate()-ing the receiver, which could drop unread messages.
    p[0].send(None)
    p2.join()
多线程
一点理解。使用threading模块创建多线程
import random
import threading
import time


def run_proc(url):
    """Thread body: print every item of *url*, sleeping a random time after each."""
    me = threading.current_thread().name
    print('threading name', me)
    for item in url:
        print(me, '----->', item)
        time.sleep(random.random())
    print('end ', me)


if __name__ == '__main__':
    print('running :', threading.current_thread().name)
    workers = [
        threading.Thread(target=run_proc, name='T1', args=(['u1', 'u2', 'u3'],)),
        threading.Thread(target=run_proc, name='T2', args=(['u4', 'u5', 'u6'],)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print('end')
通过继承threading.Thread创建线程类,代码来源:https://github.com/qiyeboy/SpiderBook
import random
import threading
import time


class myThread(threading.Thread):
    """Thread that prints each url in *urls*, pausing randomly between them."""

    def __init__(self, name, urls):
        super().__init__(name=name)
        self.urls = urls

    def run(self):
        # When started via start(), the running thread is self, so self.name
        # is what current_thread().name would report.
        print('Current %s is running...' % self.name)
        for url in self.urls:
            print('%s ---->>> %s' % (self.name, url))
            time.sleep(random.random())
        print('%s ended.' % self.name)


if __name__ == '__main__':
    # Guarded like the other examples so that importing this module does not
    # spawn threads as a side effect.
    print('%s is running...' % threading.current_thread().name)
    t1 = myThread(name='Thread_1', urls=['url_1', 'url_2', 'url_3'])
    t2 = myThread(name='Thread_2', urls=['url_4', 'url_5', 'url_6'])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('%s ended.' % threading.current_thread().name)
线程同步
线程同步用于保护共享数据,主要有Lock和RLock两种方案:RLock(可重入锁)允许同一线程多次acquire,但必须有相同次数的release才会真正释放。另外,由于全局解释器锁(GIL)的存在,同一时刻只有一个线程执行Python字节码,因此CPU密集型场合倾向使用多进程,IO密集型场合使用多线程。
import threading

# Re-entrant lock shared by the worker threads to guard `num`.
mylock = threading.RLock()
# Shared counter the threads increment up to 100.
num = 0


class myThread(threading.Thread):
    """Worker that increments the global `num` under `mylock` until it reaches 100."""

    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        global num
        name = threading.current_thread().name
        while True:
            # `with` releases the lock on every exit path (break or exception);
            # manual acquire()/release() leaked it if print() raised.
            with mylock:
                print('%s locked, Number: %d' % (name, num))
                if num >= 100:
                    break
                num += 1
                print('%s released, Number: %d' % (name, num))
        # Reached only via the break above, after the lock has been released.
        print('%s released, Number: %d' % (name, num))


if __name__ == '__main__':
    thread1 = myThread('Thread_1')
    thread2 = myThread('Thread_2')
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
协程
from gevent import monkey
monkey.patch_all()  # patch the stdlib before other imports so socket I/O is cooperative
import gevent
import urllib.request as urllib2


def run_task(url):
    """Fetch *url* and print how many bytes were received (errors are printed)."""
    print('Visit --> %s' % url)
    try:
        with urllib2.urlopen(url) as response:  # closes the connection afterwards
            data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        # Report the actual error, not the Exception class object.
        print('Error visiting %s: %r' % (url, e))


if __name__ == '__main__':
    urls = ['https://github.com/', 'https://www.python.org/', 'http://www.cnblogs.com/']
    greenlets = [gevent.spawn(run_task, url) for url in urls]
    gevent.joinall(greenlets)
gevent支持的协程池(gevent.pool.Pool,可限制同时运行的协程数量)
from gevent import monkey
monkey.patch_all()  # patch the stdlib before other imports so socket I/O is cooperative
import urllib.request as urllib2
from gevent.pool import Pool


def run_task(url):
    """Fetch *url*, print the byte count, and return a completion message."""
    print('Visit --> %s' % url)
    try:
        with urllib2.urlopen(url) as response:  # closes the connection afterwards
            data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        # Report the actual error, not the Exception class object.
        print('Error visiting %s: %r' % (url, e))
    return 'url:%s --->finish' % url


if __name__ == '__main__':
    pool = Pool(2)  # at most 2 greenlets run at the same time
    urls = ['https://github.com/', 'https://www.python.org/', 'http://www.cnblogs.com/']
    results = pool.map(run_task, urls)
    print(results)
分布式进程
创建服务进程(Windows)代码源
import queue as Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# Number of tasks to hand out (also the capacity of each queue).
task_number = 10
# Task queue (server -> workers) and result queue (workers -> server).
task_queue = Queue.Queue(task_number)
result_queue = Queue.Queue(task_number)


def get_task():
    """Return the shared task queue (exposed through the manager)."""
    return task_queue


def get_result():
    """Return the shared result queue (exposed through the manager)."""
    return result_queue


class QueueManager(BaseManager):
    """Manager subclass that publishes the two queues over the network."""
    pass


def win_run():
    """Start the manager, publish tasks, and print the workers' results."""
    # On Windows the registered callables cannot be lambdas, so module-level
    # functions are defined above and registered here.
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # Bind the port and set the auth key (bytes). On Windows an explicit IP
    # address is required; on Linux an empty host means all local interfaces.
    manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    manager.start()
    try:
        # Obtain network proxies for the task and result queues.
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # Publish the tasks.
        for url in ["ImageUrl_" + str(i) for i in range(task_number)]:
            print('put task %s ...' % url)
            task.put(url)
        print('try get result...')
        for i in range(task_number):
            print('result is %s' % result.get(timeout=10))
    except Exception as e:
        # A bare `except:` hid the cause (e.g. queue.Empty on timeout) and
        # also swallowed KeyboardInterrupt.
        print('Manager error: %r' % (e,))
    finally:
        # Always shut down, otherwise an unclosed-pipe error is raised.
        manager.shutdown()


if __name__ == '__main__':
    # Required when the script is frozen into a Windows executable
    # (the original note: it mitigates Windows multiprocessing problems).
    freeze_support()
    win_run()
创建任务进程:python后续版本的修正:
# coding:utf-8
import queue
import time
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """Client-side manager; the method names must match the server's registrations."""
    pass


# Step 1: register the queue getters by name only; the server owns the queues.
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')


def main(server_addr='127.0.0.1', port=8001, authkey=b'qiye'):
    """Connect to the task server and process tasks until none arrive for 5 s."""
    # Step 2: connect. Port and auth key must match the server exactly.
    print('Connect to server %s...' % server_addr)
    # Python 3 requires authkey to be bytes; the str 'qiye' raised TypeError.
    m = QueueManager(address=(server_addr, port), authkey=authkey)
    m.connect()
    # Step 3: obtain proxies for the remote queues.
    task = m.get_task_queue()
    result = m.get_result_queue()
    # Step 4: take tasks and write results. Checking empty() and then calling
    # get() is racy, so wait up to 5 s for each task and stop when none comes.
    while True:
        try:
            image_url = task.get(True, timeout=5)
        except queue.Empty:
            break
        print('run task download %s...' % image_url)
        time.sleep(1)
        result.put('%s--->success' % image_url)
    # Done.
    print('worker exit.')


if __name__ == '__main__':
    main()
创建Linux版本的服务进程:(未测试)
# Server process (taskManager.py), Linux version -- ported to Python 3.
import random
import time
import queue
from multiprocessing.managers import BaseManager

# Step 1: create task_queue and result_queue to hold tasks and results.
task_queue = queue.Queue()
result_queue = queue.Queue()


class Queuemanager(BaseManager):
    pass


def get_task_queue():
    """Return the shared task queue."""
    return task_queue


def get_result_queue():
    """Return the shared result queue."""
    return result_queue


# Step 2: register both queues on the network. The `callable` argument ties
# each name to its Queue object so it is exposed to remote clients.
# Named functions are used instead of lambdas: lambdas cannot be pickled,
# which the 'spawn'/'forkserver' process start methods require.
Queuemanager.register('get_task_queue', callable=get_task_queue)
Queuemanager.register('get_result_queue', callable=get_result_queue)


def main():
    """Start the manager, publish 10 tasks, and print the workers' results."""
    # Step 3: bind port 8001 with auth key b'qiye' (must be bytes in
    # Python 3); host '' listens on all interfaces.
    manager = Queuemanager(address=('', 8001), authkey=b'qiye')
    # Step 4: start the manager and listen on the channel.
    manager.start()
    try:
        # Step 5: get network-accessible proxies for the queues.
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # Step 6: add tasks.
        for url in ["ImageUrl_" + str(i) for i in range(10)]:
            print('put task %s ...' % url)
            task.put(url)
        # Collect the results.
        print('try get result...')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))
    finally:
        # Shut the manager down even if a result times out.
        manager.shutdown()


if __name__ == '__main__':
    main()
TCP编程
创建服务端
# coding:utf-8
import socket
import threading
import time


def dealClient(sock, addr, delay=1):
    """Serve one TCP client: greet it, echo each message, stop on 'exit'.

    Args:
        sock: Connected socket returned by accept(); closed before returning.
        addr: Peer address tuple (host, port), used only in log messages.
        delay: Seconds to pause after each recv (demo pacing, default 1).
    """
    # Step 4: receive the peer's data and send data back.
    print('Accept new connection from %s:%s...' % addr)
    with sock:  # Step 5: the socket is closed even if an error occurs.
        sock.sendall(b'Hello,I am server!')
        while True:
            data = sock.recv(1024)
            time.sleep(delay)
            if not data:  # peer closed the connection
                break
            # Decode once; invalid UTF-8 no longer kills the handler thread.
            text = data.decode('utf-8', errors='replace')
            if text == 'exit':
                break
            print('-->>%s!' % text)
            # sendall() retries until every byte is sent; send() may not.
            sock.sendall(('Loop_Msg: %s!' % text).encode('utf-8'))
    print('Connection from %s:%s closed.' % addr)


if __name__ == "__main__":
    # Step 1: create an IPv4 (AF_INET) TCP (SOCK_STREAM) socket and bind it
    # to 127.0.0.1 (this machine only).
    # NOTE: ports below 1024 usually require root privileges on Linux/Unix.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('127.0.0.1', 999))
    # Step 2: listen, allowing up to 5 pending connections.
    s.listen(5)
    print('Waiting for connection...')
    while True:
        # Step 3: accept a new connection and handle it in its own thread.
        sock, addr = s.accept()
        t = threading.Thread(target=dealClient, args=(sock, addr))
        t.start()
创建客户端
# coding:utf-8
import socket

# Create an IPv4 TCP socket; `with` closes it even if an error occurs.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    # Connect to the target IP and port.
    s.connect(('127.0.0.1', 999))
    # Receive the server's greeting.
    print('-->>' + s.recv(1024).decode('utf-8'))
    # Send a message; sendall() retries until every byte is sent.
    s.sendall(b'Hello,I am a client')
    print('-->>' + s.recv(1024).decode('utf-8'))
    s.sendall(b'exit')
UDP编程
服务端与客户端
import socket
import sys


def build_reply(data):
    """Return the greeting the server sends back for the datagram *data* (bytes)."""
    return b'Hello, %s!' % data


def run_server(host='127.0.0.1', port=9999):
    """UDP server: reply to every datagram with a greeting (runs forever)."""
    # SOCK_DGRAM makes this a UDP socket; binding works the same as for TCP.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind((host, port))
        print('Bind UDP on %d...' % port)
        while True:
            # No connection: recvfrom() returns the data and the sender's address.
            data, addr = s.recvfrom(1024)
            print('Received from %s:%s.' % addr)
            s.sendto(build_reply(data), addr)


def run_client(host='127.0.0.1', port=9999):
    """Send two datagrams to the server and print each reply."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        for data in [b'Hello', b'World']:
            # Send the data, then receive the reply.
            s.sendto(data, (host, port))
            print(s.recv(1024).decode('utf-8'))


if __name__ == '__main__':
    # The server loops forever, so any client code placed after it in the
    # same script never runs. Start server and client as separate processes:
    # run this file with the argument 'server' in one terminal and 'client'
    # in another.
    if len(sys.argv) > 1 and sys.argv[1] == 'client':
        run_client()
    else:
        run_server()