zoukankan      html  css  js  c++  java
  • 分布式任务系统gearman的python实战

    Gearman是一个用来把工作委派给其他机器、分布式的调用更适合做某项工作的机器、并发的做某项工作在多个调用间做负载均衡、或用来在调用其它语言的函数的系统。Gearman是一个分发任务的程序框架,可以用在各种场合,开源、多语言支持、灵活、快速、可嵌入、可扩展、无消息大小限制、可容错,与Hadoop相比,Gearman更偏向于任务分发功能。它的任务分布非常简单,简单得可以只需要用脚本即可完成。Gearman最初用于LiveJournal的图片resize功能,由于图片resize需要消耗大量计算资 源,因此需要调度到后端多台服务器执行,完成任务之后返回前端再呈现到界面。

    gearman的任务传递模式是一对一的,不能实现一对多,一个client通过job server最后只能够到达一个worker上。如果需要一对多,需要定义多个worker的function,依次向这些worker进行发送,非常的不方便。这一点就不如ZeroMQ,ZeroMQ支持的模式很多,能够满足各种消息队列需求。他们用在不同的场合,Gearman是分布式任务系统,而ZeroMQ是分布式消息系统,任务只需要做一次就行。

    1. Server

    1.1 Gearman工作原理

    Gearman 服务有很多要素使得它不仅仅是一种提交和共享工作的方式,但是主要的系统只由三个组件组成: gearmand 守护进程(server),用于向 Gearman 服务提交请求的 client ,执行实际工作的 worker。其关系如下图所示:

    Gearmand server执行一个简单的功能,即从client收集job请求并充当一个注册器,而worker可以在此提交关于它们支持的job和操作类型的信息,这样server实际上就充当了Client和Worker的中间角色。Client将job直接丢给server,而server根据worker提交的信息,将这些job分发给worker来做,worker完成后也可返回结果,server将结果传回client。举个例子,在一个公司里面,有老板1、老板2、老板3(client),他们的任务就是出去喝酒唱歌拉项目(job),将拉来的项目直接交给公司的主管(server),而主管并不亲自来做这些项目,他将这些项目分给收手下的员工(worker)来做,员工做完工作后,将结果交给主管,主管将结果报告给老板们即可。

    要使用gearman,首先得安装server,下载地址:https://launchpad.net/gearmand。当下载安装完成后,可以启动gearmand,启动有很多参数选项,可以man gearmand来查看,主要的 选项有:

    • -b, --backlog=BACKLOG       Number of backlog connections for listen. 
    • -d, --daemon                Daemon, detach and run in the background. 
    • -f, --file-descriptors=FDS  Number of file descriptors to allow for the process                             
    • (total connections will be slightly less). Default     is max allowed for user. 
    • -h, --help                  Print this help menu. 
    • -j, --job-retries=RETRIES   Number of attempts to run the job before the job  server removes it. Thisis helpful to ensure a bad  job does not crash all available workers. Default  is no limit. 
    • -l, --log-file=FILE         Log file to write errors and information to. Turning this option on also forces the first  verbose level to be enabled. 
    • -L, --listen=ADDRESS        Address the server should listen on. Default is  INADDR_ANY. 
    • -p, --port=PORT             Port the server should listen on. 
    • -P, --pid-file=FILE         File to write process ID out to. 
    • -r, --protocol=PROTOCOL     Load protocol module. 
    • -R, --round-robin           Assign work in round-robin order per  workerconnection. The default is to assign work in  the order of functions added by the worker. 
    • -q, --queue-type=QUEUE      Persistent queue type to use. 
    • -t, --threads=THREADS       Number of I/O threads to use. Default=0. 
    • -u, --user=USER             Switch to given user after startup. 
    • -v, --verbose               Increase verbosity level by one. 
    • -V, --version               Display the version of gearmand and exit. 
    • -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received.   The default is to wakeup all available workers.

    启动gearmand:

    [plain] view plain copy
     
    1. sudo gearmand --pid-file=/var/run/gearmand/gearmand.pid --daemon --log-file=/var/log/gearman.log  
    若提示没有/var/log/gearman.log这个文件的话,自己新建一个就可以了。

    1.2 实例化queue与容错

    Gearman默认是将queue保存在内存中的,这样能够保障速速,但是遇到宕机或者server出现故障时,在内存中缓存在queue中的任务将会丢失。Gearman提供了了queue实例化的选项,能够将queue保存在数据库中,比如:SQLite3、Drizzle、MySQL、PostgresSQL、Redis(in dev)、MongoDB(in dev).在执行任务前,先将任务存入持久化队列中,当执行完成后再将该任务从持久化队列中删除。要使用db来实例化queue,除了在启动时加入-q参数和对应的数据库之外,还需要根据具体的数据库使用相应的选项,例如使用sqlit3来实例化queue,并指明使用用来存储queue的文件:

    [plain] view plain copy
     
    1. gearmand -d -q libsqlite3 --libsqlite3-db=/tmp/demon/gearman.db --listen=localhost --port=4370  

    再如使用mysql来实例化queue,选项为:

    [plain] view plain copy
     
    1. <pre name="code" class="plain">/usr/local/gearmand/sbin/gearmand  -d  -u root   
    2. –queue-type=MySQL   
    3. –mysql-host=localhost   
    4. –mysql-port=3306   
    5. –mysql-user=gearman   
    6. –mysql-password=123456   
    7. –mysql-db=gearman   
    8. –mysql-table=gearman_queue  
    
    

    还要创建相应的数据库和表,并创建gearman用户,分配相应的权限:

    [sql] view plain copy
     
    1. CREATE DATABASE gearman;  
    2. CREATE TABLE `gearman_queue` (  
    3. `id` int(10) unsigned NOT NULL AUTO_INCREMENT,  
    4. `unique_key` varchar(64) NOT NULL,  
    5. `function_name` varchar(255) NOT NULL,  
    6. `when_to_run` int(10) NOT NULL,  
    7. `priority` int(10) NOT NULL,  
    8. `data` longblob NOT NULL,  
    9. PRIMARY KEY (`id`),  
    10. UNIQUE KEY `unique_key_index` (`unique_key`,`function_name`)  
    11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;  
    12.   
    13. create USER gearman@localhost identified by ’123456′;  
    14. GRANT ALL on gearman.* to gearman@localhost;  
    可以在gearman的配置文件中加入相关配置,以免每次启动都需要写一堆东西:
    [plain] view plain copy
     
    1. # /etc/conf.d/gearmand: config file for /etc/init.d/gearmand  
    2.   
    3. # Persistent queue store  
    4. # The following queue stores are available:  
    5. # drizzle|memcache|mysql|postgre|sqlite|tokyocabinet|none  
    6. # If you do not wish to use persistent queues, leave this option commented out.  
    7. # Note that persistent queue mechanisms are mutally exclusive.  
    8. PERSISTENT="mysql"  
    9.   
    10. # Persistent queue settings for drizzle, mysql and postgre  
    11. #PERSISTENT_SOCKET=""  
    12. PERSISTENT_HOST="localhost"  
    13. PERSISTENT_PORT="3306"  
    14. PERSISTENT_USER="gearman"  
    15. PERSISTENT_PASS="your-pass-word-here"  
    16. PERSISTENT_DB="gearman"  
    17. PERSISTENT_TABLE="gearman_queue"  
    18.   
    19. # Persistent queue settings for sqlite  
    20. #PERSISTENT_FILE=""  
    21.   
    22. # Persistent queue settings for memcache  
    23. #PERSISTENT_SERVERLIST=""  
    24.   
    25. # General settings  
    26. #  
    27. # -j, --job-retries=RETRIES   Number of attempts to run the job before the job  
    28. #                             server removes it. Thisis helpful to ensure a bad  
    29. #                             job does not crash all available workers. Default  
    30. #                             is no limit.  
    31. # -L, --listen=ADDRESS        Address the server should listen on. Default is  
    32. #                             INADDR_ANY.  
    33. # -p, --port=PORT             Port the server should listen on. Default=4730.  
    34. # -r, --protocol=PROTOCOL     Load protocol module.  
    35. # -t, --threads=THREADS       Number of I/O threads to use. Default=0.  
    36. # -v, --verbose               Increase verbosity level by one.  
    37. # -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received.  
    38. #                             The default is to wakeup all available workers.  
    39. GEARMAND_PARAMS="-L 127.0.0.1 --verbose=DEBUG"  

    这其实并不是一个很好的方案,因为当使用数据库来实例化queue时,会增加两个步骤:Client和worker必须连接到server上去读写job,并且数据库在处理的速度上也会大大降低。在大并发任务量的情况下,性能会受到直接影响,你会发现SQLite或者mysql并不能满足处理大量BLOB的性能要求,job会不断地积攒而得不到处理,给一个任务犹如石牛如一样海毫无反应。归根结底,需要根据自己的应用场景,合理设计一些测试用例和自动化脚本,通过实际的运行状态进行参数的调整。

    job分布式系统一个基本的特点就是要有单点容错能力(no  single point failure),还不能有单点性能瓶颈(no single point of bottleneck)。即:一个节点坏了不影响整个系统的业务,一个节点的性能不能决定整个系统的性能。那如果server挂了该怎么办?解决方法是使用多个server:

    [plain] view plain copy
     
    1. gearmand -d -q libsqlite3  --listen=localhost --port=4370  
    [plain] view plain copy
     
    1. gearmand -d -q libsqlite3  --listen=localhost --port=4371  

    每个client连接多个server,并使用负载最低的那个server,当该server挂掉之后,gearman会自动切换到另一个server上,如下图所示:

    1.3 轮询调度

    当job不断地增加时,我们可能需要增加worker服务器来增加处理能力,但你可能会发现任务并不是均匀地分布在各个worker服务器上,因为server分配任务给worker的方式默认按照循序分配的,比如你现有worker-A,在server上注册了5个worker进程,随着任务的增加,又加了一台worker-B,并向同一个server注册了5个worker进程。默认情况下,server会按照worker注册的先后顺序进行调度,即:只有给worker-A分配满任务后才会给worker-B分配任务,即分配方式是wA, wA,wA, wA,wA,wB, wB,wB, wB, wB。为了能够给worker-A和worker-B均匀地分配任务,server可以采用轮询的方式给worker服务器分配任务,即分配方式为: wA, wB, wA, wB ...,那么在启动server时加上选项:-R或者--round-robin

    1.4 受限唤醒

    根据gearman协议的设计,Worker 如果发现队列中没有任务需要处理,是可以通过发送 PRE_SLEEP 命令给服务器,告知说自己将进入睡眠状态。在这个状态下,Worker 不会再去主动抓取任务,只有服务器发送 NOOP 命令唤醒后,才会恢复正常的任务抓取和处理流程。因此 Gearmand 在收到任务时,会去尝试唤醒足够的 Worker 来抓取任务;此时如果 Worker 的总数超过可能的任务数,则有可能产生惊群效应。而通过 –worker-wakeup 参数,则可以指定收到任务时,需要唤醒多少个 Worker 进行处理,避免在 Worker 数量非常大时,发送大量不必要的 NOOP 报文,试图唤醒所有的 Worker。

    1.6 线程模型

    Gearman中有三种线程:

    1. 监听和管理线程。只有一个(负责接收连接,然后分配给I/O线程来处理,如果有多个I/O线程的话,同时也负责启动和关闭服务器,采用libevent来管理socket和信号管道)
    2. I/O线程。可以有多个(负责可读可写的系统调用和对包初步的解析,将初步解析的包放入各自的异步队列中,每个I/O线程都有自己的队列,所以竞争很少,通过-t选项来指定I/O线程数)
    3. 处理线程。只有一个(负责管理各种信息列表和哈希表,比如跟踪唯一键、工作跟踪句柄、函数、工作队列等。将处理结果信息包返回给I/O线程,I/O线程将该包挑选出来并向该连接发送数据)
    其中第1, 3种线程对全局处理性能没有直接影响,虽然处理线程有可能成为瓶颈,但他的工作足够简单消耗可忽略不计,因此我们的性能调优主要目标是在IO线程的数量。对每个IO线程来说,它都会有一个libevent的实例;所有Gearman的操作会以异步任务方式提交到处理线程,并由IO线程获取完成实际操作,因此IO线程的数量是与可并行处理任务数成正比。Gearmand 提供 -t 参数调整总IO线程数,需要使用 libevent 1.4 以上版本提供多线程支持。

    进程句柄数

    另外一个影响大规模部署的是进程句柄数,Gearman会为每一个注册的Worker分配一个fd(文件描述符),而这个fd的总数是受用户限制的,可以使用 ulimit -n 命令查看当前限制
    [plain] view plain copy
     
    1. flier@debian:~$ ulimit -n  
    2. 1024  
    3. flier@debian:~$ ulimit -HSn 4096 // 设置进程句柄数的最大软硬限制  
    4. 4096  
    也就是说gearman缺省配置下,最多允许同时有小于1024个worker注册上来,fd用完之后的Worker和Client会出现连接超时或无响应等异常情况。因此,发生类似情况时,我们应首先检查 /proc/[PID]/fd/ 目录下的数量,是否已经超过 ulimit -n 的限制,并根据需要进行调整。而全系统的打开文件设置,可以参考 /proc/sys/fs/file-max 文件,并通过 sysctl -w fs.file-max=[NUM] 进行修改。
    [plain] view plain copy
     
    1. flier@debian:~$ cat /proc/sys/fs/file-max  
    2. 24372  
    3. flier@debian:~# sysctl -w fs.file-max=100000  
    4. 100000  
    Gearmand 本身也提供了调整句柄数量限制的功能,启动时则可以通过 -f或者–file-descriptors 参数指定,但非特权进程不能设置超过soft limit的数额。"The soft limit is the value that the kernel enforces for the corresponding resource. The hard limit acts as a ceiling for the soft limit: an unprivileged process may only set its soft limit to a value in the range from 0 up to the hard limit, and (irreversibly) lower its hard limit. A privileged process (under Linux: one with the
    CAP_SYS_RESOURCE capability) may make arbitrary changes to either limit value."


    2. Client

    对于发送单个job,python-gearman提供了一个简单的函数:submit_job,可以将job发送到server,其定义如下:
    GearmanClient.submit_job(task, data, unique=None, priority=None, background=False, wait_until_complete=True, max_retries=0,poll_timeout=None)
     
    下面来看看gearman的一个简单样例:
    [python] view plain copy
     
    1. import gearman  
    2. import time  
    3. from gearman.constants import JOB_UNKNOWN  
    4.   
    5. def check_request_status(job_request):  
    6.     """check the job status"""  
    7.     if job_request.complete:  
    8.         print 'Job %s finished! Result: %s - %s' % (job_request.job.unique, job_request.state, job_request.result)  
    9.     elif job_request.time_out:  
    10.         print 'Job %s timed out!' % job_request.unique  
    11.     elif job_request.state == JOB_UNKNOWN:  
    12.         print "Job %s connection failed!" % job_request.unique  
    13.   
    14. gm_client = gearman.GearmanClient(['localhost:4730','localhost:4731'])  
    15.   
    16. complete_job_request = gm_client.submit_job("reverse", "Hello World!")  
    17. check_request_status(complete_job_request)  
    gm_client连接到本地的4730/4731端口的server上,然后用submit_job函数将”reverse“和参数“Hello World!"传给server,返回一个request,最后用check_request_status()函数检查这个request的状态。是不是很简单?
     

    2.1  task与job

    task与job是有区别的区别主要在于:
    1. Task是一组job,在下发后会执行并返回结果给调用方
    2. Task内的子任务悔下发给多个work并执行
    3. client下放给server的任务为job,而整个下方并返回结果的过程为task,每个job会在一个work上执行
    4. task是一个动态的概念,而job是一个静态的概念。这有点类似“进程”和“程序”概念的区别。既然是动态的概念,就有完成(complete)、超时(time_out)、携带的job不识别(JOB_UNKNOWN)等状态

    2.2 job优先级(priority)

    client在发送job的时候,可以设定job的优先级,只需要在submit_job函数中添加选项“priority=gearman.PRIORITY_HIGH”即可创建高优先级task,priority可以有三个选项:PRIORITY_HIGH、PRIORITY_LOW、PRIORITY_NONE(default)
     

    2.3 同步与异步(background)

    默认情况下,client以同步方式发送job到server,所谓的同步,即client在向server发送完job后,不停地询问该(组)job执行的情况,直到server返回结果。而异步方式则是client在得知task创建完成之后,不管该task的执行结果。要使client采用异步方式,则在submit_job加入参数“background=True”即可。下面展示了gearman同步/异步的方式时的时序图。
    由上面的同步时序图可知,client端在job执行的整个过程中,与job server端的链接都是保持着的,这也给job完成后job server返回执行结果给client提供了通路。同时,在job执行过程当中,client端还可以发起job status的查询。当然,这需要worker端的支持的。
    由上面的异步时序图可知,client提交完job,job server成功接收后返回JOB_CREATED响应之后,client就断开与job server之间的链接了。后续无论发生什么事情,client都是不关心的。同样,job的执行结果client端也没办法通过Gearman消息框架 获得。
     

    2.4 阻塞与非阻塞(wait_until_complete)

    client创建task时,默认情况下使用的是阻塞模式,所谓的阻塞模式在进程上的表现为:在执行完submit_job后,卡在此处等待server返回结果。而非阻塞模式则是一旦job被server接收,程序可以继续向下执行,我们可以在后面适当的位置(程序最后或者需要用到返回结果的地方)来检查并取回这些task的状态和结果。要使用非阻塞模式,则在submit_job里加入选项“wait_until_complete=False”即可。
     

    2.5 送多个job

    • GearmanClient.submit_multiple_jobs(jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None)
    Takes a list of jobs_to_submit with dicts of{‘task’: task, ‘data’: data, ‘unique’: unique, ‘priority’: priority}
    这里jobs_to_submit是一组job,每个job是上述格式的字典,这里解释一下unique,unique是设置task的unique key,即在小结2.1中的job_request.job.unique的值,如果不设置的话,会自动分配。
    • GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
    Go into a select loop until all our jobs have moved to STATE_PENDING
    • GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
    Go into a select loop until all our jobs have completed or failed
    • GearmanClient.submit_multiple_requests(job_requests, wait_until_complete=True, poll_timeout=None)
    Take Gearman JobRequests, assign them connections, and request that they be done.
    • GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
    Go into a select loop until all our jobs have moved to STATE_PENDING
    • GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
    Go into a select loop until all our jobs have completed or failed

    下面是官网给的一个同步非阻塞方式发送多个job的例子,在该例子的最后,在取得server返回结果之前,用了wait_until_jobs_completed函数来等待task中的所有job返回结果:

    [python] view plain copy
     
    1. import time  
    2. gm_client = gearman.GearmanClient(['localhost:4730'])  
    3.   
    4. list_of_jobs = [dict(task="task_name", data="binary data"), dict(task="other_task", data="other binary data")]  
    5. submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)  
    6.   
    7. # Once we know our jobs are accepted, we can do other stuff and wait for results later in the function  
    8. # Similar to multithreading and doing a join except this is all done in a single process  
    9. time.sleep(1.0)  
    10.   
    11. # Wait at most 5 seconds before timing out incomplete requests  
    12. completed_requests = gm_client.wait_until_jobs_completed(submitted_requests, poll_timeout=5.0)  
    13. for completed_job_request in completed_requests:  
    14.     check_request_status(completed_job_request)  
    下面这个例子中,用到了submit_multiple_requests函数对超时的请求再次检查。
    [python] view plain copy
     
    1. import time  
    2. gm_client = gearman.GearmanClient(['localhost:4730'])  
    3.   
    4. list_of_jobs = [dict(task="task_name", data="task binary string"), dict(task="other_task", data="other binary string")]  
    5. failed_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False)  
    6.   
    7. # Let's pretend our assigned requests' Gearman servers all failed  
    8. assert all(request.state == JOB_UNKNOWN for request in failed_requests), "All connections didn't fail!"  
    9.   
    10. # Let's pretend our assigned requests' don't fail but some simply timeout  
    11. retried_connection_failed_requests = gm_client.submit_multiple_requests(failed_requests, wait_until_complete=True, poll_timeout=1.0)  
    12.   
    13. timed_out_requests = [job_request for job_request in retried_requests if job_request.timed_out]  
    14.   
    15. # For our timed out requests, lets wait a little longer until they're complete  
    16. retried_timed_out_requests = gm_client.submit_multiple_requests(timed_out_requests, wait_until_complete=True, poll_timeout=4.0)  

    2.6 序列化

    默认情况下,gearman的client只能传输的data只能是字符串格式的,因此,要传输python数据结构,必须使用序列化方法。所幸的是,GearmanClient提供了data_encoder,允许定义序列化和反序列化方法,例如:
    [python] view plain copy
     
    1. import pickle  
    2.   
    3. class PickleDataEncoder(gearman.DataEncoder):  
    4.     @classmethod  
    5.     def encode(cls, encodable_object):  
    6.         return pickle.dumps(encodable_object)  
    7.  
    8.     @classmethod  
    9.     def decode(cls, decodable_string):  
    10.         return pickle.loads(decodable_string)  
    11.   
    12. class PickleExampleClient(gearman.GearmanClient):  
    13.     data_encoder = PickleDataEncoder  
    14.   
    15. my_python_object = {'hello': 'there'}  
    16.   
    17. gm_client = PickleExampleClient(['localhost:4730'])  
    18. gm_client.submit_job("task_name", my_python_object)  


    3 worker

    3.1 主要API

    worker端同样提供了丰富的API,主要有:
    • GearmanWorker.set_client_id(client_id):设置自身ID
    • GearmanWorker.register_task(task, callback_function):为task注册处理函数callback_function,其中callback_function的定义格式为:
      [python] view plain copy
       
      1. def function_callback(calling_gearman_worker, current_job):  
      2.     return current_job.data  
    • GearmanWorker.unregister_task(task):注销worker上定义的函数
    • GearmanWorker.work(poll_timeout=60.0): 无限次循环, 完成发送过来的job.
    • GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
    • GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None):Send a Gearman JOB_STATUS update for an inflight job
    • GearmanWorker.send_job_warning(current_job, data, poll_timeout=None):Send a Gearman JOB_WARNING update for an inflight job
     

    3.2 简单示例

    而worker端其实和client端差不多,也是要连接到server端,不同的是,worker端需要绑定函数来处理具体的job:
    [python] view plain copy
     
    1. import gearman  
    2.   
    3. gm_worker = gearman.GearmanWorker(['localhost:4730'])  
    4.   
    5. def task_listener_reverse(gearman_worker, gearman_job):  
    6.     print 'Reversing string:' + gearman_job.data  
    7.     return gearman_job.data[::-1]  
    8.   
    9. gm_worker.set_client_id("worker_revers")  
    10. gm_worker.register_task("reverse", task_listener_reverse)  
    11.   
    12. gm_worker.work()  
    可以看到,在worker同样要连接到本地4730端口的server,给了一个job处理函数,反序job传来的数据并返回,register_task函数将名为”reverse“的job和task_listener_reverse函数注册在一起,说明该函数用来处理名为”reverse”的job的,最后调用work函数来工作。来,我们看看效果吧,首先启用client.py文件,此时因为worker还没启动,client在此阻塞住,等待task处理。然后运行worker程序,可以看到client和worker的输出:
     



    3.2 返回结果

    worker提供了3个API可以在worker函数中发送job的数据、状态和警告:
    • GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
    • GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None): Send a Gearman JOB_STATUS update for an inflight job
    • GearmanWorker.send_job_warning(current_job, data, poll_timeout=None): Send a Gearman JOB_WARNING update for an inflight job
    下面是来自官网的例子:
    [python] view plain copy
     
    1. gm_worker = gearman.GearmanWorker(['localhost:4730'])  
    2.   
    3. # See gearman/job.py to see attributes on the GearmanJob  
    4. # Send back a reversed version of the 'data' string through WORK_DATA instead of WORK_COMPLETE  
    5. def task_listener_reverse_inflight(gearman_worker, gearman_job):  
    6.     reversed_data = reversed(gearman_job.data)  
    7.     total_chars = len(reversed_data)  
    8.   
    9.     for idx, character in enumerate(reversed_data):  
    10.         gearman_worker.send_job_data(gearman_job, str(character))  
    11.         gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)  
    12.   
    13.     return None  
    14.   
    15. # gm_worker.set_client_id is optional  
    16. gm_worker.register_task('reverse', task_listener_reverse_inflight)  
    17.   
    18. # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity  
    19. gm_worker.work()  
     

    3.3 数据序列化

    同client一样,worker端也只能发送字符类型的数据,如果想要发送python里的结构体,必须用序列化将其转化成字符串。与client一样,worker也有一个encoder,你同样可以在里面定义序列化和反序列化的方法,不过值得注意的是,worker端的序列化和反序列化方法必须对应,否则client和worker端的发送的数据都不能被彼此争取的反序列化。下面演示了使用JSON方法来进行序列化:
    [python] view plain copy
     
    1. import json # Or similarly styled library  
    2. class JSONDataEncoder(gearman.DataEncoder):  
    3.     @classmethod  
    4.     def encode(cls, encodable_object):  
    5.         return json.dumps(encodable_object)  
    6.  
    7.     @classmethod  
    8.     def decode(cls, decodable_string):  
    9.         return json.loads(decodable_string)  
    10.   
    11. class DBRollbackJSONWorker(gearman.GearmanWorker):  
    12.     data_encoder = JSONDataEncoder  
    13.   
    14.     def after_poll(self, any_activity):  
    15.         # After every select loop, let's rollback our DB connections just to be safe  
    16.         continue_working = True  
    17.         # self.db_connections.rollback()  
    18.         return continue_working  

    worker端提供了rollback函数,每次轮询完查看socket是否活跃或者timeout时就会调用这个函数:
    GearmanWorker.after_poll(any_activity)

    Polling callback to notify any outside listeners whats going on with the GearmanWorker.

    Return True to continue polling, False to exit the work loop


    4 admin_client

    前面讲了Client和Worker,对于server也提供了一些API,可以对其进行监控和设置,比如:设置queue大小、关闭连接、查看状态、查看worker等,用于操作的对象时GearmanAdminClient,其定义如下:

    class gearman.admin_client.GearmanAdminClient(host_list=None,poll_timeout=10.0)
    所提供的API有:
    • GearmanAdminClient.send_maxqueue(task, max_size): Sends a request to change the maximum queue size for a given task
    • GearmanAdminClient.send_shutdown(graceful=True): Sends a request to shutdown the connected gearman server
    • GearmanAdminClient.get_status():Retrieves a list of all registered tasks and reports how many items/workers are in the queue
    • GearmanAdminClient.get_version(): Retrieves the version number of the Gearman server
    • GearmanAdminClient.get_workers():Retrieves a list of workers and reports what tasks they’re operating on
    • GearmanAdminClient.ping_server(): Sends off a debugging string to execute an application ping on the Gearman server, return the response time
    [python] view plain copy
     
    1. gm_admin_client = gearman.GearmanAdminClient(['localhost:4730'])  
    2.   
    3. # Inspect server state  
    4. status_response = gm_admin_client.get_status()  
    5. version_response = gm_admin_client.get_version()  
    6. workers_response = gm_admin_client.get_workers()  
    7. response_time = gm_admin_client.ping_server()  


    5. job对象

    5.1 GearmanJob

    GearmanJob对象提供了发送到server的job的最基本信息,其定义如下:
    class gearman.job.GearmanJob(connection, handle, task, unique, data)

     server信息

    当我们得到一个job对象后,想知道与之相连的server信息时,就可以调用如下两个属性:
    • GearmanJob.connection: GearmanConnection - Server assignment. Could be None prior to client job submission
    • GearmanJob.handle:string - Job’s server handle. Handles are NOT interchangeable across different gearman servers
     
     job参数
    • GearmanJob.task:string - Job’s task
    • GearmanJob.unique:string - Job’s unique identifier (client assigned)
    • GearmanJob.data:binary - Job’s binary payload
     

    5.2  GearmanJobRequest

    GearmanJobRequest是job请求的状态跟踪器,代表一个job请求,可用于GearmanClient中,其定义如下:
    class gearman.job.GearmanJobRequest(gearman_jobinitial_priority=Nonebackground=False,max_attempts=1) 

    跟踪job发送

    • GearmanJobRequest.gearman_job:             GearmanJob - Job that is being tracked by this GearmanJobRequest object
    • GearmanJobRequest.priority:                PRIORITY_NONE [default]、PRIORITY_LOW、PRIORITY_HIGH
    • GearmanJobRequest.background:              boolean - Is this job backgrounded?
    • GearmanJobRequest.connection_attempts:     integer - Number of attempted connection attempts
    • GearmanJobRequest.max_connection_attempts: integer - Maximum number of attempted connection attempts before raising an exception
     

    跟踪job执行过程

    • GearmanJobRequest.result:    binary - Job’s returned binary payload - Populated if and only if JOB_COMPLETE
    • GearmanJobRequest.exception:    binary - Job’s exception binary payload
    • GearmanJobRequest.state:     
    • GearmanJobRequest.timed_out:    boolean - Did the client hit its polling_timeout prior to a job finishing?
    • GearmanJobRequest.complete:     boolean - Does the client need to continue to poll for more updates from this job?
    其中GearmanJobRequest.state的返回值可以有:
    • JOB_UNKNOWN - Request state is currently unknown, either unsubmitted or connection failed
    • JOB_PENDING - Request has been submitted, pending handle
    • JOB_CREATED - Request has been accepted
    • JOB_FAILED - Request received an explicit job failure (job done but errored out)
    • JOB_COMPLETE - Request received an explicit job completion (job done with results)
     

    跟踪运行中的job状态

    某些特定的GearmanJob在实际完成之前就可能发回数据。GearmanClient用一些队列来保存跟踪这些发回数据的时间和内容等
    • GearmanJobRequest.warning_updates: collections.deque - Job’s warning binary payloads
    • GearmanJobRequest.data_updates:        collections.deque - Job’s data binary payloads
    • GearmanJobRequest.status:                       dictionary - Job’s status
    其中,GearmanJobRequest.status返回job的状态是一个字典,内容有:
    • handle - string - Job handle
    • known - boolean - Is the server aware of this request?
    • running - boolean - Is the request currently being processed by a worker?
    • numerator - integer
    • denominator - integer
    • time_received - integer - Time last updated
     
     
  • 相关阅读:
    MFC绘图基础
    MFC绘图基础
    MFC坐标问题
    利用Graphziv帮助理解复杂的类层次关系
    priority_quenue
    1060. Are They Equal (25)
    1057. Stack (30)
    1056. Mice and Rice (25)
    1053. Path of Equal Weight (30)
    1051. Pop Sequence (25)
  • 原文地址:https://www.cnblogs.com/sunsky303/p/8779507.html
Copyright © 2011-2022 走看看