zoukankan      html  css  js  c++  java
  • 网络编程之进程3

     什么叫做水平扩展:增加计算机的数量,并没有提高计算机的性能

     什么叫开源:开放源代码

     什么叫做虚拟化:同时跑多个系统

     什么是分布式和集中式:集中式就是在一台机器上执行任务;分布式就是将任务分散到多台机器上。

    一 JoinableQueue模块

     JoinableQueue模块介绍:比Queue多了两个函数,一个是task_done,另一个是join,都是专用于全球进行编程的,大多数用于生产者和消费者之间的。

      task_done:是用在get后面的 ,告诉os已经处理完了内容。

      join:是说Queue里面的生产数据全部处理完了。

    使用方法如下:

    # import multiprocessing
    # import time
    # import random
    # def sheng(name,q,wuping):
    #     for i in range(5):
    #         time.sleep(random.random())
    #         ret='%s%s'%(wuping,i)
    #         q.put(ret)
    #         print('厨师%s创建了%s'%(name,ret))
    #     q.join()
    # def xiao(name,q):
    #     while True:
    #         time.sleep(random.random())
    #         ret=q.get()
    #         print('%s吃了%s'%(name,ret))
    #         q.task_done()
    # if __name__=='__main__':
    #     q=multiprocessing.JoinableQueue()
    #     s1=multiprocessing.Process(target=sheng,args=('egon1',q,'包子'))
    #     s2=multiprocessing.Process(target=sheng,args=('egon2',q,'rou'))
    #     s3=multiprocessing.Process(target=sheng,args=('egon3',q,'骨头'))
    #     x1=multiprocessing.Process(target=xiao,args=('alex1',q))
    #     x2=multiprocessing.Process(target=xiao,args=('alex2',q))
    #     x1.daemon=True
    #     x2.daemon=True
    #     s1.start()
    #     s2.start()
    #     s3.start()
    #     x1.start()
    #     x2.start()
    #     s1.join()
    #     s2.join()
    #     s3.join()
     

    二 Manager:共享内存空间

     dict:共享的以恶搞字典,能够被多个共享

    如下1:

    # import multiprocessing
    # def walk(d):
    #     d['count']-=1
    # if __name__=='__main__':
    #     m=multiprocessing.Manager()
    #     d=m.dict({'count':100})
    #     for i in range(100):
    #         w=multiprocessing.Process(target=walk,args=(d,))
    #         w.start()
    #     w.join()
    #     print(d)
    

     如下2:

    # import multiprocessing
    # def walk(d,loak):
    #     with loak:
    #         d['count']-=1
    # if __name__=='__main__':
    #     m=multiprocessing.Manager()
    #     d=m.dict({'count':100})
    #     lock=multiprocessing.Lock()
    #     li=[]
    #     for i in range(100):
    #         w=multiprocessing.Process(target=walk,args=(d,lock))
    #         li.append(w)
    #         w.start()
    #     for j in li:
    #         j.join()
    #     print(j)
    #     print(d)
    

     三 进程池

     什么是进程池:创建一定数量的进程个数

     同步和异步:提交任务的两种方式。

     Pool:创建进程池和控制进程的数目,默认的个数是根据CPU的核数

     apply:传入两个参数,第一个是指定任务。向进程池提交一个任务,实现了串行和同步调用。结束任务后,立马会拿到结果。

      开启的进程数目有几个,就会有几个pid。

     什么是同步调用:提交一个任务,等到任务结束后才能执行下一个任务。

    # import multiprocessing
    # import time
    # import random
    # import os
    # def walk(n):
    #     print('%s is walking'%os.getpid())
    #     time.sleep(random.random())
    #     return n
    #
    # if __name__=='__main__':
    #     p=multiprocessing.Pool(4)
    #     for i in range(10):
    #         q=p.apply(walk,args=(i,))
    #         print(q)
    

     apply_async:向进程池提交任务,提交完任务后就不管了,只管提交任务,不能直接执行任务。实现了一个并发和恶异步调用

      执行方法:先close:关闭任务,好让任务结束

           然后join:等待一个进程池不在提交任务,并且任务结束和计算任务个数

           最后get:获取返回值

     什么是异步调用:提交完一个任务过后不会在原地等待,而是继续提交下一个任务。等待所有的任务结束后在用get获取任务

    如下:

    # import multiprocessing
    # import time
    # import random
    # import os
    # def walk(n):
    #     print('%s is walking'%os.getpid())
    #     time.sleep(random.random())
    #     return n
    #
    # if __name__=='__main__':
    #     p=multiprocessing.Pool(4)
    #     li=[]
    #     for i in range(10):
    #         q=p.apply_async(walk,args=(i,))
    #         li.append(q)
    #     p.close()
    #     p.join()
    #     for i in li:
    #         print(i.get())
    #
    

    同步和异步详情:https://www.zhihu.com/question/19732473

     进程池没有任务了,进程就会被回收调。

    如下:

    from  multiprocessing import Pool
    import os,time,random
    def work(n):
        print('%s is working' %os.getpid())
    if __name__ == '__main__':
        p=Pool(4)
        for i in range(50):
            # res=p.apply(work,args=(i,))
            obj=p.apply_async(work,args=(i,))
        p.close()
        p.join()
    

     如果在执行和提交任务的时间非常快,有时会提交任务后,第一个任务没有执行结束,就会开启一个新的进程执行这个任务;有时会在提交任务后,某进程刚好执行完,并且没有被回收掉,那么这次提交的任务就会被这个进程执行。

     为什么要用进程池:为了实现并发,然后在并发的基础上对进程的数目进行一个限制。

    四  回调函数

     什么是回调函数:通过一个函数内存调用的函数,如果将这个函数的地址当作参数传给另一个函数,当这个函数的地址用来调用其所只想的函数是,这个所指向的函数就是回调函数。详情访问:http://www.cnblogs.com/hainan-zhang/p/6222552.html

      callback:后面加上的是回调函数

      回调函数的进程其实就是主进程。

     用回调函数实现一个网络爬虫;需要用到requests模块

      get:获取网址

      status_code:返回状态码

      text:查看下载网址的内容

     

    from multiprocessing import Pool,Process
    import requests
    import os
    import time,random
    def get(url):
        print('%s GET %s' %(os.getpid(),url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
        if response.status_code == 200:
            print('%s DONE %s' % (os.getpid(), url))
            return {'url':url,'text':response.text}
    
    def parse(dic):
        print('%s PARSE %s' %(os.getpid(),dic['url']))
        time.sleep(1)
        res='%s:%s
    ' %(dic['url'],len(dic['text']))
        with open('db.txt','a') as f:
            f.write(res)
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
        p=Pool(2)
        start_time=time.time()
        objs=[]
        for url in urls:
            obj=p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活
            objs.append(obj)
        p.close()
        p.join()
    
        print('主',(time.time()-start_time))
    

      

  • 相关阅读:
    原创:ESXi5.1安装实验1
    Xeon 5000系列说起
    ESXi5.1安装实验3
    VMware虚拟化网络和存储功能简介
    Understanding Core Exchange Server 2007 Design Plans
    Exchange 2007和2003,2000等共存。
    自动发现服务深入理解
    管理 POP3 和 IMAP4 服务
    原创:vsphere client 5.1安装实战
    UPNP解读2含netbios,wins,DNS
  • 原文地址:https://www.cnblogs.com/fangjie0410/p/7657150.html
Copyright © 2011-2022 走看看