我有个简单的应用需求:
1. 该应用随时会监听键盘的输入;
2. 当输入指定键时会控制相机录制的启动和关闭。
监听键盘是一个事件循环,相机录制也是一个循环录制的过程。我试着用 Python 启动两个进程,并用两个进程共享变量的更新来控制两个进程的交互。
监听键盘输入
首先我找到python 监听键盘输入的方案可以满足我监听键盘的需求。
1 import sys, select, tty, termios 2 3 old_attr = termios.tcgetattr(sys.stdin) 4 tty.setcbreak(sys.stdin.fileno()) 5 print('Please input keys, press Ctrl + C to quit') 6 7 while(1): 8 if select.select([sys.stdin], [], [], 0)[0] == [sys.stdin]: 9 print(sys.stdin.read(1)) 10 11 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
这段代码可以进行键盘监听,但是却有一个潜在 Bug:第 11 行的代码永远无法执行,即使加上跳出循环的判断,当使用 Ctrl+C 时程序会终止。
而第 11 行的作用是恢复当前终端的原始属性,没执行到该语句,终端会变得不正常:你会发现程序结束后在该终端输入字符不会有显示。因此,程序应该设法程序终止时必然执行该语句。
我建议改为类似的代码来修复这个 Bug:
1 import sys 2 import select 3 import tty 4 import termios 5 6 old_attr = termios.tcgetattr(sys.stdin) 7 tty.setcbreak(sys.stdin.fileno()) 8 print('Please input keys, press Ctrl + C to quit') 9 10 try: 11 while True: 12 if select.select([sys.stdin], [], [], 0)[0] == [sys.stdin]: 13 key = sys.stdin.read(1) 14 if key == 'q': 15 # to stop the infinite loop 16 break 17 elif key == 's': 18 # do something 19 print('I will do something') 20 else: 21 # do other things 22 print(key) 23 24 except Exception as e: 25 print(e) 26 finally: 27 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
相机录制
相机录制的路径当然因不同的相机而异,因此这里我只写 Demo 级别的抽象代码:
1 import time 2 3 4 class Recorder: 5 def __init__(self): 6 print('Recorder initialization.') 7 8 def run(self): 9 time.sleep(0.3) 10 print('Recording a frame.') 11 12 def close(self): 13 print('Recording ended.') 14 15 16 recorder = Recorder() 17 18 t0 = time.time() 19 while time.time() - t0 < 10: 20 recorder.run() 21 22 recorder.close()
可以看到这段逻辑也有一个条件循环。
假设应用的需求是按键 s,进行录制;再次按键 s,终止录制。
由于这两个逻辑需要完全同时进行,而 Python 的线程又是伪并行,因此这里考虑用多进程共享变量的通信。
使用进程间共享变量
使用 multiprocessing,直接看完整的代码是比较直观的:
1 import time 2 import sys 3 import select 4 import tty 5 import termios 6 import multiprocessing 7 8 9 class Recorder: 10 def __init__(self): 11 print('Recorder initialization.') 12 13 def run(self): 14 time.sleep(0.3) 15 print('Recording a frame.') 16 17 def close(self): 18 print('Recording ended.') 19 20 21 def recording_process(shared): 22 recorder = Recorder() 23 while shared['is_recording']: 24 recorder.run() 25 26 recorder.close() 27 28 29 if __name__ == '__main__': 30 # set shared variable 31 manager = multiprocessing.Manager() 32 shared = manager.dict() 33 shared['is_recording'] = False 34 35 old_attr = termios.tcgetattr(sys.stdin) 36 tty.setcbreak(sys.stdin.fileno()) 37 print('Please input keys, press Ctrl + C to quit') 38 39 # check the keyborad 40 p = None 41 try: 42 while True: 43 if select.select([sys.stdin], [], [], 0)[0] == [sys.stdin]: 44 key = sys.stdin.read(1) 45 if key == 'q': 46 # to stop the infinite loop 47 break 48 elif key == 's': 49 # toggle 50 shared['is_recording'] = not shared['is_recording'] 51 # resume or stop recording 52 if shared['is_recording']: 53 p = multiprocessing.Process(target=recording_process, args=(shared,)) 54 p.start() 55 else: 56 p and p.join() 57 else: 58 # do other things 59 print(key) 60 61 except Exception as e: 62 print(e) 63 finally: 64 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
原文作者:雨先生
原文链接:https://www.cnblogs.com/noluye/p/11704982.html
许可协议:知识共享署名-非商业性使用 4.0 国际许可协议