zoukankan      html  css  js  c++  java
  • 进程 线程 协程

    进程

    假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源。是不是在程序A读取数据的过程中,让程序B去执行,当程序A读取完数据之后,让 程序B暂停,然后让程序A继续执行? 当然没问题,但这里有一个关键词:切换 既然是切换,那么这就涉及到了状态的保存,状态的恢复,加上程序A与程序B所需要的系统资 源(内存,硬盘,键盘等等)是不一样的。自然而然的就需要有一个东西去记录程序A和程序B分别需要什么资源,怎样去识别程序A和程序B等等,所以就有了一个叫进程的抽象
    进程定义:
    
        进程就是一个程序在一个数据集上的一次动态执行过程。
        进程一般由程序、数据集、进程控制块三部分组成。
        我们编写的程序用来描述进程要完成哪些功能以及如何完成;
        数据集则是程序在执行过程中所需要使用的资源;
        进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系
        统感知进程存在的唯一标志。
    举一例说明进程:
       想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱,厨房里有所需
       的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),
    而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。 现在假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他 照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这 里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程 拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后,这位计算机科学家又回来做蛋糕,从他 离开时的那一步继续做下去。
     

     线程

    线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,
    使到进程内并发成为可能。
    
    假设,一个文本程序,需要接受键盘输入,将内容显示在屏幕上,还需要保存信息到硬盘中。若只有
    一个进程,势必造成同一时间只能干一样事的尴尬(当保存时,就不能通过键盘输入内容)。若有多
    个进程,每个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的
    任务,进程C负责保存内容到硬盘中的任务。这里进程A,B,C间的协作涉及到了进程通信问题,而且
    有共同都需要拥有的东西-------文本内容,不停的切换造成性能上的损失。若有一种机制,可以使
    任务A,B,C共享资源,这样上下文切换所需要保存和恢复的内容就少了,同时又可以减少通信所带
    来的性能损耗,那就好了。是的,这种机制就是线程。
    
    线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序
    计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发
    性能。线程没有自己的系统资源

    线程进程的关系区别

    1 一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器)
    
    2 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
    
    3 线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和
      程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。 
    
    4 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调
      度的一个独立单位. 
      线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程
      自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是
      它可与同属一个进程的其他的线程共享进程所拥有的全部资源. 
      一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.

    线程的调用及join方法

    先看看没有join时,如何执行

    import threading
    import time
    
    
    def music():
        print("begin to listen %s" % time.time())
        time.sleep(3)
        print("stop to listen %s" % time.time())
    
    
    def game():
        print("begin to play game %s" % time.time())
        time.sleep(5)
        print("stop to play game %s" % time.time())
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=music)  # 创建线程t1
    
        t2 = threading.Thread(target=game)   # 创建线程t2
    
        t1.start()  # 启动线程
        t2.start()  # 启动线程
    
    
        print("ending")
    C:Python35python.exe E:/django学习/pachong/11.py
    begin to listen 1529803604.4184706    线程1 的执行
    begin to play game 1529803604.4194708   线程2 的执行
    ending  主线程执行
    stop to listen 1529803607.4196422  线程1 的执行
    stop to play game 1529803609.4197567  线程2 的执行
    
    Process finished with exit code 0

    上述代码 先执行线程1 紧接着线程2,然后执行主线程的
    print("ending"),接着线程1等待3秒 然后执行 print('stop'), 再接着线程2, 2秒后执行 print('stop')

    再看看加上 jion()方法

    import threading
    import time
    
    
    def music():
        print("begin to listen %s" % time.time())
        time.sleep(3)
        print("stop to listen %s" % time.time())
    
    
    def game():
        print("begin to play game %s" % time.time())
        time.sleep(5)
        print("stop to play game %s" % time.time())
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=music)  # 创建线程t1
    
        t2 = threading.Thread(target=game)   # 创建线程t2
    
        t1.start()  # 启动线程
        t2.start()  # 启动线程
    
        t1.join()   # 阻塞主线程,如果子线程没有执行完,主线程不能往下执行
        t2.join()
    
        print("ending")
    C:Python35python.exe E:/django学习/pachong/11.py
    begin to listen 1529804033.7880292
    begin to play game 1529804033.7880292
    stop to listen 1529804036.7882009
    stop to play game 1529804038.7883153
    ending
    
    Process finished with exit code 0

    看看这次的执行过程,主线程的
    print("ending")在最后打印出来,是因为 t1.join(), t2.join() 阻塞了主线程,要等到子线程执行完毕之后,主线程才执行。因此 ending 在最后执行。
    
    

    再看看join()的位置不同,结果的变化

    import threading
    import time
    
    
    def music():
        print("begin to listen %s" % time.time())
        time.sleep(3)
        print("stop to listen %s" % time.time())
    
    
    def game():
        print("begin to play game %s" % time.time())
        time.sleep(5)
        print("stop to play game %s" % time.time())
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=music)  # 创建线程t1
    
        t2 = threading.Thread(target=game)   # 创建线程t2
    
        t1.start()  # 启动线程
        t2.start()  # 启动线程
    
        t1.join()   # 阻塞主线程,如果子线程没有执行完,主线程不能往下执行
        # t2.join()
    
        print("ending")
    C:Python35python.exe E:/django学习/pachong/11.py
    begin to listen 1529804305.4875696
    begin to play game 1529804305.4875696
    stop to listen 1529804308.487741
    ending
    stop to play game 1529804310.4878554

    这次 只是线程1 join(), 所以线程1 执行完毕之后,主线程立即执行,ending打印出来,接着执行线程2

    再看一例

    import threading
    import time
    
    
    def music():
        print("begin to listen %s" % time.time())
        time.sleep(3)
        print("stop to listen %s" % time.time())
    
    
    def game():
        print("begin to play game %s" % time.time())
        time.sleep(5)
        print("stop to play game %s" % time.time())
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=music)  # 创建线程t1
    
        t2 = threading.Thread(target=game)   # 创建线程t2
    
        t1.start()  # 启动线程
        t1.join()  # 阻塞主线程,如果子线程没有执行完,主线程不能往下执行
        t2.start()  # 启动线程
        t2.join()
    
        print("ending")
    C:Python35python.exe E:/django学习/pachong/11.py
    begin to listen 1529804588.9817843
    stop to listen 1529804591.981956
    begin to play game 1529804591.981956
    stop to play game 1529804596.982242
    ending
    
    Process finished with exit code 0

    看这个执行过程,相当于没有多线程,还是函数顺序执行,是因为 t1.join()阻塞主线程,不让其执行,待t1执行完毕之后,执行t2,t2也阻塞主线程,待t2执行完毕之后,主线程才执行。
    join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
    
    setDaemon(True):
    
             将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。
    
             当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成
    
             想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程
    
             完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦

    其它方法:

    ...

    setDaemon(True)

    import threading
    from time import ctime, sleep
    import time
    
    
    def ListenMusic(name):
        print("Begin listening to %s. %s" % (name, ctime()))
        sleep(3)
        print("end listening %s" % ctime())
    
    
    def RecordBlog(title):
        print("Begin recording the %s! %s" % (title, ctime()))
        sleep(5)
        print('end recording %s' % ctime())
    
    
    threads = []
    
    t1 = threading.Thread(target=ListenMusic, args=('水手',))
    t2 = threading.Thread(target=RecordBlog, args=('python线程',))
    
    threads.append(t1)
    threads.append(t2)
    
    if __name__ == '__main__':
        t2.setDaemon(True)
    
        for t in threads:
            t.start()
    C:Python35python.exe E:/django学习/pachong/11.py
    Begin listening to 水手. Sun Jun 24 10:10:46 2018
    Begin recording the python线程! Sun Jun 24 10:10:46 2018
    end listening Sun Jun 24 10:10:49 2018
    
    Process finished with exit code 0

    上述将线程2 设置为
    t2.setDaemon(True),守护线程,即主线程退出时,它也退出,以上执行过程中,先执行线程1,线程1执行完之后,主线程退出,主线程一退出,子线程2也退出,所以子线程2的
    最后一句没有打印出来。
    
    

     线程调用方式二

    import threading
    import time
    
    
    class MyThread(threading.Thread):
    
        def __init__(self, num):
            threading.Thread.__init__(self)  # 执行父类的方法
            self.num = num
    
        def run(self):  # 定义每个线程要运行的函数
    
            print("running on number:%s" % self.num)
    
            time.sleep(3)
    
    if __name__ == '__main__':
    
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()
        print("ending......")
    View Code

    使用进程池采集数据

    from multiprocessing import Process, Manager, Pool
    from lxml import etree
    import requests
    import re
    import json
    import time
    
    """
    多进程采集数据
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36',
    }
    URL = "http://hr.tencent.com/position.php"
    response = requests.get(URL, headers=headers)
    response.encoding = response.apparent_encoding
    html = etree.HTML(response.text)
    # 获取最后一页的页码
    a = html.xpath('//div[@class="pagenav"]/child::a[last()-1]/text()')[0]
    # 所有需要访问的页码url
    url_list = []
    for i in range(0, int(a)):
        if i == 0:
            url = 'https://hr.tencent.com/position.php'
        else:
            url = 'https://hr.tencent.com/position.php?&start=%s#a' % str(i * 10)
        url_list.append(url)
    
    
    # 处理 标题 url
    def handle_page_url_request(urls, q):
        rep = requests.get(urls, headers=headers)
        if rep.status_code == 200:
            et = etree.HTML(rep.text)
            link1 = et.xpath('//table[@class="tablelist"]//tr[@class="even"]//a/@href')
            link2 = et.xpath('//table[@class="tablelist"]//tr[@class="odd"]//a/@href')
            link1.extend(link2)
            for link in link1:
                link = '%s' + link
                b = link % 'https://hr.tencent.com/'
                q.put(b)
    
    
    # 处理 详情url
    def handle_request_url(urls, q1):
        dic = {}
        rep = requests.get(urls, headers=headers)
        if rep.status_code == 200:
            et = etree.HTML(rep.text)
            title = et.xpath('//table[@class="tablelist textl"]//tr[@class="h"]/td/text()')
            work_place = et.xpath('//table[@class="tablelist textl"]//tr[@class="c bottomline"]/td[1]/text()')
            zw = et.xpath('//table[@class="tablelist textl"]//tr[@class="c bottomline"]/td[2]/text()')
            rs = et.xpath('//table[@class="tablelist textl"]//tr[@class="c bottomline"]/td[3]/text()')
            gzzz = et.xpath('//table[@class="tablelist textl"]//tr[@class="c"][1]//ul[@class="squareli"]/li/text()')
            gzyq = et.xpath('//table[@class="tablelist textl"]//tr[@class="c"][2]//ul[@class="squareli"]/li/text()')
            dic['title'] = title[0]
            dic['职位类别'] = zw[0]
            dic['工作地点'] = work_place[0]
            dic['人数'] = rs[0]
            dic['工作职责'] = "&".join(gzzz)
            dic['工作要求'] = "&".join(gzyq)
            q1.put(dic)
    
    
    if __name__ == '__main__':
        Q = Manager().Queue()  # 封装了数据类型,专门做进程数据共享
        data = []
        pool = Pool(4)
        print('one start....')
        for url in url_list:
            pool.apply_async(handle_page_url_request, args=(url, Q))
    
        pool.close()  # 关闭以后就不能再创建新的进程了
        pool.join()   # 等待所有进程池的进程运行完毕
        print('one end....')
    
        print('two start....')
        pool1 = Pool(4)
        Q1 = Manager().Queue()
        for item in range(Q.qsize()):
            url = Q.get()
            pool1.apply_async(handle_request_url, args=(url, Q1))
        pool1.close()  # 关闭以后就不能再创建新的进程了
        pool1.join()  # 等待所有进程池的进程运行完毕
        print('two end....')
    
        while not Q1.empty():
            data.append(Q1.get())
    
        data = json.dumps(data, ensure_ascii=False, indent=4)
        with open('process_tz', 'wb') as f:
            f.write(bytes(data, encoding='utf-8'))
    View Code

    使用多线程采集数据

    import requests
    from lxml import etree
    import threading
    from queue import Queue
    import json
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36',
    }
    URL = "http://hr.tencent.com/position.php"
    response = requests.get(URL, headers=headers)
    response.encoding = response.apparent_encoding
    html = etree.HTML(response.text)
    # 获取最后一页的页码
    a = html.xpath('//div[@class="pagenav"]/child::a[last()-1]/text()')[0]
    
    
    def handle_context_url(arg, args):
        """
        :param arg:  task_q 任务队列Q  每页的url集
        :param args: list_q 任务队列Q  所有的职位url集
        :return:
        """
        while not arg.empty():
                urls = arg.get()
                rep = requests.get(urls, headers=headers)
                if rep.status_code == 200:
                    print(urls)
                    rep.encoding = response.apparent_encoding
                    et = etree.HTML(rep.text)
                    link1 = et.xpath('//table[@class="tablelist"]//tr[@class="even"]//a/@href')
                    link2 = et.xpath('//table[@class="tablelist"]//tr[@class="odd"]//a/@href')
                    link1.extend(link2)
                    for link in link1:
                        link = '%s' + link
                        b = link % 'https://hr.tencent.com/'
                        args.put(b)
    
    
    def handle_detail_url(q, da):
        dic = {}
        while not q.empty():
            urls = q.get()
            rep = requests.get(urls, headers=headers)
            if rep.status_code == 200:
                et = etree.HTML(rep.text)
                title = et.xpath('//table[@class="tablelist textl"]//tr[@class="h"]/td/text()')
                work_place = et.xpath('//table[@class="tablelist textl"]//tr[@class="c bottomline"]/td[1]/text()')
                zw = et.xpath('//table[@class="tablelist textl"]//tr[@class="c bottomline"]/td[2]/text()')
                rs = et.xpath('//table[@class="tablelist textl"]//tr[@class="c bottomline"]/td[3]/text()')
                gzzz = et.xpath('//table[@class="tablelist textl"]//tr[@class="c"][1]//ul[@class="squareli"]/li/text()')
                gzyq = et.xpath('//table[@class="tablelist textl"]//tr[@class="c"][2]//ul[@class="squareli"]/li/text()')
                # print(title)
                dic['title'] = title[0]
                try:
                    dic['职位类别'] = zw[0]
                except IndexError:
                    zw = None
                dic['工作地点'] = work_place[0]
                dic['人数'] = rs[0]
                dic['工作职责'] = "&".join(gzzz)
                dic['工作要求'] = "&".join(gzyq)
                da.put(dic)
    
    
    if __name__ == '__main__':
        task_q = Queue()  # 每页的url集
        list_q = Queue()  # 所有的职位url集
        data_q = Queue()  # 每个url采集到的数据
        data = []
        for i in range(0, int(a)):
            if i == 0:
                url = 'https://hr.tencent.com/position.php'
            else:
                url = 'https://hr.tencent.com/position.php?&start=%s#a' % str(i * 10)
            task_q.put(url)
    
        # 创建10个线程 采集每个页码上的url
        page_list = []
        for j in range(10):
            t = threading.Thread(target=handle_context_url, args=(task_q, list_q))
            t.start()
            page_list.append(t)
    
        # 设置阻塞, 在采集页码线程执行完毕之后才能执行采集url详情线程
        for m in page_list:
            m.join()
    
        # 创建10个线程 采集url详情
        detail_list = []
        for k in range(10):
            t = threading.Thread(target=handle_detail_url, args=(list_q, data_q))
            t.start()
            detail_list.append(t)
    
        for n in detail_list:
            n.join()
    
        while not data_q.empty():
            data.append(data_q.get())
    
        data = json.dumps(data, ensure_ascii=False, indent=4)
        with open('thread_tz', 'wb') as f:
            f.write(bytes(data, encoding='utf-8'))
    
        print('over')
    View Code

    使用协程(单线程的异步IO)采集数据

  • 相关阅读:
    Hadoop-2.4.0中HDFS文件块大小默认为128M
    看两种截然不同的设计理念:分布式文件系统支持大和小文件的思考
    SecondaryNameNode中的“Inconsistent checkpoint fields”错误原因
    查看HDFS集群信息
    执行“hdfs dfs -ls”时报ConnectException
    启动Hadoop HDFS时的“Incompatible clusterIDs”错误原因分析
    “hdfs dfs -ls”命令的使用
    SuSE Linux上修改主机名
    strerror线程安全分析
    两个SSH2间免密码登录
  • 原文地址:https://www.cnblogs.com/yuqiangli0616/p/9219629.html
Copyright © 2011-2022 走看看