zoukankan      html  css  js  c++  java
  • 进程和线程-day9

    本节内容

    1. 进程、与线程区别
    2. python GIL全局解释器锁
    3. 线程
      1. 语法
      2. join
      3. 线程锁之LockRlock信号量
      4. Event事件 
      5. queue队列
      6. 线程池
    4. 进程
      1. 语法
      2. 进程间通讯
      3. 进程

       5. 协程

    一、进程和线程的区别

    1. 线程共享创建它的进程的地址空间; 进程拥有自己的地址空间。
    2. 线程可以直接访问其进程的数据段; 进程拥有父进程的数据段的自己的副本。
    3. 线程可以直接与其进程的其他线程通信; 进程必须使用进程间通信与兄弟进程进行通信。
    4. 线程很容易创建,而进程需要复制父进程的地址
    5. 线程可以对同一进程的线程进行相当程度的控制; 进程只能对子进程进行控制。
    6. 对主线程的更改(取消,优先级更改等)可能会影响进程其他线程的行为; 对父进程的更改不会影响子进程

    二、Python GIL锁

      Python的一个进程中在同一时刻只能有一个线程能被CPU给调度,我擦这句话岂不是说Python中的多线程不就是一个幌子吗?就算我创造了很多线程但是由于有gil锁的原因,CPU只调度(运行)一个,其他的还是需要等候CPU调度完成之后再调度下一个?其实不然,在一个线程被调用的时候只要CPU不需要等待,那么下一个进程则会被立即调度。IO请求就是一个很好的应用到多线程的场景,因为在IO等待的时候不需要CPU进行计算。所以线程之间调度速度很快。

      从上面我们可以得到一个结论在——在IO密集性的时候我们可以运用到多线程,而在计算密集性的时候我们可以运用到多进程。

    三、线程

     3.1:线程的基本使用

    线程在Python3中是一个单独的模块,我们需要导入threading。

    创建一个线程
    threading.Thread

    如果我们光创建了一个进程,什么东西都不做,没有任何意义,所以我们需要给这个线程派发一点任务

    import threading
    import time
    
    def task(arg):
        time.sleep(2)
        print(arg)
    
    t = threading.Thread(target=task,args=(13,))
    #启动这个线程
    t.start()
    
    print("end")

    创建多个线程试一下

    import threading
    import time
    
    
    def task(arg):
        time.sleep(2)
        print(arg)
    
    for i in range(10):
        t = threading.Thread(target=task,args=(i,))
        t.start()
    
    print("end")

     3.2:setDaemon和join

      setDaemon:在上面的代码执行后首先主线程走到最后打印出了end字符,然后10个线程并发输出,但是有时候我们需要主线程执行完不等待其他线程,这是我们需要在start前执行setdaemon。PS:True为不等待,False为等待

    import threading
    import time
    
    # t = threading.Thread(target=)
    
    def task(arg):
        time.sleep(2)
        print(arg)
    
    for i in range(10):
        t = threading.Thread(target=task,args=(i,))
        t.setDaemon(True)
        t.start()
    
    print("end")

      join:可以让多线程变成串行,并且可以在join里面设置数字,表示最大等待的时间

    import threading
    import time
    
    def task(arg):
        time.sleep(2)
        print(arg)
    
    for i in range(10):
        t = threading.Thread(target=task,args=(i,))
        t.start()
        t.join()
    print("end")

      变种调用

    class Thread:
        """A class that represents a thread of control.
    
        This class can be safely subclassed in a limited fashion. There are two ways
        to specify the activity: by passing a callable object to the constructor, or
        by overriding the run() method in a subclass.
    
        """
    
        _initialized = False
        # Need to store a reference to sys.exc_info for printing
        # out exceptions when a thread tries to use a global var. during interp.
        # shutdown and thus raises an exception about trying to perform some
        # operation on/with a NoneType
        _exc_info = _sys.exc_info
        # Keep sys.exc_clear too to clear the exception just before
        # allowing .join() to return.
        #XXX __exc_clear = _sys.exc_clear
    
        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, *, daemon=None):
            """This constructor should always be called with keyword arguments. Arguments are:
    
            *group* should be None; reserved for future extension when a ThreadGroup
            class is implemented.
    
            *target* is the callable object to be invoked by the run()
            method. Defaults to None, meaning nothing is called.
    
            *name* is the thread name. By default, a unique name is constructed of
            the form "Thread-N" where N is a small decimal number.
    
            *args* is the argument tuple for the target invocation. Defaults to ().
    
            *kwargs* is a dictionary of keyword arguments for the target
            invocation. Defaults to {}.
    
            If a subclass overrides the constructor, it must make sure to invoke
            the base class constructor (Thread.__init__()) before doing anything
            else to the thread.
    
            """
            assert group is None, "group argument must be None for now"
            if kwargs is None:
                kwargs = {}
            self._target = target
            self._name = str(name or _newname())
            self._args = args
            self._kwargs = kwargs
            if daemon is not None:
                self._daemonic = daemon
            else:
                self._daemonic = current_thread().daemon
            self._ident = None
            self._tstate_lock = None
            self._started = Event()
            self._is_stopped = False
            self._initialized = True
            # sys.stderr is not stored in the class like
            # sys.exc_info since it can be changed between instances
            self._stderr = _sys.stderr
            # For debugging and _after_fork()
            _dangling.add(self)
    
        def _reset_internal_locks(self, is_alive):
            # private!  Called by _after_fork() to reset our internal locks as
            # they may be in an invalid state leading to a deadlock or crash.
            self._started._reset_internal_locks()
            if is_alive:
                self._set_tstate_lock()
            else:
                # The thread isn't alive after fork: it doesn't have a tstate
                # anymore.
                self._is_stopped = True
                self._tstate_lock = None
    
        def __repr__(self):
            assert self._initialized, "Thread.__init__() was not called"
            status = "initial"
            if self._started.is_set():
                status = "started"
            self.is_alive() # easy way to get ._is_stopped set when appropriate
            if self._is_stopped:
                status = "stopped"
            if self._daemonic:
                status += " daemon"
            if self._ident is not None:
                status += " %s" % self._ident
            return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
    
        def start(self):
            """Start the thread's activity.
    
            It must be called at most once per thread object. It arranges for the
            object's run() method to be invoked in a separate thread of control.
    
            This method will raise a RuntimeError if called more than once on the
            same thread object.
    
            """
            if not self._initialized:
                raise RuntimeError("thread.__init__() not called")
    
            if self._started.is_set():
                raise RuntimeError("threads can only be started once")
            with _active_limbo_lock:
                _limbo[self] = self
            try:
                _start_new_thread(self._bootstrap, ())
            except Exception:
                with _active_limbo_lock:
                    del _limbo[self]
                raise
            self._started.wait()
    
        def run(self):
            """Method representing the thread's activity.
    
            You may override this method in a subclass. The standard run() method
            invokes the callable object passed to the object's constructor as the
            target argument, if any, with sequential and keyword arguments taken
            from the args and kwargs arguments, respectively.
    
            """
            try:
                if self._target:
                    self._target(*self._args, **self._kwargs)
            finally:
                # Avoid a refcycle if the thread is running a function with
                # an argument that has a member that points to the thread.
                del self._target, self._args, self._kwargs
    
        def _bootstrap(self):
            # Wrapper around the real bootstrap code that ignores
            # exceptions during interpreter cleanup.  Those typically
            # happen when a daemon thread wakes up at an unfortunate
            # moment, finds the world around it destroyed, and raises some
            # random exception *** while trying to report the exception in
            # _bootstrap_inner() below ***.  Those random exceptions
            # don't help anybody, and they confuse users, so we suppress
            # them.  We suppress them only when it appears that the world
            # indeed has already been destroyed, so that exceptions in
            # _bootstrap_inner() during normal business hours are properly
            # reported.  Also, we only suppress them for daemonic threads;
            # if a non-daemonic encounters this, something else is wrong.
            try:
                self._bootstrap_inner()
            except:
                if self._daemonic and _sys is None:
                    return
                raise
    
        def _set_ident(self):
            self._ident = get_ident()
    
        def _set_tstate_lock(self):
            """
            Set a lock object which will be released by the interpreter when
            the underlying thread state (see pystate.h) gets deleted.
            """
            self._tstate_lock = _set_sentinel()
            self._tstate_lock.acquire()
    
        def _bootstrap_inner(self):
            try:
                self._set_ident()
                self._set_tstate_lock()
                self._started.set()
                with _active_limbo_lock:
                    _active[self._ident] = self
                    del _limbo[self]
    
                if _trace_hook:
                    _sys.settrace(_trace_hook)
                if _profile_hook:
                    _sys.setprofile(_profile_hook)
    
                try:
                    self.run()
                except SystemExit:
                    pass
                except:
                    # If sys.stderr is no more (most likely from interpreter
                    # shutdown) use self._stderr.  Otherwise still use sys (as in
                    # _sys) in case sys.stderr was redefined since the creation of
                    # self.
                    if _sys and _sys.stderr is not None:
                        print("Exception in thread %s:
    %s" %
                              (self.name, _format_exc()), file=_sys.stderr)
                    elif self._stderr is not None:
                        # Do the best job possible w/o a huge amt. of code to
                        # approximate a traceback (code ideas from
                        # Lib/traceback.py)
                        exc_type, exc_value, exc_tb = self._exc_info()
                        try:
                            print((
                                "Exception in thread " + self.name +
                                " (most likely raised during interpreter shutdown):"), file=self._stderr)
                            print((
                                "Traceback (most recent call last):"), file=self._stderr)
                            while exc_tb:
                                print((
                                    '  File "%s", line %s, in %s' %
                                    (exc_tb.tb_frame.f_code.co_filename,
                                        exc_tb.tb_lineno,
                                        exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
                                exc_tb = exc_tb.tb_next
                            print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
                        # Make sure that exc_tb gets deleted since it is a memory
                        # hog; deleting everything else is just for thoroughness
                        finally:
                            del exc_type, exc_value, exc_tb
                finally:
                    # Prevent a race in
                    # test_threading.test_no_refcycle_through_target when
                    # the exception keeps the target alive past when we
                    # assert that it's dead.
                    #XXX self._exc_clear()
                    pass
            finally:
                with _active_limbo_lock:
                    try:
                        # We don't call self._delete() because it also
                        # grabs _active_limbo_lock.
                        del _active[get_ident()]
                    except:
                        pass
    
        def _stop(self):
            # After calling ._stop(), .is_alive() returns False and .join() returns
            # immediately.  ._tstate_lock must be released before calling ._stop().
            #
            # Normal case:  C code at the end of the thread's life
            # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
            # that's detected by our ._wait_for_tstate_lock(), called by .join()
            # and .is_alive().  Any number of threads _may_ call ._stop()
            # simultaneously (for example, if multiple threads are blocked in
            # .join() calls), and they're not serialized.  That's harmless -
            # they'll just make redundant rebindings of ._is_stopped and
            # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
            # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
            # (the assert is executed only if ._tstate_lock is None).
            #
            # Special case:  _main_thread releases ._tstate_lock via this
            # module's _shutdown() function.
            lock = self._tstate_lock
            if lock is not None:
                assert not lock.locked()
            self._is_stopped = True
            self._tstate_lock = None
    
        def _delete(self):
            "Remove current thread from the dict of currently running threads."
    
            # Notes about running with _dummy_thread:
            #
            # Must take care to not raise an exception if _dummy_thread is being
            # used (and thus this module is being used as an instance of
            # dummy_threading).  _dummy_thread.get_ident() always returns -1 since
            # there is only one thread if _dummy_thread is being used.  Thus
            # len(_active) is always <= 1 here, and any Thread instance created
            # overwrites the (if any) thread currently registered in _active.
            #
            # An instance of _MainThread is always created by 'threading'.  This
            # gets overwritten the instant an instance of Thread is created; both
            # threads return -1 from _dummy_thread.get_ident() and thus have the
            # same key in the dict.  So when the _MainThread instance created by
            # 'threading' tries to clean itself up when atexit calls this method
            # it gets a KeyError if another Thread instance was created.
            #
            # This all means that KeyError from trying to delete something from
            # _active if dummy_threading is being used is a red herring.  But
            # since it isn't if dummy_threading is *not* being used then don't
            # hide the exception.
    
            try:
                with _active_limbo_lock:
                    del _active[get_ident()]
                    # There must not be any python code between the previous line
                    # and after the lock is released.  Otherwise a tracing function
                    # could try to acquire the lock again in the same thread, (in
                    # current_thread()), and would block.
            except KeyError:
                if 'dummy_threading' not in _sys.modules:
                    raise
    
        def join(self, timeout=None):
            """Wait until the thread terminates.
    
            This blocks the calling thread until the thread whose join() method is
            called terminates -- either normally or through an unhandled exception
            or until the optional timeout occurs.
    
            When the timeout argument is present and not None, it should be a
            floating point number specifying a timeout for the operation in seconds
            (or fractions thereof). As join() always returns None, you must call
            isAlive() after join() to decide whether a timeout happened -- if the
            thread is still alive, the join() call timed out.
    
            When the timeout argument is not present or None, the operation will
            block until the thread terminates.
    
            A thread can be join()ed many times.
    
            join() raises a RuntimeError if an attempt is made to join the current
            thread as that would cause a deadlock. It is also an error to join() a
            thread before it has been started and attempts to do so raises the same
            exception.
    
            """
            if not self._initialized:
                raise RuntimeError("Thread.__init__() not called")
            if not self._started.is_set():
                raise RuntimeError("cannot join thread before it is started")
            if self is current_thread():
                raise RuntimeError("cannot join current thread")
    
            if timeout is None:
                self._wait_for_tstate_lock()
            else:
                # the behavior of a negative timeout isn't documented, but
                # historically .join(timeout=x) for x<0 has acted as if timeout=0
                self._wait_for_tstate_lock(timeout=max(timeout, 0))
    
        def _wait_for_tstate_lock(self, block=True, timeout=-1):
            # Issue #18808: wait for the thread state to be gone.
            # At the end of the thread's life, after all knowledge of the thread
            # is removed from C data structures, C code releases our _tstate_lock.
            # This method passes its arguments to _tstate_lock.acquire().
            # If the lock is acquired, the C code is done, and self._stop() is
            # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
            lock = self._tstate_lock
            if lock is None:  # already determined that the C code is done
                assert self._is_stopped
            elif lock.acquire(block, timeout):
                lock.release()
                self._stop()
    
        @property
        def name(self):
            """A string used for identification purposes only.
    
            It has no semantics. Multiple threads may be given the same name. The
            initial name is set by the constructor.
    
            """
            assert self._initialized, "Thread.__init__() not called"
            return self._name
    
        @name.setter
        def name(self, name):
            assert self._initialized, "Thread.__init__() not called"
            self._name = str(name)
    
        @property
        def ident(self):
            """Thread identifier of this thread or None if it has not been started.
    
            This is a nonzero integer. See the thread.get_ident() function. Thread
            identifiers may be recycled when a thread exits and another thread is
            created. The identifier is available even after the thread has exited.
    
            """
            assert self._initialized, "Thread.__init__() not called"
            return self._ident
    
        def is_alive(self):
            """Return whether the thread is alive.
    
            This method returns True just before the run() method starts until just
            after the run() method terminates. The module function enumerate()
            returns a list of all alive threads.
    
            """
            assert self._initialized, "Thread.__init__() not called"
            if self._is_stopped or not self._started.is_set():
                return False
            self._wait_for_tstate_lock(False)
            return not self._is_stopped
    
        isAlive = is_alive
    
        @property
        def daemon(self):
            """A boolean value indicating whether this thread is a daemon thread.
    
            This must be set before start() is called, otherwise RuntimeError is
            raised. Its initial value is inherited from the creating thread; the
            main thread is not a daemon thread and therefore all threads created in
            the main thread default to daemon = False.
    
            The entire Python program exits when no alive non-daemon threads are
            left.
    
            """
            assert self._initialized, "Thread.__init__() not called"
            return self._daemonic
    
        @daemon.setter
        def daemon(self, daemonic):
            if not self._initialized:
                raise RuntimeError("Thread.__init__() not called")
            if self._started.is_set():
                raise RuntimeError("cannot set daemon status of active thread")
            self._daemonic = daemonic
    
        def isDaemon(self):
            return self.daemon
    
        def setDaemon(self, daemonic):
            self.daemon = daemonic
    
        def getName(self):
            return self.name
    
        def setName(self, name):
            self.name = name
    多线程源码

     通过查看源码我们发现task(函数)其实是被赋值给了_target,而在run方法中又将这个方法加上了()也就是调用这个方法。

    import threading
    import time
    """
    # t = threading.Thread(target=)
    
    def task(arg):
        time.sleep(2)
        print(arg)
    
    for i in range(10):
        t = threading.Thread(target=task,args=(i,))
        t.setDaemon(False)
        t.start()
    
    print("end")
    """
    def task(arg):
        time.sleep(1)
        print(arg)
    
    class MyThread(threading.Thread):
        def __init__(self,*args,**kwargs):
            super(MyThread,self).__init__(*args,**kwargs)
    
        def run(self):
            print(".....None")
    
    M = MyThread()
    M.start()
    def task(arg):
        time.sleep(1)
        print(arg)
    
    class MyThread(threading.Thread):
        def __init__(self,func,*args,**kwargs):
            super(MyThread,self).__init__(*args,**kwargs)
            self.func = func
    
        def run(self):
            self.func(*self._args, **self._kwargs)
    
    M = MyThread(func=task,args=(11,))
    M.start()
    变异写法

     3.3:线程锁之LockRlock信号量

      - Lock:单锁

    import threading
    import time
    
    v = 10
    lock = threading.Lock()
    
    def task():
        time.sleep(2)
        lock.acquire()
        global v
        v -= 1
        print(v)
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()

      - RLock:多锁,他和Lock的最主要区别就是可以acquire两次,只需要在解锁的时候release两次就行

    import threading
    import time
    
    v = 10
    lock = threading.RLock()
    
    def task():
        time.sleep(2)
        lock.acquire()
        lock.acquire()
        global v
        v -= 1
        print(v)
        lock.release()
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()

      - 信号量:同一时刻可以有多个人穿过锁

    import threading
    import time
    
    v = 10
    lock = threading.BoundedSemaphore(2)
    
    def task():
        lock.acquire()
        time.sleep(2)
        global v
        v -= 1
        print(v)
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()

     3.4:Event事件锁

    import threading
    import time
    
    lock = threading.Event()
    
    def task(arg):
    
        time.sleep(1)
        lock.wait()
        print(arg)
    
    for i in range(10):
        t = threading.Thread(target=task,args=(i,))
        t.start()
        
    while True:
        world = input(">>>").strip()
        if world == "1":
            lock.set()

     - Condition

    import threading
    import time
    
    lock = threading.Condition()
    
    def task(arg):
        time.sleep(1)
        # 锁住所有的线程
        lock.acquire()
        lock.wait()
        # 申请使用锁,其他人等
        print('线程',arg)
        lock.release()
    
    
    for i in range(10):
        t = threading.Thread(target=task,args=(i,))
        t.start()
    while True:
        value = input('>>>>')
        lock.acquire()
        lock.notify(int(value))
        lock.release()

     3.6线程池

       线程池的用处:一般我们在使用多线程工作的时候,当线程过多,线程在上下文切换的时候,多线程不仅不会加快速度有可能还会拖垮机器,如果这个时候我们可以固定创建数个线程让他们工作,反而会将多线程的优势发挥。

    #导入创建线程池的模块
    from concurrent.futures import ThreadPoolExecutor
    
    #创建一个线程池,池中最多有5个线程
    
    pool=ThreadPoolExecutor(5)
    
    def task(arg):
        time.sleep(2)
        print(arg)
    
    for i in range(10):
        pool.submit(task,i)
    基本使用
    def txt(future):
        download_response = future.result()
        print('处理中',download_response.url,download_response.status_code)
    
    def download(url):
        response = requests.get(url)
    
        return response # response包含了下载的所有内容
    
    pool = ThreadPoolExecutor(2)
    url_list = [
        'http://www.oldboyedu.com',
        'http://www.autohome.com.cn',
        'http://www.baidu.com',
    ]
    for url in url_list:
        # 去连接池中获取链接
        # 去下载吧
        print('开始请求',url)
        future = pool.submit(download,url)
        # 下载完成之后,执行txt函数
        future.add_done_callback(txt)
    分布式执行
    #-*-coding:utf8-*-
    import requests
    
    from concurrent.futures import ThreadPoolExecutor
    
    class Multithreading(object):
        def __init__(self,task,*args,**kwargs):
            self.task = task
            self._args = args
            self._kwargs = kwargs
    
        def run_task(self,thread_num=None,):
            pool = ThreadPoolExecutor(thread_num)
            for item in self.task:
                key = item['url']
                value = item['function']
                future = pool.submit(self.download,key)
                if hasattr(self,value):
                    function = getattr(self,value)
                    function(future,key)
                else:
                    raise KeyError
    
        def download(self,http_url):
            response = requests.get(http_url,timeout=5)
            return response
    
    
        def get_code(self,future,url):
            download_response = future.result()
            print("执行了get_code函数")
    
    url_list = [
        {'url':"http://www.baidu.com",'function':'get_code'},
        {'url':"http://www.12306.cn",'function':'get_code'},
        {'url':"http://www.zimuzu.tv",'function':'get_code'},
    ]
    
    M = Multithreading(url_list)
    M.run_task(3)
    变种方法

    四、进程、

     4.1 进程的基本使用

      - function.daemon = True or False 主进程是否等待其他进程执行完成,FALSE为等待,Ture为不等待

      - function.join() 设置主进程最多等待其他进程多少秒

    from multiprocessing import Process
    import time
    
    def task(arg):
        time.sleep(arg)
        print(arg)
    
    
    if __name__ == '__main__': #windows需要将进程执行的过程放到main里面,linux或者mac os 可以直接执行
        for i in range(10):
            p = Process(target=task,args=(i,))
            p.daemon = True
            p.start()
            p.join(1)
        print("主进程执行完毕")

      4.2 进程之间数据共享

       - 在上面介绍进程和线程之间的区别中我们写到,线程之间数据共享而进程之间无法互相访问数据。但是在python中通过特定的模块,也能使进程之间共享数据

      

    from multiprocessing import Process
    import time
    
    def task(arg,li):
        li.append(arg)
        print(li)
    
    
    if __name__ == '__main__':
        v = []
        for i in range(10):
            p = Process(target=task,args=(i,v))
            p.start()
        print("主进程执行完毕")
    进程数据不共享实例

      在python3 中我们可以导入一个叫做Array的模块在帮助进程之间实现数据共享

    from multiprocessing import Process,Array #导入array模块
    import time
    
    def task(arg,li):
        li[arg] = 1
        print(list(li))
    
    
    if __name__ == '__main__':
        v = Array('i',10)   #array有两个参数,第一个是数据类型,第二个是这个变量的长度
        for i in range(10):
            p = Process(target=task,args=(i,v))
            p.start()
        print("主进程执行完毕")

       在Python3还有另外一种方式可以共享多进程的数据,Manger。这种方式的原理就是每个进程之间启动一个socket监听。这种方式的缺点就是不知道对方的进程是否已经执行完成,所以监控的socket可能当进程结束之后都不知道它已经结束了。所以我们使用p.join把这个多进程模式改成了串行。或者自己可以写监听程序等。。。

    from multiprocessing import Process,Manager
    import  time
    
    def task(num,li):
        li.append(num)
        print(li)
    
    if __name__ == '__main__':
        v = Manager().list()
        # v = Manager().dict()
        for i in range(10):
            p = Process(target=task,args=(i,v,))
            p.start()
            p.join()

     4.3 进程池

      基本使用:进程池的使用方法和线程池是一模一样的,我们的代码甚至都不需要改变,只需要把导入的模块改变一下就行了。

    from concurrent.futures import ProcessPoolExecutor
    import time
    
    def task(arg):
        time.sleep(1)
        print(arg   )
    
    if __name__ == '__main__':
        
        for i in range(10):
            pool = ProcessPoolExecutor(5)
            pool.submit(task,i)
            

      分布执行。

    from concurrent.futures import ProcessPoolExecutor
    import time
    
    def task(arg):
        time.sleep(1)
        print(arg   )
    
    def callback(future):
        result = future.result()
        print(result)
    
    if __name__ == '__main__':
    
        for i in range(10):
            pool = ProcessPoolExecutor(5)
            future = pool.submit(task,i)
            future.add_done_callback(callable)

      4 协程

      定义:协程是一个线程,对线程的一个分片处理。协程在线程里面只有来回切换的功能。

      PS:单纯的协程是没有任何意义的。它的一个比较经典案例——IO多路复用。IO多路复用就是用协程在线程之间切换,因为线程在遇到IO会阻塞住,这也就是表示这个线程必须要等待IO相应,等待对方的回应,这样的一个IO操作才算完成。IO操作会使线程的效率大大下降,如果我们能在线程遇到IO等待的时候做一点其他的事情这样是不是能让这个线程最大化呢。

      - select

      如果用过NGINX的人看到select会很熟悉,我们通常在配置NGINX的时候会添加一个use   epoll;epoll实际是一种事件驱动模型,像这样的还有其他两种一个是poll另一个是select。

      select创建会创建三个对象(都是列表),分别监听着可读,可写,和错误信息,如果有可读的信息就自动会被放入第一个对象中,可写就会放入第二个对象,错误的信息则会被放到第三个对象中。

    import socket
    import select
    
    sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sk.bind(('127.0.0.1',3309))
    sk.listen(5)
    
    sk2 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sk2.bind(('127.0.0.1',3310))
    sk2.listen(5)
    
    inputs = [sk,sk2]
    
    while True:
        # print("test")
        r,w,e = select.select(inputs,[],[],0.05)
        #如果客户端连接3309端口r=[sk]
        #如果客户端连接3310 r=[sk2]
        #如果客户端同时连接两个端口 r = [sk,sk2]
        #如果客户端没有连接过来 r = [],0.05是循环的超时时间,这里需要记住,每次循环的时候如果没有新的连接或者是回话r都是空列表,所r是检测inputs的变化
        for obj in r:
            #如果对象是r==》sk  or sk2 这就是一个连接循环
            if obj in [sk,sk2]:
                print("有用户连接进来了",obj)
                conn,add = obj.accept()  #等待client连接,获取回话循环
                # append  to read msg ,so inputs have client msg
                inputs.append(conn)
            else:
                print("有新回话过来了",obj)
                try:
                    # wait client send msg and if client disconnect 
                    client_date = obj.recv(1024)
                except Exception as e:
                    # client_date is None on client disconnect
                    client_date = ""
                # if client have msg ,and send to it 
                if client_date:
                    obj.sendall(client_date)
                else:
                    #user is disconnect ,connect close and remove object list msg
                    print("用户的连接断开了",obj)
                    obj.close()
                    inputs.remove(obj)
    import socket
    import select
    
    sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sk.bind(('127.0.0.1',3309))
    sk.listen(5)
    
    sk2 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sk2.bind(('127.0.0.1',3310))
    sk2.listen(5)
    
    inputs = [sk,sk2]
    w_inputs = []
    
    while True:
        # print("test")
        r,w,e = select.select(inputs,w_inputs,[],0.05)
        #如果客户端连接3309端口r=[sk]
        #如果客户端连接3310 r=[sk2]
        #如果客户端同时连接两个端口 r = [sk,sk2]
        #如果客户端没有连接过来 r = [],0.05是循环的超时时间,这里需要记住,每次循环的时候如果没有新的连接或者是回话r都是空列表,所r是检测inputs的变化
        for obj in r:
            #如果对象是r==》sk  or sk2 这就是一个连接循环
            if obj in [sk,sk2]:
                print("有用户连接进来了",obj)
                conn,add = obj.accept()  #等待client连接,获取回话循环
                # append  to read msg ,so inputs have client msg
                inputs.append(conn)
            else:
                print("有新回话过来了",obj)
                try:
                    # wait client send msg and if client disconnect
                    client_date = obj.recv(1024)
                except Exception as e:
                    # client_date is None on client disconnect
                    client_date = ""
                # if client have msg ,and send to it
                if client_date:
                    w_inputs.append(obj)
                else:
                    #user is disconnect ,connect close and remove object list msg
                    print("用户的连接断开了",obj)
                    obj.close()
                    inputs.remove(obj)
    
        for w_obj in w:
            w_obj.sendall(client_date)
            w_inputs.remove(w_obj)
    读写分离
    setDaemon
  • 相关阅读:
    jsonp 监控简陋代码
    Oracle多线程并行使用、关联与指定索引执行
    Oracle计算时间差表达式
    行查列显
    JS时间
    通过string型类名实例化一个类
    sotower1.5-LS_工作流容易出错的地方
    oracle11g ORA-12505
    华项笔记本显示颜色变黄,调整好了重启还是偏黄
    ORA-00031: session marked for kill 处理Oracle中杀不掉的锁
  • 原文地址:https://www.cnblogs.com/yanlinux/p/8540561.html
Copyright © 2011-2022 走看看