zoukankan      html  css  js  c++  java
  • 后台程序处理(二) python threading

    由于协程没办法完成(一)中所说的任务模式

    接下来就尝试一下使用线程和队列来实现一下这个功能

    在实现之前,我们先明确一个问题——python的线程是伪并发的。同一时间只能有一个线程在运行。具体怎样的运作方式由解释器决定

    然后回顾一下上一章遇到的问题——return以后,需要另外一个线程去检测之前的操作是否执行成功

    因此程序的流程设计应该是这样的:

     1 # 大致流程步骤如下
     2 # 1.获取参数(接口被访问时触发)
     3 request_data = request.form
     4 # 2.根据参数查询内容
     5 target = Target.query.filter_by(id=request_data).first()
     6 # 3.将结果插入队列
     7 ans_queue.put(target)
     8 # 4.激活线程
     9 thread.set()
    10 # 5.将结果从队列中取出
    11 ans_queue.get()
    12 # 6.处理结果
    13 check()
    14 # 7.将线程休眠(阻塞)
    15 thread.event.clear()

    这样设计的考虑主要是以下几点:

    1.简单

    2.入队可以保证消息按时间顺序被处理

    3.出队可以保证当队列不为空时,检查线程会执行到队列为空为止。免去不必要的唤醒检查。然后在有消息入队时被重新激活

    4.其实我们的设计正常来说不会出现3中的检查情况。基本上队列一旦有消息入队,线程就会启动并清空队列

    5.入队可以保证消息的完整和独立性,每次请求得到的数据入队后,队列中都是一列数组。处理逻辑更清晰

    6.队列中的数据不出栈是不可见的

    7.我就是宁愿用全局队列也不想用全局变量

    实际接口代码和线程代码如下:

    A.队列和线程代码

      1 # 消息队列
      2 lock_queue = Queue()
      3 
      4 
      5 def check_kill(event):
      6     while True:
      7         # check queue
      8         if lock_queue.empty() is True:
      9             event.clear()
     10         # wait event
     11         if event.is_set() is not True:
     12             event.wait()
     13         # do some work
     14         sids, serials, minutes, hosts, insts, opasses, ospasses = [], [], [], [], [], [], []
     15 
     16         # get data until queue empty or datas more than 10
     17         if lock_queue.empty() is not True:
     18             data = lock_queue.get()
     19             for i in data:
     20                 sid, serial, minute, host, inst, opass, ospass = i.split(',')
     21                 sids.append(sid)
     22                 serials.append(serial)
     23                 minutes.append(minute)
     24                 hosts.append(host)
     25                 insts.append(inst)
     26                 opasses.append(opass)
     27                 ospasses.append(ospass)
     28 
     29         # init the command
     30         kill_command = 'kill -9'
     31 
     32         # each time we deal less or equal 10 check
     33         for i in range(len(minutes)):
     34             current = datetime.datetime.now().minute
     35             if current >= int(minutes[i]):
     36                 passtime = current - int(minutes[i])
     37             else:
     38                 passtime = current + 60 - int(minutes[i])
     39             
     40             print("passtime is", passtime)
     41             if (5 - passtime) >= 0:
     42                 time.sleep((5 - passtime)*60)
     43 
     44             # split piece of list
     45             sql_sids, sids = sids[0], sids[1:]
     46             sql_serials, serials = serials[0], serials[1:]
     47             sql_hosts, hosts = hosts[0], hosts[1:]
     48             sql_insts, insts = insts[0], insts[1:]
     49             sql_opass, opasses = opasses[0], opasses[1:]
     50             sql_ospass, ospasses = ospasses[0], ospasses[1:]
     51 
     52             print("data", sql_hosts, sql_insts, sql_serials, sql_sids)
     53             # create cursor
     54             
     55             try:
     56                 conn = sqlite3.connect('data-dev.sqlite')
     57                 c = conn.cursor()
     58                 cu = c.execute("select ouser,oport,osport,osuser from tool_target where host='%s' and inst='%s'" % (sql_hosts, sql_insts))
     59 
     60                 result = cu.fetchall()
     61 
     62                 ouser = result[0][0]
     63                 opass = sql_opass
     64                 str_conn = (sql_hosts
     65                             + ':'
     66                             + str(result[0][1])
     67                             + '/'
     68                             + sql_insts)
     69                 odb = cx_Oracle.connect(ouser, opass, str_conn)
     70                 cursor = odb.cursor()
     71 
     72                 # select to find if lock exist
     73                 sql = '''select b.spid, a.sid, a.serial#, a.event from v$session a, v$process b  
     74                         where a.sid = %s and a.serial# = %s ''' % (sql_sids, sql_serials)
     75                 
     76                 cursor.execute(sql)
     77                 answer = cursor.fetchall()
     78                 print("answer is", answer)
     79                 kill_command += ' ' + answer[0][0]
     80 
     81                 s = paramiko.SSHClient()
     82                 s.load_system_host_keys()
     83                 s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     84                 s.connect(sql_hosts, result[0][2], result[0][3], sql_ospass)
     85                 stdin, stdout, stderr = s.exec_command(kill_command)
     86                 stdout.read()
     87                 print('------------------------')
     88                 s.close()
     89                 cursor.close()
     90                 odb.close()
     91                 c.close()
     92                 conn.close()
     93             except:
     94                 pass
     95 
     96 
     97 txkill_ready = threading.Event()
     98 t1 = threading.Thread(target=check_kill, args=(txkill_ready,), name='t1')
     99 t1.start()
    100 # txkill_ready.set()

    B.接口代码

     1 @main.route('/txlock/startkillurl', methods=['POST'])
     2 def start_kill_url():
     3     if request.method == 'POST':
     4         cmd = request.form.getlist('list')[0]
     5         host = request.form.getlist('host')[0]
     6         inst = request.form.getlist('inst')[0]
     7         # print(len(cmd))
     8         # cmd.replace("
    ", "")
     9         # cmd.replace("	", "")
    10         # print(len(cmd))
    11         
    12         tooltarget = ToolTarget.query.filter_by(host=host, inst=inst).first()
    13         ouser = tooltarget.ouser
    14         opass = ToolTarget.de_rsa(pwd=tooltarget.opass)
    15         ospass = ToolTarget.de_rsa(pwd=tooltarget.ospass)
    16         str_conn = (tooltarget.host
    17                     + ':'
    18                     + str(tooltarget.oport)
    19                     + '/'
    20                     + tooltarget.inst)
    21         odb = cx_Oracle.connect(ouser, opass, str_conn)
    22         cursor = odb.cursor()
    23 
    24         # add into queue
    25         c = re.findall('d*,d*', cmd)
    26         d = [i+','+str(datetime.datetime.now().minute)+','+host+','+inst+','+opass+','+ospass for i in c]
    27         # data example : ['15,5,17', '16,23,17', '14,5,17', '142,1,17']
    28         lock_queue.put(d)
    29         txkill_ready.set()
    30 
    31         try:
    32             cursor.execute(cmd)
    33             # pass
    34         except:
    35             return "执行失败,关闭弹窗后会自动刷新列表"
    36     return "执行成功,关闭弹窗后会自动刷新列表"
  • 相关阅读:
    使用FormatterServices 类序列化或反序列化
    HaozesFx(飞信精灵)发布
    EF Provider for Access/ODBC 以及ADO.Net Entity Framework 与Linq to SQL的比较和适用场景:
    Fetion2008 分析 Part3:会话
    Gleaner(个人文档管理)
    This implementation is not part of the Windows Platform FIPS validated cryptographic algorithms.
    发一个收取Pop3邮件的代码
    码农架构技术周刊 | 第1期
    这样学Redis,才能技高一筹!
    CommunityServer2.1删除anonymous帐号后的解决办法
  • 原文地址:https://www.cnblogs.com/ranyuu/p/7593670.html
Copyright © 2011-2022 走看看