zoukankan      html  css  js  c++  java
  • 七月在线爬虫班学习笔记(七)——高级内容-并发编程

    第七课主要内容:

    • 多进程
    • 多线程
    • FIFO,LIFO,优先队列
    • 线程局部变量
    • 进程与线程的选择
    • 线程池
    • 异步IO概念及twisted案例
    • 股票数据抓取

    Linux下实例:

    import os
    
    print('Process (%s) start...' % os.getpid())
    pid = os.fork()
    if pid == 0:
        print('Child process (%s), ppid is %s.' % (os.getpid(), os.getppid()))
    else:
        print('I (%s) just created a child process.' % os.getpid())
    

     windows下实例(需要导入multiprocessing这个库):

    from multiprocessing import Process
    import os
    
    def run_proc(name):
        print('Run child process %s (%s)...' % (name, os.getpid()))
    
    if __name__ == '__main__':
        print('Parent process %s.' % os.getpid())
        p = Process(target = run_proc, args = ('test',))
        p.start()
        p.join()
        print('End')
    

     

    多线程运行实例:

    如果想深入多线程,请参考书籍《unix环境高级编程》

    import time, threading
    
    def loop():
        thread_name = threading.current_thread().name
        print('Thread %s is running...' % thread_name)
        n = 0
        while n < 5:
            n = n + 1
            print('Thread %s >>> %d' % (thread_name, n))
        print('Thread %s ends.' % thread_name)
        
    thread_name = threading.current_thread().name
    print('Thread %s is running...' % thread_name)
    t = threading.Thread(target = loop, name = 'loopThread')
    t.start()
    t.join()
    print('Thread %s ends.' % thread_name)
    

     多线程竞争实例(操作银行存款):

    线程先要获得锁,然后才可以运行,之后再释放掉获得的锁。在实际生产环境中必须得有锁,否则会出错。

    import threading
    import time
    
    balance = 0
    lock = threading.Lock()
    
    def change_it(n):
        global balance
        balance = balance + n
        balance = balance - n
    
    '''
    def run_thread(n):
        for i in range(10000):
            change_it(n)
    '''
    
    def run_thread(n):
        for i in range(10000):
            lock.acquire()
            try:
                change_it(n)
            finally:
                lock.release()
    
    t1 = threading.Thread(target = run_thread, args = (5, ))
    t2 = threading.Thread(target = run_thread, args = (8, ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
    

     

    有三种队列:FIFO:先进先出;LIFO:last in first out(相当于堆栈);优先队列:每进去的时候都会排序,在出来的时候保障是有序的。

    实例:

    # FIFO: First in first out
    # LIFO: Last in firstt out
    # Priority Queue
    
    import queue
    import threading
    
    q = queue.Queue()
    for i in range(5):
        q.put(i)
    
    while not q.empty():
        print(q.get())
    
    q = queue.LifoQueue()
    for i in range(5):
        q.put(i)
    
    while not q.empty():
        print(q.get())
    
    class Task:
        def __init__(self, priority, description):
            self.priority = priority
            self.description = description
    
        def __lt__(self, other):    # Python 2.7, implement __cmp__
            return self.priority < other.priority
    
    q = queue.PriorityQueue()
    q.put(Task(1, 'Important task'))
    q.put(Task(10, 'Normal task'))
    q.put(Task(100, 'Lazy task'))
    
    def job(q):
        while True:
            task = q.get()
            print('Task: %s
    ' % task.description)
            q.task_done()
    
    threads = [threading.Thread(target = job, args = (q, )), threading.Thread(target = job, args = (q, ))]
    for t in threads:
        t.setDaemon(True)
        t.start()
    q.join()
    

     

    实例:

    import multiprocessing
    import threading
    
    def loop():
        x = 0
        while True:
            x = x ^ 1
    
    for i in range(multiprocessing.cpu_count()):
        t = threading.Thread(target = loop)
        t.start()
    

     

    实例:

    import threading
    
    local_school = threading.local()
    
    def process_student():
        std = local_school.student
        print('Hello %s (%s)
    ' % (std, threading.current_thread().name))
    
    def process_thread(name):
        local_school.student = name
        process_student()
    
    t1 = threading.Thread(target = process_thread, args = ('Tom', ), name = 'TA')
    t2 = threading.Thread(target = process_thread, args = ('Jack', ), name = 'TB')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    

     

    实例:

    import time
    import threadpool
    
    def long_op(n):
        print('%d
    ' % n)
        time.sleep(2)
    
    pool = threadpool.ThreadPool(2)
    tasks = threadpool.makeRequests(long_op, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    print(len(tasks))
    [pool.putRequest(task) for task in tasks]
    pool.wait()
    

     

    异步IO例子:

    import asyncio
    
    @asyncio.coroutine
    def wget(host):
        connect = asyncio.open_connection(host, 80)
        reader, writer = yield from connect
        header = 'GET / HTTP/1.0
    Host: %s
    
    ' % host
        print(header)
        writer.write(header.encode('utf-8'))
        yield from writer.drain()
        while True:
            line = yield from reader.readline()
            if line == b'
    ':
                break
            print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
        writer.close()
    
    loop = asyncio.get_event_loop()
    tasks = [wget(host) for host in ['www.sina.com', 'www.sohu.com', 'www.163.com']]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    

     

    实例:

    import sys
    from twisted.internet.protocol import ServerFactory
    from twisted.protocols.basic import LineReceiver
    from twisted.python import log
    from twisted.internet import reactor
     
    class CmdProtocol(LineReceiver):
      delimiter = '
    '
     
      def connectionMade(self):
        self.client_ip = self.transport.getPeer()[1]
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
          log.msg("Too many connections. bye !")
          self.client_ip = None
          self.transport.loseConnection()
        else:
          self.factory.clients.append(self.client_ip)
     
      def connectionLost(self, reason):
        log.msg('Lost client connection. Reason: %s' % reason)
        if self.client_ip:
          self.factory.clients.remove(self.client_ip)
     
      def lineReceived(self, line):
        log.msg('Cmd received from %s : %s' % (self.client_ip, line))
     
    class MyFactory(ServerFactory):
      protocol = CmdProtocol
     
      def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []
     
    log.startLogging(sys.stdout)
    reactor.listenTCP(9999, MyFactory(2))
    reactor.run()
    

     

    import requests
    import threading
    
    def get_stock(code):
        url = 'http://hq.sinajs.cn/list=' + code
        resp = requests.get(url).text
        print('%s
    ' % resp)
    
    codes = ['sz000878', 'sh600993', 'sz000002', 'sh600153', 'sz002230', 'sh600658']
    threads = [threading.Thread(target = get_stock, args = (code, )) for code in codes]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    

     

    import requests
    import threadpool
    
    def get_stock(code):
        url = 'http://hq.sinajs.cn/list=' + code
        resp = requests.get(url).text
        print('%s
    ' % resp)
    
    codes = ['sz000878', 'sh600993', 'sz000002', 'sh600153', 'sz002230', 'sh600658']
    pool = threadpool.ThreadPool(2)
    tasks = threadpool.makeRequests(get_stock, codes)
    [pool.putRequest(task) for task in tasks]
    pool.wait()
    

     

    import aiohttp
    import asyncio
    
    @asyncio.coroutine
    def get_stock(code):
        url = 'http://hq.sinajs.cn/list=' + code
        resp = yield from aiohttp.request('GET', url)
        body = yield from resp.read()
        print(body.decode('gb2312'))
    
    codes = ['sz000878', 'sh600993', 'sz000002', 'sh600153', 'sz002230', 'sh600658']
    tasks = [get_stock(code) for code in codes]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    情不知所起一往而深
  • 相关阅读:
    CodeForces 459D Pashmak and Parmida's problem
    cf 459c Pashmak and Buses
    hdu 1006 Tick and Tick 有技巧的暴力
    hdu 1005 Number Sequence
    hdu 1004 Let the Balloon Rise
    c++ 单引号和双引号
    c++一些总结
    剑指offer23 从上往下打印二叉树
    E: Unable to locate package
    vector
  • 原文地址:https://www.cnblogs.com/xingbiaoblog/p/9036590.html
Copyright © 2011-2022 走看看