一、简介
守护进程最重要的特性是后台运行;它必须与其运行前的环境隔离开来,这些环境包括未关闭的文件描述符、控制终端、会话和进程组、工作目录以及文件创建掩码等;它可以在系统启动时从启动脚本/etc/rc.d中启动,可以由inetd守护进程启动,也可以有作业规划进程crond启动,还可以由用户终端(通常是shell)执行。
Python有时需要保证只运行一个脚本实例,以避免数据的冲突。
二、Python守护进程
1、函数实现
- #!/usr/bin/env python
- #coding: utf-8
- import sys, os
- '''将当前进程fork为一个守护进程
- 注意:如果你的守护进程是由inetd启动的,不要这样做!inetd完成了
- 所有需要做的事情,包括重定向标准文件描述符,需要做的事情只有chdir()和umask()了
- '''
- def daemonize (stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
- #重定向标准文件描述符(默认情况下定向到/dev/null)
- try:
- pid = os.fork()
- #父进程(会话组头领进程)退出,这意味着一个非会话组头领进程永远不能重新获得控制终端。
- if pid > 0:
- sys.exit(0) #父进程退出
- except OSError, e:
- sys.stderr.write ("fork #1 failed: (%d) %s " % (e.errno, e.strerror) )
- sys.exit(1)
- #从母体环境脱离
- os.chdir("/") #chdir确认进程不保持任何目录于使用状态,否则不能umount一个文件系统。也可以改变到对于守护程序运行重要的文件所在目录
- os.umask(0) #调用umask(0)以便拥有对于写的任何东西的完全控制,因为有时不知道继承了什么样的umask。
- os.setsid() #setsid调用成功后,进程成为新的会话组长和新的进程组长,并与原来的登录会话和进程组脱离。
- #执行第二次fork
- try:
- pid = os.fork()
- if pid > 0:
- sys.exit(0) #第二个父进程退出
- except OSError, e:
- sys.stderr.write ("fork #2 failed: (%d) %s " % (e.errno, e.strerror) )
- sys.exit(1)
- #进程已经是守护进程了,重定向标准文件描述符
- for f in sys.stdout, sys.stderr: f.flush()
- si = open(stdin, 'r')
- so = open(stdout, 'a+')
- se = open(stderr, 'a+', 0)
- os.dup2(si.fileno(), sys.stdin.fileno()) #dup2函数原子化关闭和复制文件描述符
- os.dup2(so.fileno(), sys.stdout.fileno())
- os.dup2(se.fileno(), sys.stderr.fileno())
- #示例函数:每秒打印一个数字和时间戳
- def main():
- import time
- sys.stdout.write('Daemon started with pid %d ' % os.getpid())
- sys.stdout.write('Daemon stdout output ')
- sys.stderr.write('Daemon stderr output ')
- c = 0
- while True:
- sys.stdout.write('%d: %s ' %(c, time.ctime()))
- sys.stdout.flush()
- c = c+1
- time.sleep(1)
- if __name__ == "__main__":
- daemonize('/dev/null','/tmp/daemon_stdout.log','/tmp/daemon_error.log')
- main()
可以通过命令ps -ef | grep daemon.py查看后台运行的继承,在/tmp/daemon_error.log会记录错误运行日志,在/tmp/daemon_stdout.log会记录标准输出日志。
2、类实现
- #!/usr/bin/env python
- #coding: utf-8
- #python模拟linux的守护进程
- import sys, os, time, atexit, string
- from signal import SIGTERM
- class Daemon:
- def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
- #需要获取调试信息,改为stdin='/dev/stdin', stdout='/dev/stdout', stderr='/dev/stderr',以root身份运行。
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- self.pidfile = pidfile
- def _daemonize(self):
- try:
- pid = os.fork() #第一次fork,生成子进程,脱离父进程
- if pid > 0:
- sys.exit(0) #退出主进程
- except OSError, e:
- sys.stderr.write('fork #1 failed: %d (%s) ' % (e.errno, e.strerror))
- sys.exit(1)
- os.chdir("/") #修改工作目录
- os.setsid() #设置新的会话连接
- os.umask(0) #重新设置文件创建权限
- try:
- pid = os.fork() #第二次fork,禁止进程打开终端
- if pid > 0:
- sys.exit(0)
- except OSError, e:
- sys.stderr.write('fork #2 failed: %d (%s) ' % (e.errno, e.strerror))
- sys.exit(1)
- #重定向文件描述符
- sys.stdout.flush()
- sys.stderr.flush()
- si = file(self.stdin, 'r')
- so = file(self.stdout, 'a+')
- se = file(self.stderr, 'a+', 0)
- os.dup2(si.fileno(), sys.stdin.fileno())
- os.dup2(so.fileno(), sys.stdout.fileno())
- os.dup2(se.fileno(), sys.stderr.fileno())
- #注册退出函数,根据文件pid判断是否存在进程
- atexit.register(self.delpid)
- pid = str(os.getpid())
- file(self.pidfile,'w+').write('%s ' % pid)
- def delpid(self):
- os.remove(self.pidfile)
- def start(self):
- #检查pid文件是否存在以探测是否存在进程
- try:
- pf = file(self.pidfile,'r')
- pid = int(pf.read().strip())
- pf.close()
- except IOError:
- pid = None
- if pid:
- message = 'pidfile %s already exist. Daemon already running! '
- sys.stderr.write(message % self.pidfile)
- sys.exit(1)
- #启动监控
- self._daemonize()
- self._run()
- def stop(self):
- #从pid文件中获取pid
- try:
- pf = file(self.pidfile,'r')
- pid = int(pf.read().strip())
- pf.close()
- except IOError:
- pid = None
- if not pid: #重启不报错
- message = 'pidfile %s does not exist. Daemon not running! '
- sys.stderr.write(message % self.pidfile)
- return
- #杀进程
- try:
- while 1:
- os.kill(pid, SIGTERM)
- time.sleep(0.1)
- #os.system('hadoop-daemon.sh stop datanode')
- #os.system('hadoop-daemon.sh stop tasktracker')
- #os.remove(self.pidfile)
- except OSError, err:
- err = str(err)
- if err.find('No such process') > 0:
- if os.path.exists(self.pidfile):
- os.remove(self.pidfile)
- else:
- print str(err)
- sys.exit(1)
- def restart(self):
- self.stop()
- self.start()
- def _run(self):
- """ run your fun"""
- while True:
- #fp=open('/tmp/result','a+')
- #fp.write('Hello World ')
- sys.stdout.write('%s:hello world ' % (time.ctime(),))
- sys.stdout.flush()
- time.sleep(2)
- if __name__ == '__main__':
- daemon = Daemon('/tmp/watch_process.pid', stdout = '/tmp/watch_stdout.log')
- if len(sys.argv) == 2:
- if 'start' == sys.argv[1]:
- daemon.start()
- elif 'stop' == sys.argv[1]:
- daemon.stop()
- elif 'restart' == sys.argv[1]:
- daemon.restart()
- else:
- print 'unknown command'
- sys.exit(2)
- sys.exit(0)
- else:
- print 'usage: %s start|stop|restart' % sys.argv[0]
- sys.exit(2)
运行结果:
可以参考:http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/,它是当Daemon设计成一个模板,在其他文件中from daemon import Daemon,然后定义子类,重写run()方法实现自己的功能。
- class MyDaemon(Daemon):
- def run(self):
- while True:
- fp=open('/tmp/run.log','a+')
- fp.write('Hello World ')
- time.sleep(1)
不足:信号处理signal.signal(signal.SIGTERM, cleanup_handler)暂时没有安装,注册程序退出时的回调函数delpid()没有被调用。
然后,再写个shell命令,加入开机启动服务,每隔2秒检测守护进程是否启动,若没有启动则启动,自动监控恢复程序。
- #/bin/sh
- while true
- do
- count=`ps -ef | grep "daemonclass.py" | grep -v "grep"`
- if [ "$?" != "0" ]; then
- daemonclass.py start
- fi
- sleep 2
- done
三、python保证只能运行一个脚本实例
1、打开文件本身加锁
- #!/usr/bin/env python
- #coding: utf-8
- import fcntl, sys, time, os
- pidfile = 0
- def ApplicationInstance():
- global pidfile
- pidfile = open(os.path.realpath(__file__), "r")
- try:
- fcntl.flock(pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB) #创建一个排他锁,并且所被锁住其他进程不会阻塞
- except:
- print "another instance is running..."
- sys.exit(1)
- if __name__ == "__main__":
- ApplicationInstance()
- while True:
- print 'running...'
- time.sleep(1)
注意:open()参数不能使用w,否则会覆盖本身文件;pidfile必须声明为全局变量,否则局部变量生命周期结束,文件描述符会因引用计数为0被系统回收(若整个函数写在主函数中,则不需要定义成global)。
2、打开自定义文件并加锁
- #!/usr/bin/env python
- #coding: utf-8
- import fcntl, sys, time
- pidfile = 0
- def ApplicationInstance():
- global pidfile
- pidfile = open("instance.pid", "w")
- try:
- fcntl.lockf(pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB) #创建一个排他锁,并且所被锁住其他进程不会阻塞
- except IOError:
- print "another instance is running..."
- sys.exit(0)
- if __name__ == "__main__":
- ApplicationInstance()
- while True:
- print 'running...'
- time.sleep(1)
3、检测文件中PID
- #!/usr/bin/env python
- #coding: utf-8
- import time, os, sys
- import signal
- pidfile = '/tmp/process.pid'
- def sig_handler(sig, frame):
- if os.path.exists(pidfile):
- os.remove(pidfile)
- sys.exit(0)
- def ApplicationInstance():
- signal.signal(signal.SIGTERM, sig_handler)
- signal.signal(signal.SIGINT, sig_handler)
- signal.signal(signal.SIGQUIT, sig_handler)
- try:
- pf = file(pidfile, 'r')
- pid = int(pf.read().strip())
- pf.close()
- except IOError:
- pid = None
- if pid:
- sys.stdout.write('instance is running... ')
- sys.exit(0)
- file(pidfile, 'w+').write('%s ' % os.getpid())
- if __name__ == "__main__":
- ApplicationInstance()
- while True:
- print 'running...'
- time.sleep(1)
4、检测特定文件夹或文件
- #!/usr/bin/env python
- #coding: utf-8
- import time, commands, signal, sys
- def sig_handler(sig, frame):
- if os.path.exists("/tmp/test"):
- os.rmdir("/tmp/test")
- sys.exit(0)
- def ApplicationInstance():
- signal.signal(signal.SIGTERM, sig_handler)
- signal.signal(signal.SIGINT, sig_handler)
- signal.signal(signal.SIGQUIT, sig_handler)
- if commands.getstatusoutput("mkdir /tmp/test")[0]:
- print "instance is running..."
- sys.exit(0)
- if __name__ == "__main__":
- ApplicationInstance()
- while True:
- print 'running...'
- time.sleep(1)
也可以检测某一个特定的文件,判断文件是否存在:
- import os
- import os.path
- import time
- #class used to handle one application instance mechanism
- class ApplicationInstance:
- #specify the file used to save the application instance pid
- def __init__( self, pid_file ):
- self.pid_file = pid_file
- self.check()
- self.startApplication()
- #check if the current application is already running
- def check( self ):
- #check if the pidfile exists
- if not os.path.isfile( self.pid_file ):
- return
- #read the pid from the file
- pid = 0
- try:
- file = open( self.pid_file, 'rt' )
- data = file.read()
- file.close()
- pid = int( data )
- except:
- pass
- #check if the process with specified by pid exists
- if 0 == pid:
- return
- try:
- os.kill( pid, 0 ) #this will raise an exception if the pid is not valid
- except:
- return
- #exit the application
- print "The application is already running..."
- exit(0) #exit raise an exception so don't put it in a try/except block
- #called when the single instance starts to save it's pid
- def startApplication( self ):
- file = open( self.pid_file, 'wt' )
- file.write( str( os.getpid() ) )
- file.close()
- #called when the single instance exit ( remove pid file )
- def exitApplication( self ):
- try:
- os.remove( self.pid_file )
- except:
- pass
- if __name__ == '__main__':
- #create application instance
- appInstance = ApplicationInstance( '/tmp/myapp.pid' )
- #do something here
- print "Start MyApp"
- time.sleep(5) #sleep 5 seconds
- print "End MyApp"
- #remove pid file
- appInstance.exitApplication()
上述os.kill( pid, 0 )用于检测一个为pid的进程是否还活着,若该pid的进程已经停止则抛出异常,若正在运行则不发送kill信号。
5、socket监听一个特定端口
- #!/usr/bin/env python
- #coding: utf-8
- import socket, time, sys
- def ApplicationInstance():
- try:
- global s
- s = socket.socket()
- host = socket.gethostname()
- s.bind((host, 60123))
- except:
- print "instance is running..."
- sys.exit(0)
- if __name__ == "__main__":
- ApplicationInstance()
- while True:
- print 'running...'
- time.sleep(1)
可以将该函数使用装饰器实现,便于重用(效果与上述相同):
- #!/usr/bin/env python
- #coding: utf-8
- import socket, time, sys
- import functools
- #使用装饰器实现
- def ApplicationInstance(func):
- @functools.wraps(func)
- def fun(*args,**kwargs):
- import socket
- try:
- global s
- s = socket.socket()
- host = socket.gethostname()
- s.bind((host, 60123))
- except:
- print('already has an instance...')
- return None
- return func(*args,**kwargs)
- return fun
- @ApplicationInstance
- def main():
- while True:
- print 'running...'
- time.sleep(1)
- if __name__ == "__main__":
- main()
四、总结
(1)守护进程和单脚本运行在实际应用中比较重要,方法也比较多,可选择合适的来进行修改,可以将它们做成一个单独的类或模板,然后子类化实现自定义。
(2)daemon监控进程自动恢复避免了nohup和&的使用,并配合shell脚本可以省去很多不定时启动挂掉服务器的麻烦。
(3)若有更好的设计和想法,可随时留言,在此先感谢!