zoukankan      html  css  js  c++  java
  • python网络爬虫(2)回顾Python编程

    文件写入

    def storFile(data, fileName, method='a', encoding=None):
        """Write ``data`` to the file ``fileName``.

        Args:
            data: Text to write.
            fileName: Path of the target file.
            method: Mode passed to ``open``; defaults to ``'a'`` (append).
            encoding: Text encoding passed to ``open``. ``None`` (the default)
                keeps the platform default encoding.
        """
        # newline='' disables newline translation, so data is written verbatim.
        with open(fileName, method, newline='', encoding=encoding) as f:
            f.write(data)
    
    # Demo call: write the text '123' to 1.txt using the default mode.
    storFile('123', '1.txt')
    

    文件读取

    # Read 1.txt back in text mode and echo its entire content.
    with open('1.txt', 'r') as src:
        content = src.read()
        print(content)
    

    序列化操作

    把内存中的数据变为可保存和共享的形式,实现状态保存。cPickle使用C语言编写,效率高,在Python 2中优先使用,不存在时退回pickle;Python 3中已没有cPickle模块,pickle本身会自动使用C加速实现。pickle使用dump(写入文件对象)和dumps(返回字节串)实现序列化。

    # Prefer cPickle when it is importable and fall back to pickle otherwise.
    try:
        import cPickle as pickle
    except ImportError:
        import pickle

    d = dict(url='index.html', title='1', content='2')
    # The context manager closes the file even if pickle.dump raises.
    with open('2.txt', 'wb') as f:
        pickle.dump(d, f)
    # dumps() returns the serialized bytes instead of writing them to a file.
    print(pickle.dumps(d))
    

    反序列化操作

    使用load实现反序列化

    # Prefer cPickle when it is importable and fall back to pickle otherwise.
    try:
        import cPickle as pickle
    except ImportError:
        import pickle

    # NOTE: pickle.load can execute arbitrary code while deserializing, so
    # only load files that come from a trusted source.
    # The context manager closes the file even if pickle.load raises.
    with open('2.txt', 'rb') as f:
        d = pickle.load(f)
    print(d)
    

    多进程创建

    多进程使用os的fork复制完全相同的进程,并对子进程返回0,对父进程返回子进程的pid。只在linux/unix中使用。

    import os
    if __name__ == '__main__':
        # os.fork() returns 0 in the child and the child's pid in the parent.
        pid = os.fork()
        if pid < 0:
            # Python reports a failed fork by raising OSError, so this branch
            # is only a defensive guard.
            print('error pid')
        elif pid == 0:
            print('child ,parent pid', os.getpid(), os.getppid())
        else:
            # os.getpid must be called; the bare name would print the
            # function object instead of the pid.
            print('parent pid,create child ', os.getpid(), pid)

    使用multiprocessing模块创建进程,使用start启动进程,使用join同步。

    import os
    from multiprocessing import Process
    def run_proc(name):
        """Print ``name`` together with the pid of the process running this call."""
        current_pid = os.getpid()
        print('name ,child pid   running', name, current_pid)
    if __name__ == '__main__':
        print('parent pid', os.getpid())
        children = []
        for i in range(5):
            p = Process(target=run_proc, args=(str(i),))
            print('Process will start')
            p.start()
            children.append(p)
        # Wait for every child, not only the last one started, so that 'end'
        # is printed after all of them have finished.
        for p in children:
            p.join()
        print('end')
    

    使用multiprocessing模块中的Pool限定进程数量

    import os
    from multiprocessing import Process,Pool
    import random,time
    def run_proc(name):
        """Print a start line, sleep for a random 0-10 s, then print an end line."""
        print('name ,child pid   running ', name, os.getpid())
        delay = random.random() * 10
        time.sleep(delay)
        print('name ,child pid   running end', name, os.getpid())
    if __name__ == '__main__':
        print('parent pid', os.getpid())
        # The pool is created with three worker processes.
        pool = Pool(processes=3)
        for task_id in range(10):
            pool.apply_async(run_proc, args=(task_id,))
        print('wait')
        # close() stops new submissions; join() then waits for the workers.
        pool.close()
        pool.join()
        print('end')
    

    进程间通信

    Queue通信

    适用多进程间通信,采用put和get方法。

    import os
    from multiprocessing import Process,Queue
    import time,random
    def write_proc(q, urls):
        """Put every entry of ``urls`` on ``q``, pausing randomly after each put."""
        print('w processing ', os.getpid(), 'is running')
        for item in urls:
            q.put(item)
            print('put :', item)
            pause = random.random()
            time.sleep(pause)
    def read_proc(q):
        """Print every item taken from ``q``; loops until the process is stopped."""
        print('r processing ', os.getpid(), 'is running')
        while True:
            # Blocking get: waits until an item is available.
            item = q.get(True)
            print('get:', item)
    
    if __name__ == '__main__':
        q = Queue()
        writers = [
            Process(target=write_proc, args=(q, ['u1', 'u2', 'u3'])),
            Process(target=write_proc, args=(q, ['u4', 'u5', 'u6'])),
        ]
        reader = Process(target=read_proc, args=(q,))
        for writer in writers:
            writer.start()
        reader.start()
        for writer in writers:
            writer.join()
        # The reader loops forever, so it is stopped forcibly once both
        # writers have finished.
        reader.terminate()
    

    Pipe通信

    Pipe方法返回conn1和conn2,全双工模式下均可收发(Pipe方法中duplex参数控制),通过send和recv控制。

    import os
    from multiprocessing import Process,Pipe
    import time,random
    def send_proc(p, urls):
        """Send every entry of ``urls`` through ``p``, pausing randomly after each."""
        print('s processing ', os.getpid(), 'is running')
        for item in urls:
            p.send(item)
            print('send :', item)
            pause = random.random()
            time.sleep(pause)
    def receive_proc(p):
        """Print every object received on ``p``; loops until stopped or ``recv`` raises."""
        print('r processing ', os.getpid(), 'is running')
        while True:
            message = p.recv()
            print('receive:', message)
    
    if __name__ == '__main__':
        # Pipe() returns the two connection ends.
        p = Pipe()
        sender_end, receiver_end = p[0], p[1]
        p1 = Process(target=send_proc, args=(sender_end, ['u1', 'u2', 'u3']))
        p2 = Process(target=receive_proc, args=(receiver_end,))
        for proc in (p1, p2):
            proc.start()

        p1.join()
        # The receiver loops forever, so it is stopped once the sender is done.
        p2.terminate()
    

    多线程

    一点理解。使用threading模块创建多线程

    import time,random,threading
    
    def run_proc(url):
        """Print each entry of ``url`` prefixed with the current thread's name.

        ``url`` is an iterable of items; a random pause follows every item.
        """
        print('threading name', threading.current_thread().name)
        for item in url:
            print(threading.current_thread().name, '----->', item)
            pause = random.random()
            time.sleep(pause)
        print('end ', threading.current_thread().name)
    
    if __name__ == '__main__':
        print('running :', threading.current_thread().name)
        w1 = threading.Thread(target=run_proc, name='T1', args=(['u1', 'u2', 'u3'],))
        w2 = threading.Thread(target=run_proc, name='T2', args=(['u4', 'u5', 'u6'],))
        # Start both workers first, then wait for both to finish.
        for worker in (w1, w2):
            worker.start()
        for worker in (w1, w2):
            worker.join()
        print('end')
    

    使用threading.Thread继承创建线程类:代码源:https://github.com/qiyeboy/SpiderBook

    import random
    import threading
    import time
    class myThread(threading.Thread):
        """Thread that prints each of its URLs with a random pause after each one."""

        def __init__(self, name, urls):
            super().__init__(name=name)
            self.urls = urls

        def run(self):
            """Print a start line, one line per entry of ``self.urls``, then an end line."""
            print('Current %s is running...' % threading.current_thread().name)
            for item in self.urls:
                print('%s ---->>> %s' % (threading.current_thread().name, item))
                pause = random.random()
                time.sleep(pause)
            print('%s ended.' % threading.current_thread().name)

            
    # Driver: run two myThread workers side by side and wait for both.
    print('%s is running...' % threading.current_thread().name)
    t1 = myThread(name='Thread_1', urls=['url_1', 'url_2', 'url_3'])
    t2 = myThread(name='Thread_2', urls=['url_4', 'url_5', 'url_6'])
    for worker in (t1, t2):
        worker.start()
    for worker in (t1, t2):
        worker.join()
    print('%s ended.' % threading.current_thread().name)
    

    线程同步

    线程同步以保护数据,主要有Lock和RLock两种方案,详见threading模块文档。另外,全局解释器锁(GIL)的存在限制了多线程的并行执行,因此在CPU密集型场合倾向使用多进程,在IO密集型场合使用多线程。

    import threading
    # Re-entrant lock guarding the shared counter below.
    mylock = threading.RLock()
    # Shared counter incremented by every myThread instance.
    num = 0


    class myThread(threading.Thread):
        """Thread that increments the shared counter ``num`` until it reaches 100."""

        def __init__(self, name):
            threading.Thread.__init__(self, name=name)

        def run(self):
            """Increment ``num`` under ``mylock`` and stop once it is at least 100."""
            global num
            while True:
                # The with-block releases the lock even if the body raises, so
                # a failure in one thread cannot leave the other one blocked.
                with mylock:
                    print('%s locked, Number: %d' % (threading.current_thread().name, num))
                    finished = num >= 100
                    if not finished:
                        num += 1
                    print('%s released, Number: %d' % (threading.current_thread().name, num))
                if finished:
                    break

    
    if __name__ == '__main__':
        # Create both counter threads, then start them in order.
        thread1, thread2 = myThread('Thread_1'), myThread('Thread_2')
        for worker in (thread1, thread2):
            worker.start()
    

    协程 

    from gevent import monkey; monkey.patch_all()
    import gevent
    import urllib.request as urllib2
    
    def run_task(url):
        """Fetch ``url`` and print how many bytes were received.

        Any exception raised while fetching is caught and printed, so one
        failing URL does not propagate out of this task.
        """
        print('Visit --> %s' % url)
        try:
            response = urllib2.urlopen(url)
            data = response.read()
            print('%d bytes received from %s.' % (len(data), url))
        except Exception as err:
            # Print the caught error itself; printing the Exception class
            # would hide what actually went wrong.
            print(err)
    if __name__ == '__main__':
        urls = ['https://github.com/','https://www.python.org/','http://www.cnblogs.com/']
        # Spawn one greenlet per URL, then wait for all of them.
        greenlets = []
        for url in urls:
            greenlets.append(gevent.spawn(run_task, url))
        gevent.joinall(greenlets)
    

    支持的池

    from gevent import monkey
    monkey.patch_all()
    import urllib.request as urllib2
    from gevent.pool import Pool
    def run_task(url):
        """Fetch ``url``, print how many bytes were received, and return a status string.

        Any exception raised while fetching is caught and printed; the status
        string is returned whether or not the fetch succeeded.
        """
        print('Visit --> %s' % url)
        try:
            response = urllib2.urlopen(url)
            data = response.read()
            print('%d bytes received from %s.' % (len(data), url))
        except Exception as err:
            # Print the caught error itself; printing the Exception class
            # would hide what actually went wrong.
            print(err)
        return 'url:%s --->finish' % url
    
    if __name__ == '__main__':
        urls = ['https://github.com/','https://www.python.org/','http://www.cnblogs.com/']
        # The pool is created with size 2; map() returns the results in input order.
        pool = Pool(2)
        results = pool.map(run_task, urls)
        print(results)
    

    分布式进程

     创建服务进程(Windows)代码源

    import queue as Queue
    from multiprocessing.managers import BaseManager
    from multiprocessing import freeze_support
    # Number of tasks.
    task_number = 10
    # Queues used to hand out tasks and to collect results.
    task_queue = Queue.Queue(maxsize=task_number)
    result_queue = Queue.Queue(maxsize=task_number)


    def get_task():
        """Return the module-level task queue."""
        return task_queue


    def get_result():
        """Return the module-level result queue."""
        return result_queue
    # Manager subclass used to register the queue accessors.
    class QueueManager(BaseManager):
        """BaseManager subclass on which the queue accessors are registered."""
    def win_run():
        """Start the manager server, enqueue the tasks and print their results.

        Registers the queue accessors, starts the manager on 127.0.0.1:8001,
        puts ``task_number`` task names on the task queue and then waits up
        to 10 seconds for each result. The manager is always shut down.
        """
        # On Windows the registered callables cannot be lambdas, so named
        # functions are defined first and bound here.
        QueueManager.register('get_task_queue', callable=get_task)
        QueueManager.register('get_result_queue', callable=get_result)
        # Bind the port and set the authkey. On Windows the IP address must
        # be given explicitly; on Linux it may be left empty (local host).
        manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'qiye')
        # Start the manager.
        manager.start()
        try:
            # Obtain the task queue and the result queue through the network.
            task = manager.get_task_queue()
            result = manager.get_result_queue()
            # Add the tasks. The count follows task_number so it can never
            # exceed the capacity the queues were created with.
            for url in ["ImageUrl_" + str(i) for i in range(task_number)]:
                print('put task %s ...' % url)
                task.put(url)
            print('try get result...')
            for i in range(task_number):
                print('result is %s' % result.get(timeout=10))
        except Exception as err:
            # Report what failed instead of hiding it; KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            print('Manager error', repr(err))
        finally:
            # Always shut down, otherwise an unclosed-pipe error is reported.
            manager.shutdown()
    
    if __name__ == '__main__':
        # Multiprocessing on Windows can misbehave; adding this call mitigates it.
        freeze_support()
        win_run()
    

    创建任务进程:python后续版本的修正

    #coding:utf-8
    import time
    from multiprocessing.managers import BaseManager
    # Manager subclass used to register the queue accessors.
    class QueueManager(BaseManager):
        """BaseManager subclass on which the queue accessors are registered."""
    import queue

    # Step 1: register the names of the queue accessors on QueueManager.
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')
    # Step 2: connect to the server.
    server_addr = '127.0.0.1'
    print('Connect to server %s...' % server_addr)
    # Port and authkey must match the server process exactly. On Python 3 the
    # key has to be bytes; a str is rejected when the manager is created.
    m = QueueManager(address=(server_addr, 8001), authkey=b'qiye')
    # Connect over the network.
    m.connect()
    # Step 3: obtain the queue objects.
    task = m.get_task_queue()
    result = m.get_result_queue()
    # Step 4: take tasks from the task queue and write results to the result queue.
    while not task.empty():
        try:
            image_url = task.get(True, timeout=5)
        except queue.Empty:
            # Another worker drained the queue between empty() and get().
            break
        print('run task download %s...' % image_url)
        time.sleep(1)
        result.put('%s--->success' % image_url)

    # Done.
    print('worker exit.')
    

    创建Linux版本的服务进程:(未测试)

    服务进程(taskManager.py)(linux版)
    
    import random
    import time
    import queue as Queue
    from multiprocessing.managers import BaseManager

    # Step 1: create task_queue and result_queue to hold tasks and results.
    task_queue = Queue.Queue()
    result_queue = Queue.Queue()


    def get_task():
        """Return the module-level task queue."""
        return task_queue


    def get_result():
        """Return the module-level result queue."""
        return result_queue


    class Queuemanager(BaseManager):
        pass


    def main():
        """Start the manager, enqueue ten tasks and print their results."""
        # Step 2: register both queues so they are exposed over the network.
        # Named functions are used instead of lambdas because the registered
        # callables must be picklable when the manager's server process is
        # not created by fork.
        Queuemanager.register('get_task_queue', callable=get_task)
        Queuemanager.register('get_result_queue', callable=get_result)

        # Step 3: bind port 8001 and set the authkey 'qiye' (bytes, as
        # Python 3 requires, matching the worker process).
        manager = Queuemanager(address=('', 8001), authkey=b'qiye')

        # Step 4: start the manager and listen on the channel.
        manager.start()
        try:
            # Step 5: obtain the network-accessible queue objects.
            task = manager.get_task_queue()
            result = manager.get_result_queue()

            # Step 6: add the tasks.
            for url in ["ImageUrl_" + str(i) for i in range(10)]:
                print('put task %s ...' % url)
                task.put(url)
            # Collect the results.
            print('try get result...')
            for i in range(10):
                print('result is %s' % result.get(timeout=10))
        finally:
            # Shut the manager down even if collecting the results failed.
            manager.shutdown()


    if __name__ == '__main__':
        main()
    

    TCP编程

    创建服务端

    #coding:utf-8
    import socket
    import threading
    import time
    def dealClient(sock, addr):
        """Serve one client connection until it disconnects or sends 'exit'.

        Args:
            sock: Connected socket for this client.
            addr: ``(host, port)`` pair of the peer, used only in log output.
        """
        # Step 4: receive data from the peer and send data back.
        print('Accept new connection from %s:%s...' % addr)
        try:
            sock.send(b'Hello,I am server!')
            while True:
                data = sock.recv(1024)
                time.sleep(1)
                if not data:
                    break
                text = data.decode('utf-8')
                if text == 'exit':
                    break
                print('-->>%s!' % text)
                sock.send(('Loop_Msg: %s!' % text).encode('utf-8'))
        finally:
            # Step 5: close the socket, even when send/recv/decode raised.
            sock.close()
        print('Connection from %s:%s closed.' % addr)
    if __name__ == "__main__":
        # Step 1: create a socket based on IPv4 and TCP. The with-block closes
        # the listening socket when the accept loop is left.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # IP (127.0.0.1 is the local machine) and port the socket binds to.
            s.bind(('127.0.0.1', 999))
            # Step 2: listen for connections.
            s.listen(5)
            print('Waiting for connection...')
            while True:
                # Step 3: accept a new connection.
                sock, addr = s.accept()
                # Handle the TCP connection in a new thread.
                t = threading.Thread(target=dealClient, args=(sock, addr))
                t.start()
    

    创建客户端

    #coding:utf-8
    import socket
    # Create the socket; the with-block closes it even if connect, send or
    # recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Connect to the target IP and port.
        s.connect(('127.0.0.1', 999))
        # Receive the greeting.
        print('-->>' + s.recv(1024).decode('utf-8'))
        # Send a message and print the reply.
        s.send(b'Hello,I am a client')
        print('-->>' + s.recv(1024).decode('utf-8'))
        # Ask the server to end the session.
        s.send(b'exit')
    

    UDP编程

    服务端与客户端

    import socket
    # Create the socket and bind it to the given IP and port.
    # SOCK_DGRAM makes this a UDP socket; binding works the same as for TCP.
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_sock.bind(('127.0.0.1', 9999))
    print('Bind UDP on 9999...')
    while True:
        # Receive a datagram and reply directly to its sender.
        payload, peer = udp_sock.recvfrom(1024)
        print('Received from %s:%s.' % peer)
        reply = b'Hello, %s!' % payload
        udp_sock.sendto(reply, peer)
    
    import socket
    # The with-block closes the socket even if sendto or recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        for data in [b'Hello', b'World']:
            # Send the datagram.
            s.sendto(data, ('127.0.0.1', 9999))
            # Receive and print the reply.
            print(s.recv(1024).decode('utf-8'))
    

     

  • 相关阅读:
    为什么这年头蓝牙功能越来越差
    猜数字-暴力枚举
    怎么使用PHPMailer实现邮件的发送??
    实现windows操作系统和VB下Linux虚拟操作系统相互传取文件方式总结
    第一篇 对Javascript中原型的深入理解
    每天进步一点点——关于SSD写入放大问题
    两步改动CentOS主机名称
    [CentOs7]搭建ftp服务器
    Another app is currently holding the yum lock
    [CentOs7]安装mysql(2)
  • 原文地址:https://www.cnblogs.com/bai2018/p/10959955.html
Copyright © 2011-2022 走看看