zoukankan      html  css  js  c++  java
  • Python阿里云消息推送调用API

    很多公司测试APP推送时候,应该也是很头疼:推送环境:测试、正式,稍不注意就把测试的push到正式上,导致所有用户都收到

    例子很多:

    其实阿里、极光都有推送Api,直接调用API就ok,特别是有的公司有很多APP,直接调用API最方便:

    代码量不多,直接贴代码:

     

    config

      aliyun.ini   配置阿里云的各种参数:各种key

      data_news.py  维护推送的数据

      devices.ini  维护需要推送的pushid

    common_push.py  api推送的封装

    get_api_push.py  查询推送的结果

    rmt_push.py   推送

    aliyun.ini

    # aliyun config
    
    
    [fm]
    accessKeyId = ''
    accessKeySecret = ''
    appKey = ''
    regionId = ''
    
    
    [hy]
    accessKeyId = ''
    accessKeySecret = ''
    appKey = ''
    regionId = ''
    
    
    [hlj]
    accessKeyId = ''
    accessKeySecret = ''
    appKey = ''
    regionId = ''
    devices.ini
    # devices config
    
    [fm]
    android = 655b4e0a231742b7830ac658eb46c979
    iOS = ''
    
    
    [hy]
    android = 18937640de964cffa3f770d3c997e8b5
    iOS = ''
    
    
    [hlj]
    android = 23665a1c1a0746ababf408385f474d22,3296d6ce68124fa184c8c4bc6f1c66e3
    iOS = ''

    data_news.# coding=utf-8

    # get news
    
    
    # 获取推送新闻
    def get_news(system_name, new_type):
        """
        :param system_name:
        :param new_type: audio_news:xx新闻; subject_news:xx新闻; link_news:xx新闻; collect_news:xx新闻; ordinary_news:xx新闻
        :return:
        """
    
        news = ''
        # 
        if system_name == 'fm':
            # xx新闻
            if new_type == 'audio_news':
                news = {
                    'title': '中西机场',
                    'content': '2018年12月客吞吐量突破5000万人次',
                    'url_to': 2,
                    'detail_id': 28ert36,
                    'un_read': 2,
                    'flag': 2
                }
            # xx新闻
            if new_type == 'subject_news':
                news = {
                    'title': '中西部唯双流国际机场',
                    'content': '20年旅客吞吐量突破5000万人次',
                    'url_to': 1,
                    'detail_id': 1500erter59,
                    'un_read': 1,
                    'flag': 0
                }
            # xx新闻
            if new_type == 'link_news':
                news = {
                    'title': '中西部唯机场',
                    'content': '2018年12年旅客吞吐量突破5000万人次',
                    'url_to': 1,
                    'detail_id': 1501ret263,
                    'un_read': 1,
                    'flag': 14
                }
            # xx新闻
            if new_type == 'collect_news':
                news = {
                    'title': '高考语文炉',
                    'content': '高考语文作文题目纷选一:新时代青年、绿水青山图;上海:谈谈“被需要”的心态>>',
                    'url_to': 2,
                    'detail_id': 3289,
                    'un_read': 1,
                    'flag': 1
                }
            # xx新闻
            if new_type == 'ordinary_news':
                news = {
                    'title': '中西国际机场',
                    'content': '2018年12月11日,成量突破5000万人次',
                    'url_to': 1,
                    'detail_id': 1345398,
                    'un_read': 1,
                    'flag': 0
                }
      return news

      

    common_push.py
    #!/usr/bin/python
    # coding=utf-8
    
    import json
    from datetime import *
    import configparser
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkpush.request.v20160801 import PushRequest
    
    
    aly_path = "C:\\python\\thecover_project\\aliyun_push\\config\\aliyun.ini"
    device_path = "C:\\python\\thecover_project\\aliyun_push\\config\\devices.ini"
    
    aly_config = configparser.ConfigParser()
    aly_config.read(aly_path)
    
    device_config = configparser.ConfigParser()
    device_config.read(device_path)
    
    
    def ali_yun_push(rmt_name, push_system, push_content):
        """
        :param rmt_name: 推送APP的别称
        :param push_system: iOS或android
        :param push_content: 需要push的内容, 字典格式
        :return:
        """
    
        push_news = json.dumps(push_content, ensure_ascii=False)
    
        access_key_id = aly_config[rmt_name]['accessKeyId']
        access_key_secret = aly_config[rmt_name]['accessKeySecret']
        region_id = aly_config[rmt_name]['regionId']
        app_key = int(aly_config[rmt_name]['appKey'])
    
        clt = AcsClient(access_key_id, access_key_secret, region_id)
        request = PushRequest.PushRequest()
    
        """
        # 阿里推送参数参考:https://help.aliyun.com/knowledge_detail/48089.html
        """
    
        request.set_AppKey(app_key)
        # 推送方式 这里是DEVICE,只给指定的设备推送,就不会出现测试推送给所有用户了
        request.set_Target('DEVICE')
        request.set_accept_format('json')
        request.set_action_name('PUSH')
        # 推送标题/内容
        request.set_Title(push_content['title'])
        request.set_Body(push_news)
    
        # iOS推送
        if push_system == 'iOS':
            request.set_TargetValue(device_config[rmt_name]['iOS'])
            request.set_DeviceType("iOS")
            request.set_PushType("NOTICE")
            # iOS应用图标右上角角标
            request.set_iOSBadge(0)
            request.set_iOSRemindBody(push_content['content'])
            request.set_IOSMusic("default")
            # 环境信息 DEV:表示开发环境,PRODUCT:表示生产环境
            request.set_IOSApnsEnv("PRODUCT")
            request.set_IOSExtParameters(push_news)
            request.set_StoreOffline(True)
            request.set_IOSRemind(True)
    
        # android推送
        if push_system == 'android':
            request.set_TargetValue(device_config[rmt_name]['android'])
            request.set_DeviceType("ANDROID")
            request.set_PushType("MESSAGE")
            # 辅助通道弹窗配置
            request.set_StoreOffline(True)
            request.set_AndroidNotificationChannel('1')
            request.set_AndroidRemind(True)
            request.set_AndroidPopupTitle(push_content['title'])
            request.set_AndroidPopupBody(push_content['content'])
            request.set_AndroidPopupActivity("XXXXActivity")
            request.set_AndroidExtParameters(push_news)
            request.set_AndroidNotifyType("SOUND")
            # 通知栏自定义样式1-100
            request.set_AndroidNotificationBarType(1)
            request.set_AndroidOpenType("ACTIVITY")
            # Android通知声音
            request.set_AndroidMusic("default")
    
        # 推送控制
        # 30秒之后发送, 也可以设置成你指定固定时间
        push_date = datetime.utcnow() + timedelta(seconds=+30)
        # 24小时后消息失效, 不会再发送
        expire_date = datetime.utcnow() + timedelta(hours=+24)
        push_time = push_date.strftime("%Y-%m-%dT%XZ")
        expire_time = expire_date.strftime("%Y-%m-%dT%XZ")
    
        request.set_PushTime(push_time)
        request.set_ExpireTime(expire_time)
    
        result = clt.do_action_with_exception(request)
    
        return str(result, encoding='utf-8')
    

     

    rmt_push.py
    #!/usr/bin/python
    # coding=utf-8
    
    
    import config.data_news as news
    import common_push as push
    
    """
    #  xxx
    """
    
    
    """
    # xxAPP推送
    news = news.get_news('fm', 'ordinary_news')
    rs = push.ali_yun_push('fm', 'android', news)
    """
    
    """
    # XXAPP推送
    news = news.get_news('hy', 'ordinary_news')
    rs = push.ali_yun_push('hy', 'android', news)
    """
    
    
    # xxAPP推送
    news = news.get_news('hlj', 'ordinary_news')
    rs = push.ali_yun_push('hlj', 'android', news)
    
    print(rs)
    

      

    差不多了,还不懂的自己去api接口认真看看,我这主要用的是辅助通道 

    本文来自博客园,作者:drewgg,转载请注明原文链接:https://www.cnblogs.com/drewgg/p/12574972.html

  • 相关阅读:
    Go语言入门系列(三)之数组和切片
    详解Java的对象创建
    Go语言入门系列(二)之基础语法总结
    Go语言入门系列(一)之Go的安装和使用
    SpringCloud--Ribbon--配置详解
    自己动手作图深入理解二叉树、满二叉树及完全二叉树
    自已动手作图搞清楚AVL树
    《RabbitMQ》什么是死信队列
    《RabbitMQ》如何保证消息不被重复消费
    《RabbitMQ》如何保证消息的可靠性
  • 原文地址:https://www.cnblogs.com/drewgg/p/12574972.html
Copyright © 2011-2022 走看看