zoukankan      html  css  js  c++  java
  • 进程管理工具Supervisor(二)Events

    supervisor可以当做一个简单的进程启动、重启、控制工具使用,也可以作为一个进程监控框架使用,作为后者,需要使用supervisor的Events机制。

    Event Listeners

    supervisor对子程序的监控通过叫做event listener的程序实现。supervisor控制的子程序状态发生变化时,就会产生一些事件通知,event listener可以对这些事件通知进行订阅。

    event listener本身也是作为supervisor的子程序运行的。事件通知协议的实现基于event listener子程序的stdin和stdout。supervisor发送特定格式的信息到event listener的stdin,然后从event listener的stdout获得特定格式的输出,从而形成一个请求/应答循环。

    配置

    event listener的配置放置于配置文件中的[eventlistener:x]块中。

    [eventlistener:mylistener]
    command=my_custom_listener.py
    events=PROCESS_STATE,TICK_60
    

    x是listener的名称,command是执行listener脚本的命令,events是要监控的事件类型。

    event listener本身是作为supervisor的子程序运行的,所以与配置子程序[program:x]块类似,官网例子:

    [eventlistener:theeventlistenername]
    command=/bin/eventlistener
    process_name=%(program_name)s_%(process_num)02d
    numprocs=5
    events=PROCESS_STATE
    buffer_size=10
    directory=/tmp
    umask=022
    priority=-1
    autostart=true
    autorestart=unexpected
    startsecs=1
    startretries=3
    exitcodes=0,2
    stopsignal=QUIT
    stopwaitsecs=10
    stopasgroup=false
    killasgroup=false
    user=chrism
    redirect_stderr=false
    stdout_logfile=/a/path
    stdout_logfile_maxbytes=1MB
    stdout_logfile_backups=10
    stdout_events_enabled=false
    stderr_logfile=/a/path
    stderr_logfile_maxbytes=1MB
    stderr_logfile_backups=10
    stderr_events_enabled=false
    environment=A="1",B="2"
    serverurl=AUTO
    

    事件通知协议

    一个event listener可以处于三种状态,ACKNOWLEDGED、READY、BUSY,只有在READY状态下才可以接收事件通知。

    event listener启动时处于ACKNOWLEDGED状态,直到event listener向stdout中输出“READY ”字符串为止。

    event listener向stdout中输出“READY ”之后就处于READY状态,supervisor会向处于READY状态的listener发送listener订阅的事件通知。

    listener接收事件通知之后就处于BUSY状态,期间listener对接收到的事件通知进行处理,处理结束后向stdout输出RESULT 2 OK或者RESULT 4 FAIL”,前者代表处理成功,后者代表处理失败。

    supervisor收到OK或者FAIL输出后,就将event listener的状态置于ACKNOWLEDGED。FAIL的事件通知会被缓存然后再次发送。

    event listener的状态处于ACKNOWLEDGED后可以退出执行,也可以继续执行,继续执行就可以向stdout输出“READY ”形成一个循环。

    supervisor向listener发送的事件通知由两部分组成,header和body,由" "换行符分开。

    一个header例子:

    ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54
    

    ver:协议版本

    server:supervisor的标识符,由[supervisord]块中的identifier选项设置。

    serial:event的序列号

    pool:listener的pool的名字。

    poolserial:event在pool中的的序列号

    eventname:event类型名称

    len:header后面的body长度。

    一个body例子:

    processname:foo groupname:bar pid:123
    This is the data that was sent between the tags

    processname:事件所属的子进程名字

    groupname:子进程所属组名

    pid:子进程pid

    一个简单的listener脚本,listener.py:

    import sys
    
    def write_stdout(s):
        # only eventlistener protocol messages may be sent to stdout
        sys.stdout.write(s)
        sys.stdout.flush()
    
    def write_stderr(s):
        sys.stderr.write(s)
        sys.stderr.flush()
    
    def main():
        while True:
            # 进入READY状态
        ┆   write_stdout('READY
    ')
        ┆   # 读取事件通知的header
        ┆   line = sys.stdin.readline()
        ┆   write_stderr(line)
            # 获取body长度,读取body
        ┆   headers=dict([x.split(':') for x in line.split() ])
        ┆   data = sys.stdin.read(int(headers['len']))
        ┆   write_stderr(data+'
    ')
            # 发送OK进入ACKNOWLEDGED状态
        ┆   write_stdout('RESULT 2
    OK')
    if __name__ == '__main__':
        main()
    

    在conf.d目录中建立一个listener配置文件mylistener.conf:

    [eventlistener:mylistener]
    command=python listener.py
    directory=/thedirectoroflistener.py
    user=user
    events=PROCESS_STATE,TICK_5
    stdout_logfile=/path/to/mylistener_stdout.log
    stderr_logfile=/path/to/mylistener_stderr.log
    

    启动:

    ubuntu:$ sudo supervisorctl start all
    mylistener: started
    celerybeat: started
    ubuntu:$ sudo supervisorctl status
    celerybeat                       RUNNING   pid 87729, uptime 0:00:20
    mylistener                       RUNNING   pid 87728, uptime 0:00:20
    

    监控就开始了,可以到日志中查看事件通知的内容:

    ver:3.0 server:supervisor serial:15361 pool:mylistener poolserial:15361 eventname:PROCESS_STATE_RUNNING len:73
    processname:mylistener groupname:mylistener from_state:STARTING pid:87728
    ver:3.0 server:supervisor serial:15362 pool:mylistener poolserial:15362 eventname:TICK_5 len:15
    when:1514313560
    ver:3.0 server:supervisor serial:15364 pool:mylistener poolserial:15364 eventname:PROCESS_STATE_RUNNING len:73
    processname:celerybeat groupname:celerybeat from_state:STARTING pid:87729
    

    可以根据自己的需要设定监控的事件类型,然后根据不同的事件类型和内容做出不同的应变,具体的事件类型可以官网查看。

    python的supervisor.childutils模块对header和body的处理进行了包装:

    def get_headers(line):
        return dict([ x.split(':') for x in line.split() ])
    
    def eventdata(payload):
        headerinfo, data = payload.split('
    ', 1)
        headers = get_headers(headerinfo)
        return headers, data
    
    def get_asctime(now=None):
        if now is None: # for testing
            now = time.time() # pragma: no cover
        msecs = (now - long(now)) * 1000
        part1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
        asctime = '%s,%03d' % (part1, msecs)
        return asctime
    
    class ProcessCommunicationsProtocol:
        def send(self, msg, fp=sys.stdout):
            fp.write(ProcessCommunicationEvent.BEGIN_TOKEN)
            fp.write(msg)
            fp.write(ProcessCommunicationEvent.END_TOKEN)
    
        def stdout(self, msg):
            return self.send(msg, sys.stdout)
    
        def stderr(self, msg):
            return self.send(msg, sys.stderr)
    
    pcomm = ProcessCommunicationsProtocol()
    
    class EventListenerProtocol:
        def wait(self, stdin=sys.stdin, stdout=sys.stdout):
            self.ready(stdout)
            line = stdin.readline()
            headers = get_headers(line)
            payload = stdin.read(int(headers['len']))
            return headers, payload
    
        def ready(self, stdout=sys.stdout):
            stdout.write(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN)
            stdout.flush()
    
        def ok(self, stdout=sys.stdout):
            self.send('OK', stdout)
    
        def fail(self, stdout=sys.stdout):
            self.send('FAIL', stdout)
    
        def send(self, data, stdout=sys.stdout):
            resultlen = len(data)
            result = '%s%s
    %s' % (PEventListenerDispatcher.RESULT_TOKEN_START,
                                   str(resultlen),
                                   data)
            stdout.write(result)
            stdout.flush()
    
    listener = EventListenerProtocol()
    

    listener脚本可以方便的写做:

    import sys
    
    from supervisor import childutils
    
    def write_stdout(s):
        # only eventlistener protocol messages may be sent to stdout
        sys.stdout.write(s)
        sys.stdout.flush()
    
    def write_stderr(s):
        sys.stderr.write(s)
        sys.stderr.flush()
    
    def main():
        while True:
        ┆   headers, payload = childutils.listener.wait()
        ┆   write_stderr(payload+'
    ')
        ┆   childutils.listener.ok(sys.stdout)
    
    if __name__ == '__main__':
        main()
    

      

  • 相关阅读:
    is as运算符
    继承,多态
    封装等
    面向对象
    在JDBC中使用带参数的SQL语句
    我的程序库:HiCSDB
    我的程序库:HiCSUtil
    Java中,将ResultSet映射为对象和队列及其他辅助函数
    Java版的对象关系映射实现
    Java中的基本数据类型转换
  • 原文地址:https://www.cnblogs.com/linxiyue/p/8121982.html
Copyright © 2011-2022 走看看