zoukankan      html  css  js  c++  java
  • python-gearman使用

    yum -y install gearmand
    chkconfig gearmand on && /etc/init.d/gearmand start
    
    # /etc/sysconfig/gearmand
    指定 OPTIONS=""里的内容,添加持久性等功能
    

    example

    worker.py

    #!/usr/bin/env python
    # -*- encoding: utf-8; py-indent-offset: 4 -*-
    
    import gearman
    
    # Worker that accepts jobs from the Gearman server listed here (localhost, port 4730)
    gm_worker = gearman.GearmanWorker(['localhost:4730'])
    
    def task_listener_reverse(gearman_worker, gearman_job):
        """Log the incoming payload and return it reversed.

        gearman_worker is the worker invoking this callback (unused here).
        gearman_job carries the payload in its ``data`` attribute.
        The returned value is the job's result.
        """
        # Parenthesised single-argument print gives the same output on Python 2
        # and is also valid syntax on Python 3.
        print('Reversing string: ' + gearman_job.data)
        return gearman_job.data[::-1]
    
    # gm_worker.set_client_id is optional; it tells the server which client ID identifies this worker
    gm_worker.set_client_id('python-worker')
    # Jobs submitted for the 'reverse' task are handed to task_listener_reverse
    gm_worker.register_task('reverse', task_listener_reverse)
    
    # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
    # work() loops indefinitely, so nothing placed after this line runs while the worker is serving
    gm_worker.work()
    

    client.py

    #!/usr/bin/env python
    # -*- encoding: utf-8; py-indent-offset: 4 -*-
    
    import gearman
    
    def check_request_status(job_request):
        """Print a one-line summary of a job request's outcome.

        Reports completion (with state and result), a timeout, or a failed
        connection (state JOB_UNKNOWN). Prints nothing for any other state and
        always returns None.
        """
        # The unique id belongs to the GearmanJob (job_request.job), not to the
        # request object, so every branch reads it from there.
        if job_request.complete:
            print("Job %s finished!  Result: %s - %s" % (job_request.job.unique, job_request.state, job_request.result))
        elif job_request.timed_out:
            print("Job %s timed out!" % job_request.job.unique)
        # JOB_UNKNOWN is not bound as a bare name when only `import gearman` is
        # present; reach it through the already-imported package instead.
        elif job_request.state == gearman.constants.JOB_UNKNOWN:
            print("Job %s connection failed!" % job_request.job.unique)
    
    # Client that submits jobs to the Gearman server listed here (localhost, port 4730)
    gm_client = gearman.GearmanClient(['localhost:4730'])
    
    # submit_job is called with its defaults (background=False, wait_until_complete=True),
    # so the returned request is inspected right away
    completed_job_request = gm_client.submit_job("reverse", "Hello World!")
    check_request_status(completed_job_request)
    

    results

    python client.py 
    # Job b5175e15d8e09ba6470cbedb4b6e2b1a finished!  Result: COMPLETE - !dlroW olleH
    
    python worker.py 
    # Reversing string: Hello World!
    

    API

    Gearman worker

    class gearman.worker.GearmanWorker(host_list=None)
    	GearmanWorker :: Interface to accept jobs from a Gearman server
    

    1 Job processing

    GearmanWorker.set_client_id(client_id)
    # Notify the server that we should be identified as this client ID
    
    GearmanWorker.register_task(task, callback_function)
    # Register a function with this worker
    
    def function_callback(calling_gearman_worker, current_job):
        """Echo callback: give the job's payload back unchanged as the result."""
        payload = current_job.data
        return payload
    
    GearmanWorker.unregister_task(task)
    # Unregister a function with worker
    
    GearmanWorker.work(poll_timeout=60.0)
    # Loop indefinitely, complete tasks from all connections.
    
    # example
    # Worker that accepts jobs from the Gearman server listed here (localhost, port 4730)
    gm_worker = gearman.GearmanWorker(['localhost:4730'])
    
    # See gearman/job.py to see attributes on the GearmanJob
    # Send back a reversed version of the 'data' string
    def task_listener_reverse(gearman_worker, gearman_job):
        """Return the job's ``data`` string reversed.

        gearman_worker is the worker invoking this callback (unused here).
        gearman_job carries the payload in its ``data`` attribute.
        """
        # reversed() would hand back a lazy iterator rather than a string;
        # slicing keeps the result the same type as the payload.
        return gearman_job.data[::-1]
    
    # gm_worker.set_client_id is optional; it tells the server which client ID identifies this worker
    gm_worker.set_client_id('your_worker_client_id_name')
    # Jobs submitted for the 'reverse' task are handed to task_listener_reverse
    gm_worker.register_task('reverse', task_listener_reverse)
    
    # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
    # work() loops indefinitely, completing tasks from all connections
    gm_worker.work()
    

    2 Sending in-flight job updates

    GearmanWorker.send_job_data(current_job, data, poll_timeout=None)
    # Send a Gearman JOB_DATA update for an inflight job
    
    GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None)
    # Send a Gearman JOB_STATUS update for an inflight job
    
    GearmanWorker.send_job_warning(current_job, data, poll_timeout=None)
    # Send a Gearman JOB_WARNING update for an inflight job
    
    # example
    Callback function sending back inflight job updates:
    
    # Worker that accepts jobs from the Gearman server listed here (localhost, port 4730)
    gm_worker = gearman.GearmanWorker(['localhost:4730'])
    
    # See gearman/job.py to see attributes on the GearmanJob
    # Send back a reversed version of the 'data' string through WORK_DATA instead of WORK_COMPLETE
    def task_listener_reverse_inflight(gearman_worker, gearman_job):
        """Stream the reversed payload back one character at a time.

        For each character of the reversed ``data`` string, sends a job-data
        update with that character followed by a status update of
        (characters sent so far, total characters). The work is delivered
        entirely through those updates, so the function itself returns None.
        """
        # Slice instead of calling reversed(): a reversed() iterator has no
        # len(), so the total below could not be computed from it.
        reversed_data = gearman_job.data[::-1]
        total_chars = len(reversed_data)
    
        for sent_so_far, character in enumerate(reversed_data, 1):
            gearman_worker.send_job_data(gearman_job, str(character))
            gearman_worker.send_job_status(gearman_job, sent_so_far, total_chars)
    
        return None
    
    # gm_worker.set_client_id is optional and is omitted in this example
    # Jobs submitted for the 'reverse' task are handed to task_listener_reverse_inflight
    gm_worker.register_task('reverse', task_listener_reverse_inflight)
    
    # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
    gm_worker.work()
    

    3 Extending the worker

    GearmanWorker.data_encoder
    # Provide common object dumps for all communications over gearman
    
    GearmanWorker.after_poll(any_activity)
    # Polling callback to notify any outside listeners what's going on with the GearmanWorker.
    
    Return True to continue polling, False to exit the work loop
    
    # example
    Send/receive Python objects and do work between polls:
    
    # By default, GearmanWorker's can only send off byte-strings
    # If we want to be able to send out Python objects, we can specify a data encoder
    # This will automatically convert byte strings <-> Python objects for ALL commands that have the 'data' field
    #
    # See http://gearman.org/index.php?id=protocol for Worker commands that send/receive 'opaque data'
    #
    import json # Or similarly styled library
    class JSONDataEncoder(gearman.DataEncoder):
        """Data encoder that represents payloads as JSON text."""

        @classmethod
        def encode(cls, encodable_object):
            """Serialize a Python object to its JSON string form."""
            serialized = json.dumps(encodable_object)
            return serialized
    
        @classmethod
        def decode(cls, decodable_string):
            """Parse a JSON string back into the Python object it describes."""
            parsed = json.loads(decodable_string)
            return parsed
    
    class DBRollbackJSONWorker(gearman.GearmanWorker):
        """Worker that uses JSONDataEncoder and rolls back its DB connections after every poll."""

        data_encoder = JSONDataEncoder
    
        def after_poll(self, any_activity):
            """Roll back DB connections after each select loop, then keep polling.

            Always returns True, which tells the work loop to continue
            (returning False would exit it).
            """
            # NOTE(review): self.db_connections is not set anywhere in this
            # snippet; presumably the application attaches it - confirm.
            self.db_connections.rollback()
            return True
    

    Gearman client

     def check_request_status(job_request):
        """Print a one-line summary of a job request's outcome.

        Reports completion (with state and result), a timeout, or a failed
        connection (state JOB_UNKNOWN). Prints nothing for any other state and
        always returns None.
        """
        # The unique id belongs to the GearmanJob (job_request.job), not to the
        # request object, so every branch reads it from there.
        if job_request.complete:
            print("Job %s finished!  Result: %s - %s" % (job_request.job.unique, job_request.state, job_request.result))
        elif job_request.timed_out:
            print("Job %s timed out!" % job_request.job.unique)
        # JOB_UNKNOWN is not bound as a bare name when only `import gearman` is
        # present; reach it through the gearman package instead.
        elif job_request.state == gearman.constants.JOB_UNKNOWN:
            print("Job %s connection failed!" % job_request.job.unique)
    
    class gearman.client.GearmanClient(host_list=None, random_unique_bytes=16)
    # GearmanClient :: Interface to submit jobs to a Gearman server
    

    1 Submitting jobs

    GearmanClient.submit_job(task, data, unique=None, priority=None, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None)
    # Submit a single job to any gearman server
    
    # example
    Sending a simple job as a blocking call:
    # Two servers are listed; submit_job sends the single job to any one of them
    gm_client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
    
    # See gearman/job.py to see attributes on the GearmanJobRequest
    # Defaults to PRIORITY_NONE, background=False (synchronous task), wait_until_complete=True
    completed_job_request = gm_client.submit_job("task_name", "arbitrary binary data")
    check_request_status(completed_job_request)
    
    Sending a high priority, background, blocking call:
    # Two servers are listed; submit_job sends the single job to any one of them
    gm_client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
    
    # See gearman/job.py to see attributes on the GearmanJobRequest
    # priority and background are overridden here; wait_until_complete keeps its default of True
    submitted_job_request = gm_client.submit_job("task_name", "arbitrary binary data", priority=gearman.PRIORITY_HIGH, background=True)
    
    check_request_status(submitted_job_request)
    
    GearmanClient.submit_multiple_jobs(jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None)
    Takes a list of jobs_to_submit with dicts of
    
    {'task': task, 'data': data, 'unique': unique, 'priority': priority}
    
    # example
    Sending multiple jobs all at once and behave like a non-blocking call (wait_until_complete=False):
    
    import time
    gm_client = gearman.GearmanClient(['localhost:4730'])
    
    # Each job is a dict with at least 'task' and 'data' keys
    list_of_jobs = [dict(task="task_name", data="binary data"), dict(task="other_task", data="other binary data")]
    # wait_until_complete=False: do not block here until the jobs are complete
    submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)
    
    # Once we know our jobs are accepted, we can do other stuff and wait for results later in the function
    # Similar to multithreading and doing a join except this is all done in a single process
    time.sleep(1.0)
    
    # Wait at most 5 seconds before timing out incomplete requests
    completed_requests = gm_client.wait_until_jobs_completed(submitted_requests, poll_timeout=5.0)
    # Report each request individually; some may have timed out rather than completed
    for completed_job_request in completed_requests:
        check_request_status(completed_job_request)
    
    GearmanClient.submit_multiple_requests(job_requests, wait_until_complete=True, poll_timeout=None)
    # Take GearmanJobRequests, assign them connections, and request that they be done.
    
    Blocks until our jobs are accepted (should be fast) OR times out
    Optionally blocks until jobs are all complete
    You MUST check the status of your requests after calling this function, as “timed_out” or “state == JOB_UNKNOWN” may be True
    
    # example
    Recovering from failed connections:
    
    import time
    gm_client = gearman.GearmanClient(['localhost:4730'])
    
    list_of_jobs = [dict(task="task_name", data="task binary string"), dict(task="other_task", data="other binary string")]
    failed_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False)
    
    # Let's pretend our assigned requests' Gearman servers all failed
    # JOB_UNKNOWN is not bound as a bare name here, so it is read from the gearman package
    assert all(request.state == gearman.constants.JOB_UNKNOWN for request in failed_requests), "All connections didn't fail!"
    
    # Let's pretend our assigned requests' don't fail but some simply timeout
    retried_connection_failed_requests = gm_client.submit_multiple_requests(failed_requests, wait_until_complete=True, poll_timeout=1.0)
    
    # Filter the requests returned by the retry above, keeping only those that timed out
    timed_out_requests = [job_request for job_request in retried_connection_failed_requests if job_request.timed_out]
    
    # For our timed out requests, lets wait a little longer until they're complete
    retried_timed_out_requests = gm_client.submit_multiple_requests(timed_out_requests, wait_until_complete=True, poll_timeout=4.0)
    
    GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
    # Go into a select loop until all our jobs have moved to STATE_PENDING
    
    GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
    # Go into a select loop until all our jobs have completed or failed
    

    2 Retrieving job status

    GearmanClient.get_job_status(current_request, poll_timeout=None)
    # Fetch the job status of a single request
    
    GearmanClient.get_job_statuses(job_requests, poll_timeout=None)
    # Fetch the job statuses of multiple requests
    

    3 Extending the client

    GearmanClient.data_encoder
    # Provide common object dumps for all communications over gearman
    
    # example
    Send/receive Python objects (not just byte strings):
    
    # By default, GearmanClient's can only send off byte-strings
    # If we want to be able to send out Python objects, we can specify a data encoder
    # This will automatically convert byte strings <-> Python objects for ALL commands that have the 'data' field
    #
    # See http://gearman.org/index.php?id=protocol for client commands that send/receive 'opaque data'
    import pickle
    
    class PickleDataEncoder(gearman.DataEncoder):
        """Data encoder that represents payloads as pickled Python objects.

        SECURITY: unpickling can execute arbitrary code. decode() runs on
        whatever bytes arrive in a 'data' field, so only use this encoder when
        every peer that can submit or answer jobs is trusted; otherwise prefer
        a data-only format such as JSON.
        """

        @classmethod
        def encode(cls, encodable_object):
            """Serialize a Python object to a pickle byte string."""
            return pickle.dumps(encodable_object)
    
        @classmethod
        def decode(cls, decodable_string):
            """Rebuild the Python object from a pickle byte string."""
            # Flagged, not replaced: pickle.loads on data received over the
            # network is unsafe unless the sender is trusted.
            return pickle.loads(decodable_string)
    
    class PickleExampleClient(gearman.GearmanClient):
        """GearmanClient whose 'data' fields are converted with PickleDataEncoder."""
        data_encoder = PickleDataEncoder
    
    # A plain dict rather than a byte string; the client's data_encoder converts it for transport
    my_python_object = {'hello': 'there'}
    
    gm_client = PickleExampleClient(['localhost:4730'])
    gm_client.submit_job("task_name", my_python_object)
    

    Gearman Admin client

    class gearman.admin_client.GearmanAdminClient(host_list=None, poll_timeout=10.0)
    GearmanAdminClient :: Interface to send/receive administrative commands to a Gearman server
    
    This client acts as a BLOCKING client and each call will poll until it receives a satisfactory server response
    
    http://gearman.org/index.php?id=protocol See section ‘Administrative Protocol’
    

    1 Interacting with a server

    GearmanAdminClient.send_maxqueue(task, max_size)
    # Sends a request to change the maximum queue size for a given task
    
    GearmanAdminClient.send_shutdown(graceful=True)
    # Sends a request to shutdown the connected gearman server
    
    GearmanAdminClient.get_status()
    # Retrieves a list of all registered tasks and reports how many items/workers are in the queue
    
    GearmanAdminClient.get_version()
    # Retrieves the version number of the Gearman server
    
    GearmanAdminClient.get_workers()
    # Retrieves a list of workers and reports what tasks they’re operating on
    
    # example
    Checking server state:
    
    # Admin client for the Gearman server listed here; each call below polls until the server responds
    gm_admin_client = gearman.GearmanAdminClient(['localhost:4730'])
    
    # Inspect server state
    # get_status: registered tasks and how many items/workers are in the queue
    status_response = gm_admin_client.get_status()
    # get_version: version number of the Gearman server
    version_response = gm_admin_client.get_version()
    # get_workers: workers and the tasks they are operating on
    workers_response = gm_admin_client.get_workers()
    

    2 Testing server response times

    GearmanAdminClient.ping_server()
    # Sends off a debugging string to execute an application ping on the Gearman server
    
    # example
    Checking server response time:
    
    gm_admin_client = gearman.GearmanAdminClient(['localhost:4730'])
    # ping_server sends a debugging string as an application-level ping; the return value is kept as the response time
    response_time = gm_admin_client.ping_server()
    

    Gearman job definitions

    1 GearmanJob - Basic information about a requested job

    class gearman.job.GearmanJob(connection, handle, task, unique, data)
    # Represents the basics of a job... used in GearmanClient / GearmanWorker to represent job states
    

    1.1 Server identifiers

    GearmanJob.connection
    # GearmanConnection - Server assignment. Could be None prior to client job submission
    
    GearmanJob.handle
    # string - Job’s server handle. Handles are NOT interchangeable across different gearman servers
    

    1.2 Job parameters

    GearmanJob.task
    # string - Job’s task
    
    GearmanJob.unique
    # string - Job’s unique identifier (client assigned)
    
    GearmanJob.data
    # binary - Job’s binary payload
    

    2 GearmanJobRequest - State tracker for requested jobs

    class gearman.job.GearmanJobRequest(gearman_job, initial_priority=None, background=False, max_attempts=1)
    # Represents a job request... used in GearmanClient to represent job states
    

    2.1 Tracking job submission

    GearmanJobRequest.gearman_job
    # GearmanJob - Job that is being tracked by this GearmanJobRequest object
    
    GearmanJobRequest.priority
    # PRIORITY_NONE [default]
    # PRIORITY_LOW
    # PRIORITY_HIGH
    
    GearmanJobRequest.background
    # boolean - Is this job backgrounded?
    
    GearmanJobRequest.connection_attempts
    # integer - Number of attempted connection attempts
    
    GearmanJobRequest.max_connection_attempts
    # integer - Maximum number of attempted connection attempts before raising an exception
    

    2.2 Tracking job progress

    GearmanJobRequest.result
    # binary - Job’s returned binary payload - Populated if and only if JOB_COMPLETE
    
    GearmanJobRequest.exception
    # binary - Job’s exception binary payload
    
    GearmanJobRequest.state
    # JOB_UNKNOWN - Request state is currently unknown, either unsubmitted or connection failed
    # JOB_PENDING - Request has been submitted, pending handle
    # JOB_CREATED - Request has been accepted
    # JOB_FAILED - Request received an explicit job failure (job done but errored out)
    # JOB_COMPLETE - Request received an explicit job completion (job done with results)
    
    GearmanJobRequest.timed_out
    # boolean - Did the client hit its polling_timeout prior to a job finishing?
    
    GearmanJobRequest.complete
    # boolean - Has this job finished, i.e. the client no longer needs to poll for more updates from it?
    

    2.3 Tracking in-flight job updates

    Certain GearmanJob’s may send back data prior to actually completing. GearmanClient uses these queues to keep track of what/when we received certain updates.
    
    GearmanJobRequest.warning_updates
    # collections.deque - Job’s warning binary payloads
    
    GearmanJobRequest.data_updates
    # collections.deque - Job’s data binary payloads
    
    GearmanJobRequest.status
    # dictionary - Job’s status
    
    # handle - string - Job handle
    # known - boolean - Is the server aware of this request?
    # running - boolean - Is the request currently being processed by a worker?
    # numerator - integer
    # denominator - integer
    # time_received - integer - Time last updated
    
    New in version 2.0.1: Replaces GearmanJobRequest.status_updates and GearmanJobRequest.server_status
    
    GearmanJobRequest.status_updates
    # Deprecated since version 2.0.1: Replaced by GearmanJobRequest.status
    
    GearmanJobRequest.server_status
    # Deprecated since version 2.0.1: Replaced by GearmanJobRequest.status
    
  • 相关阅读:
    Oracle里的执行计划
    Java线程锁总结
    Java面试总结-链接
    oracle 排序函数(转载)
    微软今天的Windows 10硬件发布会汇总:手机瞬间变PC
    DevOps 在公司项目中的实践落地
    阿里云云计算工程师ACP学习笔记--知识点总结
    中小型互联网公司微服务实践-经验和教训
    Prometheus监控的最佳实践——关于监控的3项关键指标
    深度学习的Attention模型
  • 原文地址:https://www.cnblogs.com/liujitao79/p/4549666.html
Copyright © 2011-2022 走看看