zoukankan      html  css  js  c++  java
  • 多线程和多进程

    一、线程和进程

      多任务的实现方式有3中:
                            1:多进程模式
                            2:多线程模式
                            3:多进程+多线程模式
    Python既支持多线程又支持多进程。
    1:多进程:
    Unix/Linux操作系统中提供一个fork()函数 在Python的OS模块中就封装了常见的系统调用 其中就包括 fork() 可以再Python中轻松的创建子进程。
        #multiprocess.py
     import os
                        
     print 'Process (%s) start....' % os.getpid()
     pid=os.fork()
     if pid==0:
        print 'I am child process (%s) and my parent is %s ' %       (os.getpid(),os.getppid())
     else:
        print 'i am father process'
     由于windows上面没有fork()故上述代码不可以在windows上面运行。
                
    由于Python是跨平台的 自然也提供一个跨平台的多进程支持 multiprocessing 模块就是跨平台版本的多进程支持
                    
    multiprocessing 模块提供了一个Process类来代表 一个进程对象 下面的例子演示了启动一个子进程并等待其结束
     form multiprocessing import Process
                        import os
                    
                        #子进程要执行的代码
                        def run_proc(name):
                            print 'Run child process %s (%s)' %(name,os.getpid())
            
                        if __name__=='__main__':
                            print 'Parent process %s' % os.getpid()
                            p=Process(target=run_proc,args=('test',))
                            print 'Process will start'
                            p.start()
                            p.join()
                            print 'Process end'.
                        运行结果如下:
                        Parent process 389
                        Process will start
                        Run child process test (340)
                        Process end
     创建子进程时候 只需要传入一个执行函数和函数的参数即可 穿件一个process实例 然后用start()函数启动 join()函数等待子进程结束后在继续往下运行。
                        
     如果要启动大量的子进程 可以用进程池的方式批量创建子进程:
      from multiprocessing import Pool
                                import os,time,random
                
                                def long_time_task(name):
                                    print 'Run task %s %s' %(name,os.getpid())
                                    start = time.time()
                                    time.sleep(random.random*3)
                                    end=time.time()
                                    print 'Task %s runs %0.2f seconds' % (name,(end-start))
                    
                                if __name__=='__main__'
                                    print 'Parent process %s' % os.getpid()
                                    p=Pool()
                                    for i in range(5):
                                        p.apply_async(long_time_task,args(i,))
                                    print 'waiting for all subprocesses done.....'
                                    p.close()
                                    p.join()
                                    print 'All subprocesses done'
     调用join之前先调用close 调用close之后就不能够继续添加新的Process 了
                
                        进程间的通行:
                            Python的multiprocessing模块包装了底层的机制 提供了Queue Pipes 等多种方式来交换数据                
                            以Queue为例 在父进程中创建两个子进程 一个往Queue中写数据 一个从Queue中读数据
    from multiprocessing import Process ,Queue
                                import os,time,random
                    
                                #写数据进程执行的代码
                                def write(q):
                                    for value in ['A','B','C']:
                                        print 'Put %s to queue ....' %value
                                        q.put(value)
                                        time.sleep(random.random())
                                #读数据进程执行的代码:
                                def read(q):
                                    while True:
                                        value=q.get(True)
                                        print 'Get %s from queue'  %value
                        
                                if __name__=='__main__':
                                    #父进程创建Queue 并传给各个子进程
                                    q=Queue()
                                    pw=Process(target=write,args=(q,))
                                    pr=Process(target=read,args=(q,))
                                    #启动子进程 pw
                                    pw.start()
                                    #启动子进程pr
                                    pr.start()
                                    #等待pw结束
                                    pw.join()
                                    #pr进程里是死循环 无法等待结束 只能强行终止
                                    pr.terminate()
    2:多线程:
                            一个进程至少有一个线程 Python的线程是真正的 Posix Thread 而不是模拟出来的线程
                            Python 的标准库提供了两个模块 thread和threading 。thread是低级模块 threading是高级模块 对thread进行了封装 绝大多数情况下 我们只需要使用threading这个高级模块
                            启动一个线程就是把这个函数传入并创建thread实例 然后调用start()开始执行
     #-*-coding:utf-8-*-
    import time,threading
    #新线程执行的代码
    def loop():
     print 'Thread %s is runing ..' % threading.current_thread().name
     n=0
     while n<5:
      n=n+1
      print 'thread %s >>> %s' % (threading.current_thread().name,n)
      time.sleep(1)
     print 'thread %s ended' % threading.current_thread().name
    print 'thread %s is runing ' % threading.current_thread().name
    t=threading.Thread(target=loop,name='LoopThread')
    t.start()
    t.join()
    print 'thread %s ended:' % threading.current_thread().name
    由于任何进程默认启动一个线程,这个线程就是主线程,MainTread 主线程又可以启动新的线程 Python的threading模块有个current_thread()函数 它返回
                            当前线程的实例。子线程的名字在创建的时候指定,名字仅仅在打印的时候显示 完全没有其他的意义。
                                
                        lock:
                            多线程和多进程的最大的不同在与,多进程中 同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中所有的变量都有线程共享,所以 任何一个变量都可以被任何一个
                            线程修改,因此 吸纳成之间共享的数据最大的危险在与多个线程同时修改一个变量 把内容该乱了
                            为了保证多个线程不能同时执行同一条语句我们增加 了锁 的概念:
    #-*-coding:utf-8-*-
    import time,threading
    #假定这是银行存款
    balance=0
    lock=threading.Lock()
    def change_it(n):
     #先存后取 结果应该为0
     global balance
     print '%s balance = %d' %(threading.current_thread().name,balance)
     balance=balance+n
     print '%s balance = %d' %(threading.current_thread().name,balance)
     balance=balance-n
     print '%s balance = %d' %(threading.current_thread().name,balance)
    def run_thread(n):
     for x in range(100000):
      lock.acquire()
      try:
       change_it(n)
      except Exception, e:
       raise e
      finally:
       lock.release()
    t1=threading.Thread(target=run_thread,args=(5,))
    t2=threading.Thread(target=run_thread,args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print balance
    当多个线程执行到lock.acquire()时 只有一个线程能够成功的获取锁 然后继续执行代码 其他线程等待知道获得锁为止。
                            获得锁的线程用完后一定要释放锁 否则那些苦苦等待的进程将永远等待下去 成为死进程 
                            锁保证了某段关键代码只能有一个线程从头到尾的完整执行 坏处就是阻止 了多线程的并发 降低了效率 其次就是由于可以存在多个锁 不同线程持有不同的锁 可能导致死锁的发生。
     
                        多核CPU:
                            一个死循环会100%占用一个CPU 如果有两个死循环 在多核CPU中 可以监控到会占用200%的CPU 也就是占用两个CPU核心
                            要想把N个CPU的核心都爆满 就必须启动N个死循环线程
                            由于Python中代码有一个GIL锁 任何Python线程执行前 必须获得GIL锁 然后每执行100条字节码 解释器自动释放GIL锁 让别的线程又机会执行。这是历史遗留问题
                            因此 在Python中可以使用多线程 但是不要指望能够有效的利用多核  可以通过多进程来实现多核任务。
                       
                    4:ThreadLocal
                        在多线程的环境下 每个线程都有自己的数据 一个线程使用自己的局部变量比使用全局变量好 因为局部变量只有自己能够看见 不会影响其他的线程而全局变量的修改必须加锁
                        但是局部变量也有问题 就是在函数调用的时候 传递起来麻烦
    def process_student(name):
                            std =Student(name)
                            #std 是局部变量 但是每个函数都要用它 因此必须穿进去
                            do_task_1(std)
                            do_task_2(std)
                
                        def do_task_1(std)
                            do_subtask_1(std)
                            do_subtask_2(std)
                
                        def do_task_2(std)
                            do_subtask_1(std)
                            do_subtask_2(std)
     这样一级一级传递极为麻烦
                        因此 ThreadLocal 就应运而生 
    #-*-coding:utf-8-*-
    import threading
    #创建全局ThreadLocal对象:
    local_school=threading.local()
    def process_student():
     print 'Hello ,%s (in %s)' %(local_school.student,threading.current_thread().name)
    def process_thread(name):
     #绑定threadlocal的student
     local_school.student=name
     process_student()
    t1=threading.Thread(target=process_thread,args=('Alice',),name='Thread-A')
    t2=threading.Thread(target=process_thread,args=('Bob',),name='Thread-B')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    全局变量local_student 就是一个ThreadLocal 对象 每个thread对他都可以读写student属性 但是相互之间不影响 
                                
                       
                 5 分布式进程:
                        在thread和process中应当优先选择process 因为process跟稳定 而且process可以分布到多台机器上 而thread最多只能在一台机器的多个CPU上
                        Python的multiprocess模块不但支持多进程 其中。managers子模块还支持把多个进程分布到多台机器上 一个服务进程作为调度 将任务分布到其他的进程中 依靠网络通信
                        由于managers 封装的很好 不必了解网络通信的细节 就可以很容易的编写分布式多进程程序
                        例如:  把发送任务的进程和处理任务的进程分布到两个机器上:
                        服务进程 启动Queue 把Queue 注册到网上 然后往   Queue里面写入任务:
     #taskmanager.py
                            import random,time,Queue
                            from multiprocessing.managers import BaseManager
                
                            #发送任务的队列
                            task_queue=Queue.Queue()
                            #接受结果的队列
                            result_queue=Queue.Queue()
                        
                            #从basemanager 继承queuemanager
                            class QueueManager(BaseManager):
                                pass
                    
                            # 把两个Queue都注册到网上 callable参数关联了Queue对象:
                            QueueManager.register('get_task_queue',callable=lambda:task_queue)
                            QueueManager.register('get_result_queue,callable=lambda:result_queue')
                
                            #绑定端口50000 设置验证码 abc
                            manager=QueueManager(address=('',5000),authkey='abc')
                    
                            #启动Queue
                            manager.start()
                            
                            #获得通过网络访问的Queue对象:
                            task=manager.get_task_queue()
                            result=manager.get_result_queue()
                            #放几个任务进去
                            for i in range(10):
                                n=random.randint(0,10000)
                                print('Put task %d' %n)
                                task.put(n)
                            #从result队列读取结果
                            print('Try get results....'):
                            for i in range(10):
                                r=result.get(timeout=10)
                                print('Result:%s' % r)
                            #关闭
                            manager.shutdown()

    在另一台计算机上启动:

     #taskworker.py
                            import time,sys,Queue
                            from multiprocessing.managers import BaseManager
                            #创建类似的QueueManager
                            class QueueManager(BaseManager):
                                pass
                
                            #由于这个QueueManager 只从网上获得注册时只提供了名字
                            QueueManager.register('get_task_queue')
                            QueueManager.register('get_task_result')
                        
                            #链接到服务器 也就是运行taskmanager.py的机器
                            server_addr='127.0.0.1'
                            print('connect to server %s ..' % server_addr)
            
                            #端口验证码一致
                            m=QueueManager(address=(server_addr,5000),authkey='abc')
                            #从网络链接
                            m.connect()
                            #获取queue对象
                            task=m.get_task_queue()
                            result=m.get_task_result()
                            #从task队列中取出数据 并把结果放到result队列中
                            for i in range(10):
                                try:
                                    n=task.get(timeout=1)
                                    print('run task %d*%d' %(n,n))
                                    r='%d*%d=%d' % (n,n,n*n)
                                    time.sleep(1)
                                    result.put(r)
                                except Queue.Empty:
                                    print('task queue is empty')
                            #处理结束
                            print('work exit')

    这样就可以将任务拆分 并将任务发送到几台 几十台机器上进行处理。

    二、memcache

    理解一些概念:
    Memcache是一个自由和开放源代码、高性能、分配的内存对象缓存系统。用于加速动态web应用程序,减轻数据库负载。它可以应对任意多个连接,使用非阻塞的网络IO。由于它的工作机制是在内存中开辟一块空间,然后建立一个HashTable,Memcached自管理这 些HashTable。Memcached是简单而强大的。它简单的设计促进迅速部署,易于发展所面临的问题,解决了很多大型数据缓存。它的API可供最流行的语言。Memcache是该系统的项目名称,Memcached是该系统的主程序文件,以守护程序方式运行于一个或多个服务器中,随时接受客 户端的连接操作,使用共享内存存取数据。
    Memcached最吸引人的一个特性就是支持分布式部署;也就是说可以在一群机器上建立一堆Memcached服务,每个服务可以根据具体服务器的硬件配置使用不同大小的内存块,这样一来,理论上可以建立一个无限巨大的基于内存的cache storage系统。

    1.分别把memcached和libevent下载回来,放到 /tmp 目录下
    Memcache用到了libevent这个库用于Socket的处理,所以还需要安装libevent。

    cd /tmp
    wget http://www.danga.com/memcached/dist/memcached-1.2.0.tar.gz
     wget http://www.monkey.org/~provos/libevent-1.2.tar.gz

    先安装libevent

    $ tar zxvf libevent-1.2.tar.gz
    $ cd libevent-1.2
    $ ./configure –prefix=/usr
    $ make
    $ make install

    3.测试libevent是否安装成功

    $ ls -al /usr/lib | grep libevent
    2
    lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent-1.2.so.1 -> libevent-1.2.so.1.0.3
    3
    -rwxr-xr-x 1 root root 263546 11?? 12 17:38 libevent-1.2.so.1.0.3
    4
    -rw-r–r– 1 root root 454156 11?? 12 17:38 libevent.a
    5
    -rwxr-xr-x 1 root root 811 11?? 12 17:38 libevent.la
    6
    lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent.so -> libevent-1.2.so.1.0.3

    还不错,都安装上了。

    4.安装memcached,同时需要安装中指定libevent的安装位置

    1
    $ cd /tmp
    2
    $ tar zxvf memcached-1.2.0.tar.gz
    3
    $ cd memcached-1.2.0
    4
    $ ./configure –with-libevent=/usr # 注意这个配置跟着libevent走
    5
    $ make
    6
    $ make install

    如果中间出现报错,请仔细检查错误信息,按照错误信息来配置或者增加相应的库或者路径。
    安装完成后会把memcached放到 /usr/local/bin/memcached ,
    5.测试是否成功安装memcached

    1
    $ ls -al /usr/local/bin/mem*
    2
    -rwxr-xr-x 1 root root 137986 11?? 12 17:39 /usr/local/bin/memcached
    3
    -rwxr-xr-x 1 root root 140179 11?? 12 17:39 /usr/local/bin/memcached-debug

    6 安装Python-memcached安装

    $ sudo apt-get install python-memcache

    memcached运行参数:

    /usr/local/memcached/bin/memcached -d -m 128 -u root -l 192.168.0.97 -c 256 -P /tmp/memcached.pid

    -d选项是启动一个守护进程,
    -m是分配给Memcache使用的内存数量,单位是MB,我这里是10MB,
    -u是运行Memcache的用户,我这里是root,
    -l是监听的服务器IP地址,如果有多个地址的话,我这里指定了服务器的IP地址192.168.22.200(不指定为本机)
    -p是设置Memcache监听的端口,我这里设置了12000,最好是1024以上的端口,
    -c选项是最大运行的并发连接数,默认是1024,我这里设置了256,按照你服务器的负载量来设定,
    -P是设置保存Memcache的pid文件,我这里是保存在 /tmp/memcached.pid,

    python使用例子:

    #!/usr/bin/env python
    import memcache
    mc = memcache.Client(['127.0.0.1:12000'],debug=0)
    mc.set("foo","bar")
    value = mc.get("foo")
    print value

    Python-memcached API总结
    整个memcache.py只有1241行,相当精简
    主要方法如下:
    @set(key,val,time=0,min_compress_len=0)
    无条件键值对的设置,其中的time用于设置超时,单位是秒,而min_compress_len则用于设置zlib压缩(注:zlib是提供数据压缩用的函式库)
    @set_multi(mapping,time=0,key_prefix=”,min_compress_len=0)
    设置多个键值对,key_prefix是key的前缀,完整的键名是key_prefix+key, 使用方法如下
    >>> mc.set_multi({‘k1′ : 1, ‘k2′ : 2}, key_prefix=’pfx_’) == []
    >>> mc.get_multi(['k1', 'k2', 'nonexist'], key_prefix=’pfx_’) == {‘k1′ : 1, ‘k2′ : 2}
    @add(key,val,time=0,min_compress_len=0)
    添加一个键值对,内部调用_set()方法
    @replace(key,val,time=0,min_compress_len=0)
    替换value,内部调用_set()方法
    @get(key)
    根据key去获取value,出错返回None
    @get_multi(keys,key_prefix=”)
    获取多个key的值,返回的是字典。keys为key的列表
    @delete(key,time=0)
    删除某个key。time的单位为秒,用于确保在特定时间内的set和update操作会失败。如果返回非0则代表成功
    @incr(key,delta=1)
    自增变量加上delta,默认加1,使用如下
    >>> mc.set(“counter”, “20″)
    >>> mc.incr(“counter”)
    @decr(key,delta=1)
    自减变量减去delta,默认减1

     
  • 相关阅读:
    ubuntu切换到root
    ubuntu vim退出时出错
    easy_install和pip的安装及使用
    Swap file "/etc/.hosts.swp" already exists! [O]pen Read-Only, (E)dit anyway, (R)ecover, (D)elete it,
    ERROR 2003 (HY000): Can't connect to MySQL server on 'ip地址' (110)
    Android-PullToRefresh(一)
    Android Exception 6 (adapter is not modified from a background thread)
    Android Exception 5(startActivityForResult & singleTask)
    【协议篇】UDP
    【协议篇】TCP
  • 原文地址:https://www.cnblogs.com/lidong94/p/5692115.html
Copyright © 2011-2022 走看看