zoukankan      html  css  js  c++  java
  • python 3.5构建WINDOWS推送服务

    import ConfigParser
    import os
    import sys
    
    cf = ConfigParser.ConfigParser()
    #绝对路径获取
    ABSPATH=os.path.abspath(sys.argv[0])
    ABSPATH=os.path.dirname(ABSPATH)+"/"
    
    cf.read(ABSPATH +'digest.conf')

    利用PYTHON3.5构建WINDOWS服务

    功能介绍:

    利用PYTHON服务,向用户进行数据推送服务

    实现的功能:

    1、客户端验证

    2、配置文件加解密

    3、MSSQL数据库的读取

    4、日志记录

    服务端:

    #!/usr/bin/env python
    # -*- coding: UTF8 -*-
    import win32serviceutil
    import win32service
    import win32event
    import win32timezone
    import sys
    import servicemanager
    import os,sys,socket
    rootdir =os.path.abspath(sys.argv[0])
    rootdir =os.path.dirname(rootdir) +"/"
    import select,pymssql,logging,configparser
    from logging.handlers import TimedRotatingFileHandler
    # 以下_mssql decimal 模块用于打包PY文件时
    import _mssql
    import decimal
    import time,json
    import threading
    import hashlib
    cf =configparser.ConfigParser()
    cf.read(rootdir +'socket_server.conf')
    
    
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y/%m/%d %H:%M:%S')
    logger = logging.getLogger("log")  # name
    logger.setLevel(logging.DEBUG)
    try:
        # 按1天进行日志切割,同时保存20天的日志
        fh =TimedRotatingFileHandler(rootdir +"log/"+ "sock_server.log","d",1,20)
    except:
        os.makedirs(rootdir +"log/")
        fh =TimedRotatingFileHandler(rootdir +"log/"+ "sock_server.log","d",1,20)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    # 给log 添加Handler
    logger.addHandler(fh)
    
    # 加密函数
    def encrypt(s,key=20):
        """
        加密方法
        :param key:
        :param s:
        :return:
        """
        b = bytearray(str(s).encode("gbk"))
        n = len(b)  # 求出 b 的字节数
        c = bytearray(n*2)
        j = 0
        for i in range(0, n):
            b1 = b[i]
            b2 = b1 ^ key  # b1 = b2^ key
            c1 = b2 % 16
            c2 = b2 // 16  # b2 = c2*16 + c1
            c1 = c1 + 65
            c2 = c2 + 65 # c1,c2都是0~15之间的数,加上65就变成了A-P 的字符的编码
            c[j] = c1
            c[j+1] = c2
            j = j+2
        return c.decode("gbk")
    # 解密函数
    def decrypt(s,key=20):
        """
        解密方法
        :param key:
        :param s:
        :return:
        """
        c = bytearray(str(s).encode("gbk"))
        n = len(c) # 计算 b 的字节数
        if n % 2 != 0 :
            return ""
        n = n // 2
        b = bytearray(n)
        j = 0
        for i in range(0, n):
            c1 = c[j]
            c2 = c[j+1]
            j = j+2
            c1 = c1 - 65
            c2 = c2 - 65
            b2 = c2*16 + c1
            b1 = b2^ key
            b[i]= b1
        try:
            return b.decode("gbk")
        except:
            return "failed"
    # 初始华MSSQL类方法
    class MSSQL:
        """
        对pymssql的简单封装
        pymssql库,该库到这里下载:http://www.lfd.uci.edu/~gohlke/pythonlibs/#pymssql
        使用该库时,需要在Sql Server Configuration Manager里面将TCP/IP协议开启
        用法:
        """
        def __init__(self,host,user,pwd,db,port =1433):
            self.host = host
            self.port = port
            self.user = user
            self.pwd = pwd
            self.db = db
        def __GetConnect(self):
            """
            得到连接信息
            返回: conn.cursor()
            """
            try:
                if not self.db:
                    logger.info("没有设置数据库信息")
                self.conn = pymssql.connect(host=self.host,port=self.port,user=self.user,password=self.pwd,database=self.db,charset="utf8")
                cur = self.conn.cursor()
                if not cur:
                    logger.info("连接数据库失败!")
                else:
                    return cur
            except Exception as E:
                logger.info("连接数据库异常,请检查配置!错误信息:%s"%E)
        def ExecQuery(self,sql):
            """
            执行查询语句
            返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段
            """
            cur = self.__GetConnect()
            cur.execute(sql)
            resList = cur.fetchall()
            # 查询完毕后必须关闭连接
            self.conn.close()
            return resList
    
        def ExecNonQuery(self,sql):
            """
            执行非查询语句
    
            调用示例:
                cur = self.__GetConnect()
                cur.execute(sql)
                self.conn.commit()
                self.conn.close()
            """
            cur = self.__GetConnect()
            cur.execute(sql)
            self.conn.commit()
            self.conn.close()
    # SQL语句执行模块
    def mssql(conn):
        """ 对指定数据进行查询,并返回结果!"""
        ms = MSSQL(host=cf.get("DB","ip"),user=decrypt(cf.get("DB","username")),pwd=decrypt(cf.get("DB","password")),db=cf.get("DB","db"),port=int(cf.get("DB","port")))
        x = 0
        while True:
            try:
                sql ="select top 10 ChatContentID ,a.Siteid,ChatContent,ChatMemberName,ChatMemberLevelID,ChatMemberLevelTitle,AccMemberName,AccMemberLevelID,AccMemberLevelTitle,a.States,ChatType,IsPush,IsRobot,a.CreateDate,b.MemberID from dbo.ChatContent a left join Member b on a.ChatMemberPhone=b.Phone where ChatContentID !=0 and ChatType in (0,1,2,6) and ChatContentID>%s order by CreateDate desc"%x
                res = ms.ExecQuery(sql)
                if res != "":
                    x = res[0][0]
                    json_res = ""
                    result = {}
                    for i in res:
                        result["ChatContentID"]=i[0]
                        result["Siteid"] = i[1]
                        result["ChatContent"]=i[2]
                        result["ChatMemberName"]=i[3]
                        result["ChatMemberLevelID"] =i[4]
                        result["ChatMemberLevelTitle"] =i[5]
                        result["AccMemberName"] = i[6]
                        result["AccMemberLevelID"] =i[7]
                        result["AccMemberLevelTitle"] =i[8]
                        result["States"]=i[9]
                        result["ChatType"]=i[10]
                        result["IsPush"]=i[11]
                        result["IsRobot"] =i[12]
                        result["CreateDate"] =str(i[13])
                        result["MemberID"] = i[14]
                        json_res += json.dumps(result)+"
    "
                    conn.send(bytes(json_res,encoding="utf8"))
            except Exception as e:
                # 没有数据等待10秒
                time.sleep(int(cf.get("TIME","conn_time")))
                continue
    # SOCKET主进程模块
    def process(conn,addr):
    
        try:
            i = 0
            # 认证失败允许重试3次
            while i < 3:
                flage = False
                # 接收客户端连接请求信息
                info = conn.recv(1000)
                # 实例化加密函数
                hash = hashlib.sha512()
                hash.update(bytes("123",encoding="utf8"))  # KEY=123
                hash_pwd = hash.hexdigest()
                if info.decode() == hash_pwd:
                    logger.info("客户端ip:[%s]认证成功!"%addr[0])
                    flage = True
                # 接收用户及密码信息
                while flage:
                    mssql(conn)
                else:
                    # 登陆失败,发送给客户端重新验证
                    i += 1
                    logger.info("客户端ip:[%s]认证失败!"%addr[0])
                    conn.send(bytes("验证失败!","utf8"))
                if i > 2:
                    # 主动关闭连接
                    logger.warning("客户端ip:[%s]超过认证限制,服务端强制中断连接!"%addr[0])
                    conn.close()
        except Exception as e:
            logger.debug(e)
            conn.close()
    # SOCKET服务模块
    def sock_server():
        '''
        启动服务器端,开启线程监听
        :return:
        '''
    
        server = socket.socket()
        server_ip=cf.get("HOST","ip")
        server_port = int(cf.get("HOST","port"))
        # server_ip ="localhost"
        # server_port = 4561
        server.bind((server_ip,server_port))
        server.listen(10)
        while True:
            r,w,e = select.select([server,], [], [], 1)
            for i,server in enumerate(r):
                conn,addr = server.accept()
                # 创建线程
                t = threading.Thread(target=process, args=(conn, addr))
                # 启动线程
                t.start()
    # 建立WINDOWS服务框架
    class win32test(win32serviceutil.ServiceFramework):
    
        _svc_name_ = "socketserver"
        _svc_display_name_ = "socketserver_display"
        _svc_description_ = "socketserver_disc"
        def __init__(self, args):
            self.run = True
            win32serviceutil.ServiceFramework.__init__(self, args)
            self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
        def SvcDoRun(self):
            while self.run:
                # 调用用户程序
                sock_server()
                win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)
        def SvcStop(self):
            self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
            win32event.SetEvent(self.hWaitStop)
            self.run =False
    if __name__=='__main__':
        if len(sys.argv) == 1:
            try:
                servicemanager.Initialize()
                servicemanager.PrepareToHostSingle(win32test)
                servicemanager.StartServiceCtrlDispatcher()
            except:
                win32serviceutil.usage()
    
        else:
            win32serviceutil.HandleCommandLine(win32test)

     客户端:

    #!/usr/bin/env python3.5
    # -*-coding:utf8-*-
    """
    本实例客户端用于不断接收不定长数据,存储到变量res
    """
    import socket,time,hashlib,json
    ip_port = ('192.168.1.189',1888)
    sk = socket.socket()
    sk.connect(ip_port)
    sk.setblocking(0) # 非阻塞模式,当接收没有发现任何数据时出异常
    
    while True:
        user_input=input("cmd>>:").strip()
        # 重新初始化加密函数
        hash = hashlib.sha512()
        if len(user_input) ==0:continue
        if user_input =="q":break
        hash.update(bytes(user_input,encoding="utf8"))
        hash_pwd = hash.hexdigest()
        # print(hash_pwd) # 打印生成的加密函数
        sk.send(bytes(hash_pwd,'utf8'))
        res = ""
        while True:
            try:
                time.sleep(0.1)
                time1 = time.time()
                server_replay = sk.recv(8000).decode()
                res += str(server_replay)
            except BlockingIOError:
                time2 = time.time()
                print("接收数据完成,耗时:%s秒" %(time2-time1))
                break
        result = res.split("
    ")
        try :
            for i in result:
                xx = json.loads(i,"utf8")
                print(xx)
        except Exception:
            pass
        res = ""
    
    sk.close()
  • 相关阅读:
    linux之ftp命令详解
    ubuntu-18.04 设置开机启动脚本
    web端调起Windows系统应用程序(exe执行文件),全面兼容所有浏览器
    logstash 6.6.0 读取nginx日志 插入到elasticsearch中
    微服务架构中服务注册与发现
    logstash kafka output 日志处理
    filebeat输出到kafka
    nginx优化之request_time 和upstream_response_time差别
    利用ldirectord实现lvs后端realserver健康状态检查
    ELK 架构之 Logstash 和 Filebeat 安装配置
  • 原文地址:https://www.cnblogs.com/IPYQ/p/5681558.html
Copyright © 2011-2022 走看看