zoukankan      html  css  js  c++  java
  • python 基于itchat模块实现微信聊天机器人

    环境
    基于python 3.x
    模块itchat
    
    pip install itchat
    目标
    我们希望这个简单的代码实现支持下面几点功能:
    
    支持对分享的文章的响应
    支持对群聊中@的信息的响应
    支持对某些信息进行自动回复
    支持对(1对1聊天或是群聊)撤回的信息的显示
    支持对群聊特定人的设置,即群聊中只有该被设置人发信息时会得到机器人的响应
    支持对私聊特定人的设置,即私聊会话中只有他才会收到某些特定的自动回复信息
    相关参考
    itchat
    API 列表
    一件有趣的事:我用 Python 爬了爬自己的微信朋友
    用Python玩微信(非常详细)
    itchat—python实现调用微信接口的第三方模块
    关于原理
    实际上是微信的网页版,而这个模块帮助我们收发这些消息产生传递过程中的网络请求。
    若读者想自己搞懂程序为什么那样写那样取值,可以在运行过程中打印出msg的内容,并观察其键值的组织结构。
    
    print(msg)
    1
    关于防撤回
    每条收到的微信信息,无论是存在于群聊会话中还是存在于私聊会话中,都有一个id,消息撤回时的数据格式中会提到被撤回的信息的id是什么,我们只需保存所有会话中的消息,一旦检测到撤回,就取id,根据id获得消息的内容。
    在存储消息这一块,我发现大多数人的实现是无限量的保存,也就是程序在运行的过程中内存占用会不断增大。为此,我单独对python内置类型dict进行子类化,使得其实例化对象只能添加一定量的键值,比如只能添加10个字典值(即只会保存十条最新的消息),那么一旦添加到第11个值就会删掉第1个添加的值,添加到第12个值就会删去第2个值,即在已经存满10个值的情况下添加新值,该实例会自动去除最先添加进来的键值。
    
    #子类化字典,使得该类只能添加十个字典值,防止过度占用空间
    class weixin_dict(dict):
        def __init__(self,items=3):
            dict.__init__(self)
            self.nums = items
            self.items = 0
                    
        #传进来的value必须是一个字典
        def __setitem__(self,key,value):
            if self.items < self.nums:            
                self.items +=1    
                value['time'] = time.time()           
                return dict.__setitem__(self,key,value)
                
            else:
                #print('字典已经满')            
                veryold_msgtime = time.time()
                veryold_key = None
                
                for item in self.keys():                
                    if veryold_msgtime > self[item]['time']:
                        veryold_msgtime = self[item]['time']
                        veryold_key = item
                        
                self.pop(veryold_key)
                self.items -= 1
                self.__setitem__(key,value)
                
    msg_dict = weixin_dict(items=10)
    msg_dict_one = weixin_dict(items=10)
    
    这个字典的信息,可以作为后续的联系人特征的数据分析使用。
    
    文件结构
    weixin.py
    loginconfig.ini
    response.xml
    使用登陆时读取loginconfig.ini的配置文件中的参数,使程序使用个性化
    使用加载.xml文件(该文件主要用于给用户配置自动回复的信息)实现个性化,代码与语言设置的自动回复分离。
    
    loginconfig.ini文件如下:
    
    [DEFAULT]
    AutoReply_tuling = 1
    AutoReply_mycode = 1
    DetectTable =
    BlackTable = 等等风,
    
    [VIEW]
    Look_one = 1
    Look_all = 0
    
    [SPEAK]
    2Somebody = 你又说话了,我盯着你呢,
    
    [REPLY2]
    reply_1 = 张三
    reply_2 = 李四
    reply_3 = 王五
    
    response.xml的文件内容如下:
    
    <?xml version="1.0"?>
    <wexin>
        <reply>
            <reply2all>
                <msg keyword="早安">早安!今天天气真好,有你这句问候天都似乎下起大雨</msg>
                <msg keyword="午安">午安!无风的午后,有你真好,睡了...</msg>
                <msg keyword="吃了吗">吃了啦!吃了阳澄湖大闸蟹,澳洲小龙虾,阿根廷牛肉...凡你所想,皆入吾肚果吾腹,你呢你吃了吗?是吃了白菜豆腐还是方便面?</msg>
                <msg keyword="在吗">亲!有什么事请直接告知,待我禀明主君知晓,容主君思量数日,再予你答复可好?</msg>
                <msg keyword="在?">亲!有什么事请直接告知,待我禀明主君知晓,容主君思量数日,再予你答复可好?</msg>
                <msg keyword="晚安">晚安!做个好梦...</msg>
                <msg keyword="新年">我也祝你新年猪事顺利,身体健康!</msg>
            </reply2all>
            <reply2someone>    
            </reply2someone>
        </reply>
    
        <send>
        </send>
    </wexin>
    
    
    另外程序还引入了调用图灵机器人接口作为自动回复,因此如果你想使用这个功能,需要去注册一下,替换代码中的apikey和userid参数。如果你不想使用这个功能,只需要在配置文件中将AutoReply_tuling参数修改为0即可
    
    AutoReply_tuling = 0
    1
    完整的代码实现
    # -*- coding: utf-8 -*-
    """
    Created on Wed Jan 23 10:06:52 2019
    
    @author: HJY
    """
    
    import itchat
    import time
    import re
    import threading
    import random
    import configparser
    import requests
    import json
    
    
    #子类化字典,使得该类只能添加十个字典值,防止过度占用空间
    class weixin_dict(dict):
        def __init__(self,items=3):
            dict.__init__(self)
            self.nums = items
            self.items = 0
                    
        #传进来的value必须是一个字典
        def __setitem__(self,key,value):
            if self.items < self.nums:            
                self.items +=1    
                value['time'] = time.time()           
                return dict.__setitem__(self,key,value)
                
            else:
                #print('字典已经满')            
                veryold_msgtime = time.time()
                veryold_key = None
                
                for item in self.keys():                
                    if veryold_msgtime > self[item]['time']:
                        veryold_msgtime = self[item]['time']
                        veryold_key = item
                        
                self.pop(veryold_key)
                self.items -= 1
                self.__setitem__(key,value)
     
    #==============================================================================================================
    #获取联系人的信息
    def get_friendinfo():    
        friends_info = itchat.get_friends(update=True)    
        
        MYID['id'] = friends_info[0]['UserName'] #自己的ID
        
        for friend in friends_info:
            
            key = friend['UserName']
            value = {
                'NickName': friend['NickName'],     #昵称
                'Sex': friend['Sex'],               #性别 
                'Province': friend['Province'],     #省份   
                'Signature': friend['Signature']    #签名                              
                }   
            
            #备注民存在没有设置时
            if friend['RemarkName'] == '':
                value['RemarkName'] = friend['NickName']
            else:
                value['RemarkName'] = friend['RemarkName']
            
            FRIENDS_DICT[key] = value
        
            
    #对收到的分享进行谢谢
    @itchat.msg_register(itchat.content.SHARING)
    def sharing_reply(msg):
        return 'Thanks very much! i will read it later'
    
    #私人对话过程中的消息撤回
    @itchat.msg_register(itchat.content.NOTE)
    def one_detect(msg):
        
        if '你撤回了一条消息' == msg['Text']:
            return
        
        if '撤回了一条消息' in msg['Content']:
                    
            revoke_msg_id = re.search("<msgid>(.*?)</msgid>", msg['Content']).group(1)                
            old_msg = msg_dict_one.get(revoke_msg_id, {}) 
            recall_msg = '撤回的内容:'+ old_msg.get('msg_content')
            msg.user.send(recall_msg )  
    
    #群聊会话过程中的消息撤回
    @itchat.msg_register(itchat.content.NOTE,isGroupChat=True)
    def detect_withdrawText(msg):  
    
        if '收到红包,请在手机上查看' in msg['Content']:
            msg.user.send('发现红包出没!马上动用一切手段秉明小主,我在这里刷屏聊天,你们马上打电话给小主!')
    
        if '撤回了一条消息' in msg['Content']:
    
            msg_owner = re.match('"(.*?)" 撤回了一条消息',msg['Text']).group(1)        
            revoke_msg_id = re.search("<msgid>(.*?)</msgid>", msg['Content']).group(1)        
            old_msg = msg_dict.get(revoke_msg_id, {})
            recall_msg =  msg_owner + '撤回的内容:'+ old_msg.get('msg_content')
            msg.user.send(recall_msg)
    
    
    #对收到的文本信息进行回复
    @itchat.msg_register(itchat.content.TEXT,)
    def text_reply(msg):
        global MSG_LOOK_ONE
        global reply
        global AutoReply_tuling,AutoReply_mycode
    
        #个人对话防止撤回
        msg_id = msg['MsgId']    
        msg_owner = msg['FromUserName']                 
        msg_content = ''
        msg_content = msg['Text']
        msg_dict_one[msg_id] = {
                "msg_owner": msg_owner,
                "msg_content":msg_content
            }  
                                      
        #是否打印信息
        if MSG_LOOK_ONE:
            print(FRIENDS_DICT[msg['FromUserName']]['RemarkName'],'say: ',msg_content)
            
        #若在黑名单中,则进行额外回复
        if FRIENDS_DICT[msg['FromUserName']]['RemarkName'] in BLACKTABLE:            
            auto_response = '主君已下线多日,归期无告,恐君久待,甚是抱歉!你可以语音留言...'
            return auto_response
    
        #自动回复含有特定关键字的信息,优先使用mycode的设置,如果没有匹配则使用图灵机器人
        if AutoReply_mycode:
            for child in reply:
                if child.get('keyword') in msg.text:
                    time.sleep(1)
                    return child.text
    
        #使用图灵机器人
        if AutoReply_tuling:
            return use_tuling(msg.text)
    
    #自动回复群聊天中的@信息
    @itchat.msg_register(itchat.content.TEXT,isGroupChat=True)
    def text_group_reply(msg):
        global MSG_LOOK_ALL
    
        if msg.isAt:
            msg.user.send('''I have send your message to my fileHelper! when i come back,i will reply as soon as i read!''')
    
        #监控撤回信息
        else:
            msg_id = msg['MsgId']       
            #msg_owner = msg['ActualNickName']
            
            msg_content = ''
            msg_content = msg['Text']
            msg_dict[msg_id] = {
                    #"msg_owner":msg_owner,
                    "msg_content":msg_content
                }
            
            #可以针对某个人的发言进行回复
            #print('群聊发言人:',msg['ActualNickName'])
    
            #是否显示群聊信息
            if MSG_LOOK_ALL == 1:
                print('发言内容:',msg_content)             
                    
            if FRIENDS_DICT[msg['ActualUserName']]['RemarkName'] in SOMEBODY:
                auto_all = random.choice(SPEAK2SOMEBODY)
                msg.user.send(f"{FRIENDS_DICT[msg['ActualUserName']]['RemarkName']}:{auto_all}")
    #======================================================================================================================            
    
    #调用图灵机器人智能回复信息
    #外传时,隐藏apikey
    def use_tuling(receive_msg:'user send this message to you'):
        url = "http://openapi.tuling123.com/openapi/api/v2"
        data = {
            "reqType":0,
            "perception": {
                "inputText": {
                    "text": receive_msg
                },
                "selfInfo": {
                    "location": {
                        "city": "广州",
                        "province": "广东",
                        "street": "北京路"
                    }
                }
            },
            "userInfo": {
                "apiKey": "xxxxxxxxxxx",
                "userId": "xxxxxxxxxxx"
            }
        }
            
        response = requests.post(url,data=json.dumps(data))
        replymsg = response.json()['results'][0]['values']['text']
        return replymsg
    
    
    #读取监控配置文件
    def read_config():
    
        #如果配置文件不存在,则创建配置文件,使用默认参数:未实现
        global MSG_LOOK_ONE,MSG_LOOK_ALL
        global SOMEBODY,BLACKTABLE
        global SPEAK2SOMEBODY
        global AutoReply_tuling,AutoReply_mycode
    
        config = configparser.ConfigParser()
        config.read('loginconfig.ini',encoding="utf-8-sig")
    
        SOMEBODY = set(config.get('DEFAULT','DetectTable').strip().split(','))
        BLACKTABLE = set(config.get('DEFAULT','BlackTable').strip().split(','))
        SPEAK2SOMEBODY = config.get('SPEAK','2Somebody').strip().split(',')
        MSG_LOOK_ONE = config.getint('VIEW','Look_one')
        MSG_LOOK_ALL = config.getint('VIEW','Look_all')
        AutoReply_tuling = config.getint('DEFAULT','AutoReply_tuling')
        AutoReply_mycode = config.getint('DEFAULT','AutoReply_mycode')
    
    
    #读取语言配置文件
    def load_language():
        import xml.etree.ElementTree as ET
        global reply
        tree = ET.parse('response.xml')
        root = tree.getroot()
        reply = root[0][0]
        #此处可以考虑直接生成一个字典,提高查找效率,否则信息回复会在搜索时耗费时间:未实现
    
    #----------------------------------------------------------------
    #全局变量   
    msg_dict = weixin_dict(items=10)
    msg_dict_one = weixin_dict(items=10)
    
    MSG_LOOK_ONE = 0
    MSG_LOOK_ALL = 0
    FRIENDS_DICT = {}
    BLACKTABLE = None
    SOMEBODY = None
    SPEAK2SOMEBODY = None
    reply = None
    MYID = {}
    
    
    #--------------------------------------------------------------
    if __name__ == '__main__':
    
        read_config()
        load_language()
    
        #扫码登陆微信,且保持一定断线后不需要重新扫码登陆
        itchat.auto_login(hotReload=True)
        
        #发信息告知本次登陆的黑名单和互怼好友
        config_info_1 = ','.join(SOMEBODY)
        config_info_2 = ','.join(BLACKTABLE)
        config_info = f'本次登陆的配置信息
    群聊被监视者:{config_info_1}
    黑名单:{config_info_2}'
        itchat.send_msg(msg=config_info,toUserName='filehelper')
        
        #获取联系人信息,分线程
        getinfo = threading.Thread(target=get_friendinfo,)
        getinfo.setDaemon(True)
        getinfo.start() 
        itchat.run()
  • 相关阅读:
    C++构造与析构 yongmou
    坏习惯 yongmou
    Python 字符串方法
    python 列表推导式轻量级循环
    python 循环遍历字典元素
    python 短路逻辑和条件表达式
    python 迭代器
    一些关于面向对象设计的思考
    python map内建函数
    Python 列表
  • 原文地址:https://www.cnblogs.com/liang715200/p/15077625.html
Copyright © 2011-2022 走看看