zoukankan      html  css  js  c++  java
  • 利用python itchat给女朋友定时发信息

    利用itchat给女朋友定时发信息

    涉及到的技术有itchat,redis,mysql,最主要的还是mysql咯,当然咯,这么多东西,我就只介绍我代码需要用到的,其他的,如果需要了解的话,就需要看参考资料了哟

    实现的功能:
    1.可以保存微信的消息,包括群聊和好友(文字/视频/语音/图片)
    2.在群里@自己,可以调用图灵机器人的API进行文字回复(类似于机器人)
    3.调用定时任务,在指定时间发送消息至某人

    需要了解的基础:
    1.python基础
    2.mysql基础
    3.redis基础

    实现效果如下:

    只需要在数据库中填写相应的数据,包括,发送给谁(to_user),如果有多个,则用分号(;)分开,执行间隔分钟数(exe_time),如,1440则代表一天,下一次执行时间戳(next_time),主要是抓这个时间来进行发送,抓取的键值(redis_keys)

    发送的效果是这样的:

     

    项目基础

    itchat模块

    官方参考文档:https://itchat.readthedocs.io/zh/latest/

    安装

    pip install itchat / pip3 install itchat

    最简单的测试给好友发送消息

    #产生二维码
    itchat.auto_login()
    #定义用户的昵称
    send_userid='用户的昵称'
    #查找用户的userid
    itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
    #利用send_msg发送消息
    itchat.send_msg('这是一个测试',toUserName=itcaht_user_name)

    测试小脚本

    #!/usr/bin/env python3
    
    import itchat
    
    #产生二维码
    itchat.auto_login()
    #定义用户的昵称
    send_userid='亲爱的'
    #查找用户的userid
    itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
    #利用send_msg发送消息
    itchat.send_msg('这是一个测试',toUserName=itcaht_user_name)

    在执行之后,会弹出一个二维码,然后会让你登录到网页微信,紧接着会去搜索用户信息,搜索到了之后会去取用户的ID,这个ID是微信随机给的,然后利用send_msg进行发送信息

    mysql模块

    参考文档:http://www.runoob.com/python3/python3-mysql.html

    安装

    pip install pymysql / pip3 install pymysql

    pymysql的基本使用

    连接mysql

    db = pymysql.connect(host='hostname',port=port,user='username',passwd='password',db='databasename',charset='charset')

    填写正确的信息后就成功的连接了mysql,注意,port这里,端口号不能加'',否则会连接不上mysql

    简单的操作mysql

    insert
    #连接DB
    db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')
    #编写SQL
    sql = "insert into test_message values (%d);" %(int(time.time()))
    #获取游标
    cursor = db.cursor()
    #数据库重连
    db.ping(reconnect=True)
    #执行SQL
    cursor.execute(sql)
    #commit 
    db.commit()

    执行完毕后,如果正确的话,就可以看到表中有数据了,当然,数据库和数据表得自己去建立才行,如果没有数据,则需要看看程序的输出结果了,还有一点,在做DML的时候,须要加上commit,这是事务的一个特性,否则数据会丢失的

    select
    #连接DB
    db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')
    #编写SQL
    sql = "select * from test_message"
    #获取游标
    cursor = db.cursor()
    #数据库重连
    db.ping(reconnect=True)
    #执行SQL
    cursor.execute(sql)
    #获取全部结果
    cursor.fetchall()
    #返回单个数据
    cursor.fetchone()

    执行完毕后,如果正确的话,就可以看到结果了

    redis模块

     参考文档:https://www.cnblogs.com/xiaoming279/p/6293583.html

    安装 

    pip install redis / pip3 install redis  

    连接Redis

    r = redis.Redis('host',port,db_port,'password')
    判断键是否存在
    r = redis.Redis('localhost',6379,0,'redis')
    keys = '123'
    if r.exists(keys):
        print ("存在")
    else:
        print ("不存在")
    随机取出列表中的数据
    r = redis.Redis('localhost',6379,0,'redis')
    len_keys = r.llen(keys)
    random_int = int(random.randint(0,len_keys-1))
    print (r.lindex(keys,random_int).decode('utf-8'))

    基本上在下面代码中,几乎会用到如上基础

    MySQL数据库/表建立

    数据库建立 

    CREATE DATABASE `itchat` /*!40100 DEFAULT CHARACTER SET utf8mb4 */

    数据表的建立

    itchat_login_info

    CREATE TABLE `itchat_login_info` (
    `createdate` int(11) NOT NULL,
    `myid` varchar(128) DEFAULT NULL,
    `username` varchar(128) DEFAULT NULL,
    `mysignature` varchar(128) DEFAULT NULL,
    `mysex` varchar(4) DEFAULT NULL,
    `myuseruin` varchar(128) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    itchat_message

    CREATE TABLE `itchat_message` (
    `createdate` int(11) NOT NULL,
    `msgid` varchar(128) NOT NULL,
    `nickname` varchar(128) DEFAULT NULL,
    `username` varchar(128) DEFAULT NULL,
    `actualnickname` varchar(128) DEFAULT NULL,
    `actualusername` varchar(128) DEFAULT NULL,
    `msgtext` varchar(5000) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    itchat_friend_message

    CREATE TABLE `itchat_friend_message` (
    `createdate` int(11) NOT NULL,
    `msgid` varchar(128) NOT NULL,
    `fromuser` varchar(128) NOT NULL,
    `fromuserid` varchar(128) NOT NULL,
    `fromsex` varchar(4) NOT NULL,
    `touser` varchar(128) NOT NULL,
    `touserid` varchar(128) NOT NULL,
    `tousersex` varchar(4) NOT NULL,
    `msgtext` varchar(5000) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    auto_timer 

    CREATE TABLE `auto_timer` (
    `timer_id` int(11) NOT NULL,
    `createdate` int(11) NOT NULL,
    `content` varchar(200) DEFAULT NULL,
    `to_user` varchar(1000) NOT NULL,
    `exe_time` int(11) NOT NULL,
    `last_time` int(11) DEFAULT NULL,
    `next_time` int(11) NOT NULL,
    `redis_keys` varchar(100) NOT NULL,
    `timer_count` bigint(20) NOT NULL,
    PRIMARY KEY (`timer_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    代码

    mian.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import itchat
    from itchat.content import *
    import collections_logs
    import save_db
    import liwang_redis_itchat
    import time 
    import threading
    import os
    
    #定义装饰器,用于监听图片,视频等消息,并且下载下来
    @itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO],isFriendChat=True, isGroupChat=True)
    def download_files(msg):
        logger.debug(msg['FromUserName'])
        logger.debug("发送的是语音/视频等,需要保存至本地")
        msg.download(msg.fileName)
        file_value = '127.0.0.1/' + msg.FileName + '_' + msg['Type']
        logger.debug(file_value)
    
        #将消息写入数据库
        if '@@' in msg['FromUserName'] :
            logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(file_value))
            save_db.exe_db(db,sql,logger)
        elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):
            logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(file_value))
            save_db.exe_db(db,sql,logger)
        else :
        #将群聊设置为0,让后面的@抓不到
            isgroup = 0
            #判断是否是自己发送
            if msg['FromUserName'] == MyID :
                logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],file_value)
                save_db.exe_db(db,sql,logger)
            else:
                logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,file_value)
                save_db.exe_db(db,sql,logger)
    
    
    #定义装饰器,用于监听好友和群聊微信群聊内容
    @itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True)   
    def simple_reply(msg):
    
        #定义群聊
        isgroup = 1
    
        #将消息写入数据库
        if '@@' in msg['FromUserName'] :
            logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(msg['Text']))
            save_db.exe_db(db,sql,logger)
        elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):
            logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")
            sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(msg['Text']))
            save_db.exe_db(db,sql,logger)
        else :
            #将群聊设置为0,让后面的@抓不到
            isgroup = 0
            #判断是否是自己发送
            if msg['FromUserName'] == MyID :
                logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],msg['Text'])
                save_db.exe_db(db,sql,logger)
            else:
                logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")
                sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,msg['Text'])
                save_db.exe_db(db,sql,logger)
    
        #将所有的聊天信息都写入到mongodb中
        #暂时不写,mongodb还未掌握基本的使用
    
        #这里写聊天机器人端口
        #判断是否@了自己,且为群聊
        if isgroup == 1 :
            if msg['isAt'] :
                logger.debug("@了自己,移动到redis_itchat")
                from_message = msg['Text']
                send_msg = liwang_redis_itchat.auto_box(from_message,redis_db,logger)
                itchat.send_msg(send_msg,toUserName=msg['ToUserName'])
                itchat.send_msg(send_msg,toUserName=msg['FromUserName'])
            else:
                logger.debug("没有@自己")
    
    
        #空出两个空LOG
        logger.debug("-------------------------------------------------------------------")
        logger.debug("-------------------------------------------------------------------")
    
    def auto_reply():
        logger.debug("============================================================================================")
        
        #获取全部ID sql 
        sql = "select timer_id from auto_timer;"
        #获取结果
        userid_list = save_db.select_db(db,sql,logger)
    
        #遍历auto_timer ID
        for userid in userid_list:
            #打印正在处理的信息
            sql = "select content from auto_timer where timer_id = '%s';" %(userid)
            id_content = save_db.select_db(db,sql,logger)[0]
            logger.debug("判断ID:%s" %(userid))
            logger.debug("判断内容:%s" %(id_content))
    
            #判断时间是否符合
            #获取当前的时间戳
            now_time = int(time.time())
            logger.debug("当前时间为:%s" %(now_time))
            sql = "select next_time from auto_timer where timer_id = '%s';" %(userid)
            next_time = save_db.select_db(db,sql,logger)[0]
            next_time = int(next_time[0])
            logger.debug("下次执行时间戳:%s" %(next_time))
    
            #判断,如果当前时间大于或等于下一次执行时间,就执行
            if now_time >= next_time :
                logger.debug("处理内容:%s" %(id_content))
    
                #获取发送人
                sql = "select to_user from auto_timer where timer_id = '%s';" %(userid)
                to_user = save_db.select_db(db,sql,logger)[0]
                to_user = to_user[0]
                logger.debug("查找用户为:%s" %(to_user))
    
                #单用户
                single_user = 1
    
                #判断字符串是否有分割
                if ';' in to_user :
                    allocation_user = to_user.split(';')
                    logger.debug("需要发送多人,信息为:%s" %(allocation_user))
                    single_user = 0
                else:
                    allocation_user = to_user
                    logger.debug("需要发送一人,信息为:%s" %(allocation_user))
    
                #判断为单用户,不需要做处理
                if single_user == 1 :
                    to_user = to_user + ';'
                    allocation_user = to_user.split(';')
                    
    
    
                for send_userid in allocation_user :
                    logger.debug("正在处理%s用户" %(send_userid))
                    #查找微信是否存在此用户
                    try:
                        itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
    
                        if itcaht_user_name == " ":
                            logger.debug("微信不存在此好友,请检查")
                        else:
                            #获取发送的内容:
                            sql = "select redis_keys from auto_timer where timer_id = '%s';" %(userid)
                            redis_keys = save_db.select_db(db,sql,logger)[0]
                            redis_keys = str(redis_keys[0])
                            return_values = liwang_redis_itchat.return_rand_keys(redis_db,logger,redis_keys)
    
                            #获取其他信息
                            sql = "select last_time from auto_timer where timer_id = '%s';" %(userid)
                            last_time = save_db.select_db(db,sql,logger)[0]
                            last_time = last_time[0]
    
                            sql = "select timer_count from auto_timer where timer_id = '%s';" %(userid)
                            timer_count = save_db.select_db(db,sql,logger)[0]
                            timer_count = int(timer_count[0])
    
                            sql = "select exe_time from auto_timer where timer_id = '%s';" %(userid)
                            exe_time = save_db.select_db(db,sql,logger)[0]
                            exe_time = int(exe_time[0]) * 60
    
    
                            logger.debug("将要发送的信息如下")
                            logger.debug("timer_ID:%s" %(userid))
                            logger.debug("发送简介为:%s" %(id_content))
                            logger.debug("上一次执行的时间戳是:%s" %(last_time))
                            logger.debug("下一次执行的时间戳是:%s" %(next_time))
                            logger.debug("获取Redis的值为:%s" %(redis_keys))
                            logger.debug("间隔描述为:%s" %(exe_time))
                            logger.debug("已经执行次数为:%s" %(timer_count))
    
                            try:
                                logger.debug("将要发送的消息: %s" %(return_values))
                                itchat.send_msg(return_values,toUserName=itcaht_user_name)
                                
                            except Exception as e:
                                logger.debug("信息发送失败,详细信息如下:")
                                logger.debug(e)
                    except Exception as e:
                        logger.debug("未找到该用户信息,详细信息如下:")
                        logger.debug(e)
    
                try:
                    # 更新信息
                    last_time = now_time + exe_time
                    timer_count = timer_count + 1
    
    
                    userid = userid[0]
    
                    sql = "update auto_timer set last_time = '%s' , next_time = '%s' , timer_count = '%s' where timer_id = '%s';" \
                    %(now_time,last_time,timer_count,userid)
    
                    save_db.exe_db(db,sql,logger)
    
                    logger.debug("更新信息完毕")
    
    
                except Exception as e:
                    logger.debug("消息已经发送,但是更新数据库失败,详细信息如下:")
                    logger.debug(e)
            else:
                logger.debug("判断时间未到")
    
            #修整60秒
            logger.debug("============================================================================================")
            time.sleep(3)
        time.sleep(60)
    
    if __name__ == "__main__" :
        
        #设置登陆的PK
        loginpk = 'login_' + str(time.time())
        
        #定义微信自动登录
        itchat.auto_login(enableCmdQR=2,statusStorageDir=loginpk)
    #    itchat.auto_login(statusStorageDir=loginpk)
    
        #获取自己的ID
        global MyID
        MyID = itchat.get_friends(update=True)[0]["UserName"]
    
        #获取用户信息
        global myUserName
        global mysex
        myUserName = itchat.get_friends()[0]['NickName']
        myUserUin = itchat.get_friends()[0]['Uin']
        mysignature = itchat.get_friends()[0]['Signature']
        mysex = itchat.get_friends()[0]['Sex']
    
        #设置logger
        global logger
        logname = str(myUserName) + '_' + str(myUserUin)
        logger = collections_logs.build_logs(logname)
        
        #定义数据库
        global db
        db = save_db.itchat_connect_mysql(logger)
        global redis_db
        redis_db = liwang_redis_itchat.connect_redis(logger)
    
        #每登陆一次,记录一次login
        logger.debug("记录登陆Log")
        sql = "insert into itchat_login_info values (%d,'%s','%s','%s','%s','%s');" %(int(time.time()),MyID,myUserName,mysignature,mysex,myUserUin)
        save_db.exe_db(db,sql,logger)
    
        #启动线程
        threading._start_new_thread(itchat.run,())
    
        #开始循环
        while 1:
            itchat.configured_reply()
            auto_reply()

    collections_logs.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import logging
    import time
    
    def build_logs(file_name):
    
        #获取当前时间
        now_time = time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime())
        #设置log名称
        logname = 'itchat' + file_name + now_time
        #定义logger
        logger = logging.getLogger()
        #设置级别为debug
        logger.setLevel(level = logging.DEBUG)
        #设置 logging文件名称
        handler = logging.FileHandler(logname)
        #设置级别为debug
        handler.setLevel(logging.DEBUG)
        #设置log的格式
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        #将格式压进logger
        handler.setFormatter(formatter)
        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)
        #写入logger
        logger.addHandler(handler)
        logger.addHandler(console)
        #将logger返回
        return logger

    save_db.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import pymysql
    
    def itchat_connect_mysql(logger):
    
        try:
            db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='itchat',charset='utf8')
            logger.debug('MySQL数据库连接成功')
            logger.debug('数据库ID:%s' %(db))
            return db
        except Exception as e:
            logger.debug('MySQL数据库连接失败')
            logger.debug(e)
    
    def exe_db(db,sql,logger):
        try:
            cursor = db.cursor()
            logger.debug('获取游标:%s' %(cursor))
            db.ping(reconnect=True)
            logger.debug("数据库重连")
            cursor.execute(sql)
            logger.debug("执行SQL")
            logger.debug(sql)
            db.commit()
            logger.debug("数据库commit")
        except Exception as e:
            logger.debug('SQL执行失败,请至日志查看该SQL记录')
            logger.debug(sql)
            logger.debug(e)
    
    def select_db(db,sql,logger):
        try:
            cursor = db.cursor()
            logger.debug('获取游标:%s' %(cursor))
            db.ping(reconnect=True)
            logger.debug("数据库重连")
            cursor.execute(sql)
            logger.debug("执行SQL")
            logger.debug(sql)
            return cursor.fetchall()
        except Exception as e:
            logger.debug('SQL执行失败,请至日志查看该SQL记录')
            logger.debug(sql)
            logger.debug(e)

    liwang_redis_itchat.py

    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    
    import redis
    import json
    import requests
    import random
    
    
    def connect_redis(logger):
        try:
            r = redis.Redis('127.0.0.1',6379,0,'redis')
            logger.debug("Redis数据库连接成功")
            return r
        except Exception as e:
            logger.debug("Redis数据库连接失败")
            logger.debug(e)
    
    def return_rand_keys(redis_db,logger,keys):
    
        #判断是否有这个key
    
        if redis_db.exists(keys):
    
            len_keys = redis_db.llen(keys)
            
            random_int = int(random.randint(0,len_keys-1))
            logger.debug("Redis获取的随机值为:%s" %(random_int))
    
            return_redis_values = redis_db.lindex(keys,random_int).decode('utf-8')
            logger.debug("Redis将要返回的值为:%s" %(return_redis_values))
    
            return return_redis_values
    
        else:
            logger.debug("没有找到相关key")
            return 0
    
    def auto_box(message,redis_db,logger):
    
        #设置分隔符
        if '\u2005' in message :
            myid='@小子\u2005'
        else:
            myid = '@小子 '
    
        #获取分隔符之后的值
        if len(message.split(myid)) == 1 :
            return "@我了要说话哟"
        else:
            dealwith = message.split(myid)[1]
    
        if dealwith == " " :
            logger.debug("只是@了我,并没有输入内容")
        else:
            #图灵机器人API接口
            url_api = "http://openapi.tuling123.com/openapi/api/v2"
    
            #将问题赋值给box_quest
            box_quest = dealwith
    
            #定义json post 
            request_json = json.dumps ({
                "perception":
                {
                    "inputText":
                    {
                        "text": box_quest
                    },
    
                    "selfInfo":
                    {
                        "location":
                        {
                            "city": "成都",
                            "province": "成都",
                            "street": "天府三街"
                        }
                    }
                },
    
                "userInfo": 
                {
                    "apiKey": "APIkey",
                    "userId": "tuling"
                }
            }
            )
    
            # 获取POST之后的值
            r = requests.post(url_api, data=request_json)
            r_test = r.text
            r_json = json.loads(r_test)
    
            #定义返回值
            return_str = (r_json['results'][0]['values']['text'])
    
            logger.debug("图灵机器人将要返回的值:")
            logger.debug(return_str)
    
            #需要自己建立自己的机器人仓库
            logger.debug("将返回的结果存入Redis数据库中")
            redis_db.rpush(box_quest,return_str)
    
            return return_str

    实现的效果 

    如下以log展现

    接收消息并且保存至数据库

     

    图灵机器人回复 

    定时任务

    欢迎转发! 请保留源地址: https://www.cnblogs.com/NoneID
  • 相关阅读:
    整数转换成字符
    html总结(一)
    ssh服务
    原码反码补码图形化注意
    关于副本机制
    win8设置自动关机
    python制作的翻译器基于爬取百度翻译【笔记思路】
    python多线程扫描爆破网站服务器思路【笔记】
    用python实现多线程爬取影视网站全部视频方法【笔记】
    利用Python3的requests和re库爬取猫眼电影笔记
  • 原文地址:https://www.cnblogs.com/NoneID/p/9744502.html
Copyright © 2011-2022 走看看