zoukankan      html  css  js  c++  java
  • Java 微信公众号迁移

    背景:公众号换主体,要迁移,粉丝(openId)的业务数据要做处理.

    第一步:参照我的另一篇文章,Java 导出微信公众号粉丝。

    第二步:数据处理(master-worker模式)

    程序主入口:Main

    我导出来的粉丝文件格式是:

    {
        "info":[
            {"openId":"ogVous494ltuNmO4zHb1seHeGLSk"}
          ..... 1万条 ] }
    package changeOpenId;
    
    import java.util.List;
    import java.util.Map;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.odao.weixin.api.support.AccessTokenKit;
    import com.odao.weixin.site.cases2017.change.service.ChangeService;
    import com.odao.weixin.site.cases2017.push.entity.JsonDataReadUtil;
    
    /**
     * Command-line entry point for the openId migration.
     *
     * <p>Loads the exported fan list, splits it into batches, hands the batches to a
     * {@link ChangeMaster} and waits until all worker threads have terminated.
     *
     * @author wangfj
     */
    public class ChangeMain {

        /** Batch size; the original author's note says the WeChat interface handles only 100 openIds per call. */
        private static final int BATCH_SIZE = 100;

        /** Number of worker threads; tune to your own needs. */
        private static final int WORKER_COUNT = 100;

        /** Pause between completion checks so the main thread does not spin at full CPU. */
        private static final long POLL_INTERVAL_MILLIS = 50L;

        /**
         * Runs the whole migration and prints the summed batch time reported by the master.
         *
         * @param args unused
         * @throws Exception if the token or fan file cannot be read, or the wait is interrupted
         */
        @SuppressWarnings({ "unchecked", "rawtypes" })
        public static void main(String[] args) throws Exception {
            // Kept as the concrete type so the context can be closed once the work is done.
            ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(new String[] {"odao-weixin-site-servlet.xml"});
            try {
                String token = AccessTokenKit.getTokenNew("APPID", "APP秘钥");
                Map tokenFields = JSON.parseObject(token, Map.class);
                Object tokenValue = tokenFields == null ? null : tokenFields.get("access_token");
                if (tokenValue == null) {
                    // Without a token every worker request would fail; stop before starting any thread.
                    throw new IllegalStateException("Token response contains no access_token: " + token);
                }
                String accesstoken = (String) tokenValue;

                // Read the fan data from the exported file.
                JSONObject openIdJson = JsonDataReadUtil.getReadJsonByPath("第一步导出来的文件");
                Object info = openIdJson.get("info");
                if (info == null) {
                    throw new IllegalStateException("Fan file contains no \"info\" array");
                }
                JSONArray jsonArr = JSONObject.parseArray(info.toString());
                List<Map<String,Object>> list = JSON.toJavaObject(jsonArr, List.class);
                // If you work against the database directly, write a service that returns the
                // records to convert instead, for example:
                /*ChangeService changeService = (ChangeService) appContext.getBean("changeService");
                List<Map<String,Object>> list = changeService.queryRecord(40000,50000);*/

                // Build the master and queue one task per batch.
                ChangeMaster master = new ChangeMaster(new ChangeWorker(), WORKER_COUNT, accesstoken, appContext);
                for (int from = 0; from < list.size(); from += BATCH_SIZE) {
                    int to = Math.min(from + BATCH_SIZE, list.size());
                    master.submit(list.subList(from, to));
                }

                // Start the worker threads and wait for all of them to finish.
                master.execute();
                while (!master.isComplate()) {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                }
                long allTime = master.getResult();
                System.out.println("主线程执行完毕,总耗时:"+allTime+"毫秒");
            } finally {
                appContext.close();
            }
        }

    }

     Master:

    package changeOpenId;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.springframework.context.ApplicationContext;
    
    /**
     * Master of the master-worker pair: holds the task queue, owns the worker
     * threads and collects the values the workers store in the result map.
     *
     * @author wangfj
     */
    public class ChangeMaster{

        /** Pending batches; each element is one list of records handed to a worker. */
        private final ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue =
                new ConcurrentLinkedQueue<List<Map<String,Object>>>();

        /** All worker threads, keyed by a generated label. */
        private final HashMap<String, Thread> workers = new HashMap<String, Thread>();

        /** Values written by the workers. */
        private final ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String, Object>();

        /**
         * Wires the given worker to this master's queue and result map and creates the threads.
         * Every thread is created around the same {@code worker} instance.
         *
         * @param worker      runnable executed by every thread
         * @param workerCount number of threads to create
         * @param accessToken token passed on to the worker
         * @param appContext  Spring context passed on to the worker
         */
        public ChangeMaster(Worker worker,int workerCount,String accessToken,ApplicationContext appContext){
            worker.setApplicationContext(appContext);
            worker.setWorkerQueue(this.workerQueue);
            worker.setAccessToken(accessToken);
            worker.setResultMap(this.resultMap);

            int index = 0;
            while (index < workerCount) {
                Thread thread = new Thread(worker, "子线程" + index);
                this.workers.put("执行任务worker" + index, thread);
                index++;
            }
        }

        /**
         * Queues one batch for processing.
         *
         * @param list the batch to enqueue
         */
        public void submit(List<Map<String,Object>> list){
            workerQueue.add(list);
        }

        /** Starts every worker thread. */
        public void execute(){
            for (Thread thread : workers.values()) {
                thread.start();
            }
        }

        /**
         * Sums all values currently stored in the result map.
         *
         * @return the sum of the stored {@code Long} values
         */
        public long getResult(){
            long total = 0L;
            for (Object value : resultMap.values()) {
                total += ((Long) value).longValue();
            }
            return total;
        }

        /**
         * Reports whether every worker thread has reached the TERMINATED state.
         *
         * @return {@code true} only when no thread is in any other state
         */
        public boolean isComplate() {
            for (Thread thread : workers.values()) {
                boolean finished = thread.getState() == Thread.State.TERMINATED;
                if (!finished) {
                    return false;
                }
            }
            return true;
        }
    }

     Worker(转换openId核心代码):

    package changeOpenId;
    
    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import org.apache.commons.httpclient.HttpStatus;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.message.BasicHeader;
    import org.apache.http.protocol.HTTP;
    import org.springframework.context.ApplicationContext;
    import org.springframework.stereotype.Service;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.odao.weixin.site.cases2017.change.service.ChangeService;
    
    @Service
    public class ChangeWorker extends Worker{
        
        public static String url = "http://api.weixin.qq.com/cgi-bin/changeopenid?access_token=";  //微信提供中的openId转换接口
        
        public static long handle(List<Map<String,Object>> list,String accessToken,ApplicationContext appContext) throws Exception {
            ChangeService changeService = (ChangeService) appContext.getBean("changeService");//我处理数据的业务类
            long end = System.currentTimeMillis();
            JSONObject params = new JSONObject();
            params.put("from_appid", "xxxx");//此处from_appid为原帐号的appid
            
            ArrayList<String> openIds = new ArrayList<String>();
            for(int i=0;i<list.size();i++){
                openIds.add(list.get(i).get("openId").toString());
            }
            
            params.put("openid_list", openIds);//需要转换的openid,即第1步中拉取的原帐号用户列表,这些必须是旧账号目前关注的才行,否则会出错;一次最多100个,不能多。格式:["openIdA","openIdB"]
            
            String reslut = JsonSMS(params.toString(),accessToken);
            JSONObject json = (JSONObject) JSONObject.parse(reslut);
            if("ok".equals(json.getString("errmsg"))){
                String result_list = json.get("result_list").toString();
                JSONArray arr= JSONObject.parseArray(result_list);
                List<Map<String,Object>> obj = arr.toJavaObject(arr,  List.class);
                if(!obj.get(0).get("err_msg").equals("ori_openid error")){
                    List<Map<String,Object>> openIdObj = arr.toJavaObject(arr,  List.class);
                    changeService.batchUpdateAccountsMapping(openIdObj);
                }
            }else{
                System.out.println("请求微信转换接口返回异常");
            }
            return System.currentTimeMillis()-end;
        }
        
        public static String JsonSMS(String postData, String token) {
            String result = "";
            try {
                //发送POST请求
                URL urls = new URL(url.concat(token));
                HttpURLConnection conn = (HttpURLConnection) urls.openConnection();
                conn.setRequestMethod("POST");
                conn.setRequestProperty("Content-Type", "application/json");
                conn.setRequestProperty("Connection", "Keep-Alive");
                conn.setUseCaches(false);
                conn.setDoOutput(true);
                conn.setRequestProperty("Content-Length", "" + postData.length());
                OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream(), "UTF-8");
                out.write(postData);
                out.flush();
                out.close();
                //获取响应状态
                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
                    System.out.println("connect failed!");
                    return "";
                }
                //获取响应内容体
                String line;
                BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8"));
                while ((line = in.readLine()) != null) {
                    result += line + "
    ";
                }
                in.close();
            } catch (IOException e) {
                e.printStackTrace(System.out);
            }
            return result;
        }
    }
    package changeOpenId;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.springframework.context.ApplicationContext;
    
    /**
     * Runnable that drains the shared task queue and records how long each batch took.
     *
     * <p>The queue, result map, access token and application context are injected
     * through the setters before the threads are started.
     */
    public class Worker implements Runnable{

        /** Queue of pending batches. */
        private ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue;

        /** Map receiving one elapsed-time entry per processed batch. */
        private ConcurrentHashMap<String,Object> resultMap;

        /** Spring context handed to the batch handler. */
        private ApplicationContext appContext;

        /** Access token handed to the batch handler. */
        private String accessToken;

        /** @param workerQueue queue this worker polls for batches */
        public void setWorkerQueue(ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue) {
            this.workerQueue = workerQueue;
        }

        /** @param resultMap map that receives the per-batch timings */
        public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
            this.resultMap = resultMap;
        }

        /** @param accessToken token passed to {@code ChangeWorker.handle} */
        public void setAccessToken(String accessToken) {
            this.accessToken = accessToken;
        }

        /**
         * Polls batches until the queue is empty, processing each with
         * {@code ChangeWorker.handle} and storing its elapsed time.
         *
         * <p>Each timing is stored under a key built from the current thread id and a
         * per-run batch counter. The previous random key in the range 0..99 let
         * batches overwrite each other, so the summed time was too low.
         */
        @Override
        public void run() {
            String keyPrefix = Thread.currentThread().getId() + "-";
            long batchIndex = 0L;
            List<Map<String,Object>> input;
            while ((input = this.workerQueue.poll()) != null) {
                try {
                    long time = ChangeWorker.handle(input,accessToken,appContext);
                    resultMap.put(keyPrefix + batchIndex, time);
                    batchIndex++;
                } catch (Exception e) {
                    // A failed batch is reported and the worker moves on to the next one.
                    e.printStackTrace();
                }
            }
        }

        /** @param appContext Spring context passed to {@code ChangeWorker.handle} */
        public void setApplicationContext(ApplicationContext appContext) {
            this.appContext = appContext;
        }
    }

    处理数据的业务类:

    package com.odao.weixin.site.cases2017.change.service;
    
    import java.sql.SQLException;
    import java.util.List;
    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.jdbc.core.BatchPreparedStatementSetter;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ChangeService {
        
        static final Logger logger = LoggerFactory.getLogger(ChangeService.class);
        
        @Autowired
        private JdbcTemplate jdbcTemplateWebSiteActivityDB;
        
        /**
         * 批量更新玩家openId
         * @param list
         */
        public void batchUpdateAccountsMapping(final List<Map<String,Object>> list) {  
            String sql = "update 表名 set newOpenId=? ,createTime=getdate() where openId=?";  
            jdbcTemplateWebSiteActivityDB.batchUpdate(sql, new BatchPreparedStatementSetter() {  
                public int getBatchSize() {  
                    return list.size();  
                    //这个方法设定更新记录数,通常List里面存放的都是我们要更新的,所以返回list.size();  
                }  
                public void setValues(java.sql.PreparedStatement ps, int i) throws SQLException {
                    try{
                      ps.setString(1, list.get(i).get("new_openid").toString());  
                      ps.setString(2, list.get(i).get("ori_openid").toString());
                    }catch(Exception e){
                        
                    }
                }  
            });  
            System.out.println(Thread.currentThread().getName()+"成功改变条数:"+list.size());
        }
    }

     业务类操作的表字段最好加上索引,基本上几秒钟几万数据就跑完了。

  • 相关阅读:
    JAVA中的除法运算
    虚拟内存的设置和相关问题的解决方法
    div + css + js 打造HTML的tab控件
    body居中 兼容ie和ff
    js 获取当前页面源代码
    windows系统的全部命令
    HR线条样式CSS定制
    PHP5.3.5以及Apache2.2.17安装简介
    如何使用apache在本地服务器虚拟域名来测试网站
    CakePHP常用技巧总结
  • 原文地址:https://www.cnblogs.com/wangfajun/p/8399677.html
Copyright © 2011-2022 走看看