zoukankan      html  css  js  c++  java
  • 利用python itchat给女朋友定时发信息

    利用itchat给女朋友定时发信息

    涉及到的技术有itchat,redis,mysql,最主要的还是mysql咯,当然咯,这么多东西,我就只介绍我代码需要用到的,其他的,如果需要了解的话,就需要看参考资料了哟

    实现的功能:
    1.可以保存微信的消息,包括群聊和好友(文字/视频/语音/图片)
    2.在群里@自己,可以调用图灵机器人的API进行文字回复(类似于机器人)
    3.调用定时任务,在指定时间发送消息至某人

    需要了解的基础:
    1.python基础
    2.mysql基础
    3.redis基础

    实现效果如下:

    只需要在数据库中填写相应的数据,包括,发送给谁(to_user),如果有多个,则用分号(;)分开,执行间隔分钟数(exe_time),如,1440则代表一天,下一次执行时间戳(next_time),主要是抓这个时间来进行发送,抓取的键值(redis_keys)

    发送的效果是这样的:

     

    项目基础

    itchat模块

    官方参考文档:https://itchat.readthedocs.io/zh/latest/

    安装

    pip install itchat / pip3 install itchat

    最简单的测试给好友发送消息

    #产生二维码
    itchat.auto_login()
    #定义用户的昵称
    send_userid='用户的昵称'
    #查找用户的userid
    itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
    #利用send_msg发送消息
    itchat.send_msg('这是一个测试',toUserName=itcaht_user_name)

    测试小脚本

    #!/usr/bin/env python3
    
    import itchat
    
    #产生二维码
    itchat.auto_login()
    #定义用户的昵称
    send_userid='亲爱的'
    #查找用户的userid
    itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
    #利用send_msg发送消息
    itchat.send_msg('这是一个测试',toUserName=itcaht_user_name)

    在执行之后,会弹出一个二维码,然后会让你登录到网页微信,紧接着会去搜索用户信息,搜索到了之后会去取用户的ID,这个ID是微信随机给的,然后利用send_msg进行发送信息

    mysql模块

    参考文档:http://www.runoob.com/python3/python3-mysql.html

    安装

    pip install pymysql / pip3 install pymysql

    pymysql的基本使用

    连接mysql

    db = pymysql.connect(host='hostname',port=port,user='username',passwd='password',db='databasename',charset='charset')

    填写正确的信息后就成功的连接了mysql,注意,port这里,端口号不能加'',否则会连接不上mysql

    简单的操作mysql

    insert
    #连接DB
    db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')
    #编写SQL
    sql = "insert into test_message values (%d);" %(int(time.time()))
    #获取游标
    cursor = db.cursor()
    #数据库重连
    db.ping(reconnect=True)
    #执行SQL
    cursor.execute(sql)
    #commit 
    db.commit()

    执行完毕后,如果正确的话,就可以看到表中有数据了,当然,数据库和数据表得自己去建立才行,如果没有数据,则需要看看程序的输出结果了,还有一点,在做DML的时候,须要加上commit,这是事务的一个特性,否则数据会丢失的

    select
    #连接DB
    db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')
    #编写SQL
    sql = "select * from test_message"
    #获取游标
    cursor = db.cursor()
    #数据库重连
    db.ping(reconnect=True)
    #执行SQL
    cursor.execute(sql)
    #获取全部结果
    cursor.fetchall()
    #返回单个数据
    cursor.fetchone()

    执行完毕后,如果正确的话,就可以看到结果了

    redis模块

     参考文档:https://www.cnblogs.com/xiaoming279/p/6293583.html

    安装 

    pip install redis / pip3 install redis  

    连接Redis

    r = redis.Redis('host',port,db_port,'password')
    判断键是否存在
    r = redis.Redis('localhost',6379,0,'redis')
    keys = '123'
    if r.exists(keys):
        print ("存在")
    else:
        print ("不存在")
    随机取出列表中的数据
    r = redis.Redis('localhost',6379,0,'redis')
    len_keys = r.llen(keys)
    random_int = int(random.randint(0,len_keys-1))
    print (r.lindex(keys,random_int).decode('utf-8'))

    基本上在下面代码中,几乎会用到如上基础

    MySQL数据库/表建立

    数据库建立 

    CREATE DATABASE `itchat` /*!40100 DEFAULT CHARACTER SET utf8mb4 */

    数据表的建立

    itchat_login_info

    CREATE TABLE `itchat_login_info` (
    `createdate` int(11) NOT NULL,
    `myid` varchar(128) DEFAULT NULL,
    `username` varchar(128) DEFAULT NULL,
    `mysignature` varchar(128) DEFAULT NULL,
    `mysex` varchar(4) DEFAULT NULL,
    `myuseruin` varchar(128) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    itchat_message

    CREATE TABLE `itchat_message` (
    `createdate` int(11) NOT NULL,
    `msgid` varchar(128) NOT NULL,
    `nickname` varchar(128) DEFAULT NULL,
    `username` varchar(128) DEFAULT NULL,
    `actualnickname` varchar(128) DEFAULT NULL,
    `actualusername` varchar(128) DEFAULT NULL,
    `msgtext` varchar(5000) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    itchat_friend_message

    CREATE TABLE `itchat_friend_message` (
    `createdate` int(11) NOT NULL,
    `msgid` varchar(128) NOT NULL,
    `fromuser` varchar(128) NOT NULL,
    `fromuserid` varchar(128) NOT NULL,
    `fromsex` varchar(4) NOT NULL,
    `touser` varchar(128) NOT NULL,
    `touserid` varchar(128) NOT NULL,
    `tousersex` varchar(4) NOT NULL,
    `msgtext` varchar(5000) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    auto_timer 

    CREATE TABLE `auto_timer` (
    `timer_id` int(11) NOT NULL,
    `createdate` int(11) NOT NULL,
    `content` varchar(200) DEFAULT NULL,
    `to_user` varchar(1000) NOT NULL,
    `exe_time` int(11) NOT NULL,
    `last_time` int(11) DEFAULT NULL,
    `next_time` int(11) NOT NULL,
    `redis_keys` varchar(100) NOT NULL,
    `timer_count` bigint(20) NOT NULL,
    PRIMARY KEY (`timer_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    代码

    mian.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import itchat
    from itchat.content import *
    import collections_logs
    import save_db
    import liwang_redis_itchat
    import time 
    import threading
    import os
    
    #定义装饰器,用于监听图片,视频等消息,并且下载下来
    @itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO],isFriendChat=True, isGroupChat=True)
    def download_files(msg):
        logger.debug(msg['FromUserName'])
        logger.debug("发送的是语音/视频等,需要保存至本地")
        msg.download(msg.fileName)
        file_value = '127.0.0.1/' + msg.FileName + '_' + msg['Type']
        logger.debug(file_value)
    
        #将消息写入数据库
        if '@@' in msg['FromUserName'] :
            logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(file_value))
            save_db.exe_db(db,sql,logger)
        elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):
            logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(file_value))
            save_db.exe_db(db,sql,logger)
        else :
        #将群聊设置为0,让后面的@抓不到
            isgroup = 0
            #判断是否是自己发送
            if msg['FromUserName'] == MyID :
                logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],file_value)
                save_db.exe_db(db,sql,logger)
            else:
                logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,file_value)
                save_db.exe_db(db,sql,logger)
    
    
    #定义装饰器,用于监听好友和群聊微信群聊内容
    @itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True)   
    def simple_reply(msg):
    
        #定义群聊
        isgroup = 1
    
        #将消息写入数据库
        if '@@' in msg['FromUserName'] :
            logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(msg['Text']))
            save_db.exe_db(db,sql,logger)
        elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):
            logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(msg['Text']))
            save_db.exe_db(db,sql,logger)
        else :
            #将群聊设置为0,让后面的@抓不到
            isgroup = 0
            #判断是否是自己发送
            if msg['FromUserName'] == MyID :
                logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],msg['Text'])
                save_db.exe_db(db,sql,logger)
            else:
                logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,msg['Text'])
                save_db.exe_db(db,sql,logger)
    
        #将所有的聊天信息都写入到mongodb中
        #暂时不写,mongodb还未掌握基本的使用
    
        #这里写聊天机器人端口
        #判断是否@了自己,且为群聊
        if isgroup == 1 :
            if msg['isAt'] :
                logger.debug("@了自己,移动到redis_itchat")
                from_message = msg['Text']
                send_msg = liwang_redis_itchat.auto_box(from_message,redis_db,logger)
                itchat.send_msg(send_msg,toUserName=msg['ToUserName'])
                itchat.send_msg(send_msg,toUserName=msg['FromUserName'])
            else:
                logger.debug("没有@自己")
    
    
        #空出两个空LOG
        logger.debug("-------------------------------------------------------------------")
        logger.debug("-------------------------------------------------------------------")
    
    def auto_reply():
        logger.debug("============================================================================================")
        
        #获取全部ID sql 
        sql = "select timer_id from auto_timer;"
        #获取结果
        userid_list = save_db.select_db(db,sql,logger)
    
        #遍历auto_timer ID
        for userid in userid_list:
            #打印正在处理的信息
            sql = "select content from auto_timer where timer_id = '%s';" %(userid)
            id_content = save_db.select_db(db,sql,logger)[0]
            logger.debug("判断ID:%s" %(userid))
            logger.debug("判断内容:%s" %(id_content))
    
            #判断时间是否符合
            #获取当前的时间戳
            now_time = int(time.time())
            logger.debug("当前时间为:%s" %(now_time))
            sql = "select next_time from auto_timer where timer_id = '%s';" %(userid)
            next_time = save_db.select_db(db,sql,logger)[0]
            next_time = int(next_time[0])
            logger.debug("下次执行时间戳:%s" %(next_time))
    
            #判断,如果当前时间大于或等于下一次执行时间,就执行
            if now_time >= next_time :
                logger.debug("处理内容:%s" %(id_content))
    
                #获取发送人
                sql = "select to_user from auto_timer where timer_id = '%s';" %(userid)
                to_user = save_db.select_db(db,sql,logger)[0]
                to_user = to_user[0]
                logger.debug("查找用户为:%s" %(to_user))
    
                #单用户
                single_user = 1
    
                #判断字符串是否有分割
                if ';' in to_user :
                    allocation_user = to_user.split(';')
                    logger.debug("需要发送多人,信息为:%s" %(allocation_user))
                    single_user = 0
                else:
                    allocation_user = to_user
                    logger.debug("需要发送一人,信息为:%s" %(allocation_user))
    
                #判断为单用户,不需要做处理
                if single_user == 1 :
                    to_user = to_user + ';'
                    allocation_user = to_user.split(';')
                    
    
    
                for send_userid in allocation_user :
                    logger.debug("正在处理%s用户" %(send_userid))
                    #查找微信是否存在此用户
                    try:
                        itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
    
                        if itcaht_user_name == " ":
                            logger.debug("微信不存在此好友,请检查")
                        else:
                            #获取发送的内容:
                            sql = "select redis_keys from auto_timer where timer_id = '%s';" %(userid)
                            redis_keys = save_db.select_db(db,sql,logger)[0]
                            redis_keys = str(redis_keys[0])
                            return_values = liwang_redis_itchat.return_rand_keys(redis_db,logger,redis_keys)
    
                            #获取其他信息
                            sql = "select last_time from auto_timer where timer_id = '%s';" %(userid)
                            last_time = save_db.select_db(db,sql,logger)[0]
                            last_time = last_time[0]
    
                            sql = "select timer_count from auto_timer where timer_id = '%s';" %(userid)
                            timer_count = save_db.select_db(db,sql,logger)[0]
                            timer_count = int(timer_count[0])
    
                            sql = "select exe_time from auto_timer where timer_id = '%s';" %(userid)
                            exe_time = save_db.select_db(db,sql,logger)[0]
                            exe_time = int(exe_time[0]) * 60
    
    
                            logger.debug("将要发送的信息如下")
                            logger.debug("timer_ID:%s" %(userid))
                            logger.debug("发送简介为:%s" %(id_content))
                            logger.debug("上一次执行的时间戳是:%s" %(last_time))
                            logger.debug("下一次执行的时间戳是:%s" %(next_time))
                            logger.debug("获取Redis的值为:%s" %(redis_keys))
                            logger.debug("间隔描述为:%s" %(exe_time))
                            logger.debug("已经执行次数为:%s" %(timer_count))
    
                            try:
                                logger.debug("将要发送的消息: %s" %(return_values))
                                itchat.send_msg(return_values,toUserName=itcaht_user_name)
                                
                            except Exception as e:
                                logger.debug("信息发送失败,详细信息如下:")
                                logger.debug(e)
                    except Exception as e:
                        logger.debug("未找到该用户信息,详细信息如下:")
                        logger.debug(e)
    
                try:
                    # 更新信息
                    last_time = now_time + exe_time
                    timer_count = timer_count + 1
    
    
                    userid = userid[0]
    
                    sql = "update auto_timer set last_time = '%s' , next_time = '%s' , timer_count = '%s' where timer_id = '%s';" \
                    %(now_time,last_time,timer_count,userid)
    
                    save_db.exe_db(db,sql,logger)
    
                    logger.debug("更新信息完毕")
    
    
                except Exception as e:
                    logger.debug("消息已经发送,但是更新数据库失败,详细信息如下:")
                    logger.debug(e)
            else:
                logger.debug("判断时间未到")
    
            #修整60秒
            logger.debug("============================================================================================")
            time.sleep(3)
        time.sleep(60)
    
    if __name__ == "__main__" :
        
        #设置登陆的PK
        loginpk = 'login_' + str(time.time())
        
        #定义微信自动登录
        itchat.auto_login(enableCmdQR=2,statusStorageDir=loginpk)
    #    itchat.auto_login(statusStorageDir=loginpk)
    
        #获取自己的ID
        global MyID
        MyID = itchat.get_friends(update=True)[0]["UserName"]
    
        #获取用户信息
        global myUserName
        global mysex
        myUserName = itchat.get_friends()[0]['NickName']
        myUserUin = itchat.get_friends()[0]['Uin']
        mysignature = itchat.get_friends()[0]['Signature']
        mysex = itchat.get_friends()[0]['Sex']
    
        #设置logger
        global logger
        logname = str(myUserName) + '_' + str(myUserUin)
        logger = collections_logs.build_logs(logname)
        
        #定义数据库
        global db
        db = save_db.itchat_connect_mysql(logger)
        global redis_db
        redis_db = liwang_redis_itchat.connect_redis(logger)
    
        #每登陆一次,记录一次login
        logger.debug("记录登陆Log")
        sql = "insert into itchat_login_info values (%d,'%s','%s','%s','%s','%s');" %(int(time.time()),MyID,myUserName,mysignature,mysex,myUserUin)
        save_db.exe_db(db,sql,logger)
    
        #启动线程
        threading._start_new_thread(itchat.run,())
    
        #开始循环
        while 1:
            itchat.configured_reply()
            auto_reply()

    collections_logs.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import logging
    import time
    
    def build_logs(file_name):
    
        #获取当前时间
        now_time = time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime())
        #设置log名称
        logname = 'itchat' + file_name + now_time
        #定义logger
        logger = logging.getLogger()
        #设置级别为debug
        logger.setLevel(level = logging.DEBUG)
        #设置 logging文件名称
        handler = logging.FileHandler(logname)
        #设置级别为debug
        handler.setLevel(logging.DEBUG)
        #设置log的格式
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        #将格式压进logger
        handler.setFormatter(formatter)
        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)
        #写入logger
        logger.addHandler(handler)
        logger.addHandler(console)
        #将logger返回
        return logger

    save_db.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import pymysql
    
    def itchat_connect_mysql(logger):
    
        try:
            db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='itchat',charset='utf8')
            logger.debug('MySQL数据库连接成功')
            logger.debug('数据库ID:%s' %(db))
            return db
        except Exception as e:
            logger.debug('MySQL数据库连接失败')
            logger.debug(e)
    
    def exe_db(db,sql,logger):
        try:
            cursor = db.cursor()
            logger.debug('获取游标:%s' %(cursor))
            db.ping(reconnect=True)
            logger.debug("数据库重连")
            cursor.execute(sql)
            logger.debug("执行SQL")
            logger.debug(sql)
            db.commit()
            logger.debug("数据库commit")
        except Exception as e:
            logger.debug('SQL执行失败,请至日志查看该SQL记录')
            logger.debug(sql)
            logger.debug(e)
    
    def select_db(db,sql,logger):
        try:
            cursor = db.cursor()
            logger.debug('获取游标:%s' %(cursor))
            db.ping(reconnect=True)
            logger.debug("数据库重连")
            cursor.execute(sql)
            logger.debug("执行SQL")
            logger.debug(sql)
            return cursor.fetchall()
        except Exception as e:
            logger.debug('SQL执行失败,请至日志查看该SQL记录')
            logger.debug(sql)
            logger.debug(e)

    liwang_redis_itchat.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import redis
    import json
    import requests
    import random
    
    
    def connect_redis(logger):
        try:
            r = redis.Redis('127.0.0.1',6379,0,'redis')
            logger.debug("Redis数据库连接成功")
            return r
        except Exception as e:
            logger.debug("Redis数据库连接失败")
            logger.debug(e)
    
    def return_rand_keys(redis_db,logger,keys):
    
        #判断是否有这个key
    
        if redis_db.exists(keys):
    
            len_keys = redis_db.llen(keys)
            
            random_int = int(random.randint(0,len_keys-1))
            logger.debug("Redis获取的随机值为:%s" %(random_int))
    
            return_redis_values = redis_db.lindex(keys,random_int).decode('utf-8')
            logger.debug("Redis将要返回的值为:%s" %(return_redis_values))
    
            return return_redis_values
    
        else:
            logger.debug("没有找到相关key")
            return 0
    
    def auto_box(message,redis_db,logger):
    
        #设置分隔符
        if '\u2005' in message :
            myid='@小子\u2005'
        else:
            myid = '@小子 '
    
        #获取分隔符之后的值
        if len(message.split(myid)) == 1 :
            return "@我了要说话哟"
        else:
            dealwith = message.split(myid)[1]
    
        if dealwith == " " :
            logger.debug("只是@了我,并没有输入内容")
        else:
            #图灵机器人API接口
            url_api = "http://openapi.tuling123.com/openapi/api/v2"
    
            #将问题赋值给box_quest
            box_quest = dealwith
    
            #定义json post 
            request_json = json.dumps ({
                "perception":
                {
                    "inputText":
                    {
                        "text": box_quest
                    },
    
                    "selfInfo":
                    {
                        "location":
                        {
                            "city": "成都",
                            "province": "成都",
                            "street": "天府三街"
                        }
                    }
                },
    
                "userInfo": 
                {
                    "apiKey": "APIkey",
                    "userId": "tuling"
                }
            }
            )
    
            # 获取POST之后的值
            r = requests.post(url_api, data=request_json)
            r_test = r.text
            r_json = json.loads(r_test)
    
            #定义返回值
            return_str = (r_json['results'][0]['values']['text'])
    
            logger.debug("图灵机器人将要返回的值:")
            logger.debug(return_str)
    
            #需要自己建立自己的机器人仓库
            logger.debug("将返回的结果存入Redis数据库中")
            redis_db.rpush(box_quest,return_str)
    
            return return_str

    实现的效果 

    如下以log展现

    接收消息并且保存至数据库

     

    图灵机器人回复 

    定时任务

    欢迎转发! 请保留源地址: https://www.cnblogs.com/NoneID
  • 相关阅读:
    Java程序:从命令行接收多个数字,求和并输出结果
    大道至简读后感
    大道至简第一章读后感Java伪代码
    Creating a SharePoint BCS .NET Connectivity Assembly to Crawl RSS Data in Visual Studio 2010
    声明式验证超时问题
    Error message when you try to modify or to delete an alternate access mapping in Windows SharePoint Services 3.0: "An update conflict has occurred, and you must re-try this action"
    Upgrading or Redeploying SharePoint 2010 Workflows
    Upgrade custom workflow in SharePoint
    SharePoint 2013中Office Web Apps的一次排错
    How to upgrade workflow assembly in MOSS 2007
  • 原文地址:https://www.cnblogs.com/NoneID/p/9744502.html
Copyright © 2011-2022 走看看