背景:公众号换主体,要迁移,粉丝(openId)的业务数据要做处理.
第一步:参照我的另一篇文章,Java 导出微信公众号粉丝。
第二步:数据处理(master-worker模式)
程序主入口:Main
我导出来的粉丝文件格式是:
{ "info":[ {"openId":"ogVous494ltuNmO4zHb1seHeGLSk"}
..... 1万条 ] }
package changeOpenId; import java.util.List; import java.util.Map; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.odao.weixin.api.support.AccessTokenKit; import com.odao.weixin.site.cases2017.change.service.ChangeService; import com.odao.weixin.site.cases2017.push.entity.JsonDataReadUtil; /** * 多线程转换openId * @author wangfj */ public class ChangeMain { @SuppressWarnings({ "unchecked", "static-access", "resource" }) public static void main(String[] args) throws Exception { ApplicationContext appContext = new ClassPathXmlApplicationContext(new String[] {"odao-weixin-site-servlet.xml"}); String token = AccessTokenKit.getTokenNew("APPID", "APP秘钥"); String accesstoken = (String) ((Map) JSON.parseObject(token, Map.class)).get("access_token"); //根据粉丝文件来读取数据 JSONObject openIdJson = JsonDataReadUtil.getReadJsonByPath("第一步导出来的文件"); String info= openIdJson.get("info").toString(); JSONArray jsonArr = JSONObject.parseArray(info); List<Map<String,Object>> list = jsonArr.toJavaObject(jsonArr, List.class); //如果你们是直接操作数据库,可以直接写一个业务类,从数据库获取要转换的数据,如下: /*ChangeService changeService = (ChangeService) appContext.getBean("changeService"); List<Map<String,Object>> list = changeService.queryRecord(40000,50000);*/ //构造master ChangeMaster master = new ChangeMaster(new ChangeWorker(),100,accesstoken,appContext);//第二个参数100(worker工作线程数),这个根据你们自己的需求定 for(int i=0;i<list.size();i+=100){//微信转换openId接口,腾讯说是一次只能处理100条 List<Map<String,Object>> newList = list.subList(i, (i+100)>list.size()?list.size():(i+100)); master.submit(newList); } //多线程执行 master.execute(); while(true){ if(master.isComplate()){ long allTime = master.getResult(); System.out.println("主线程执行完毕,总耗时:"+allTime+"毫秒"); break; } } } }
Master:
package changeOpenId; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import org.springframework.context.ApplicationContext; /** * Master任务指派者,分发者 * @author wangfj */ public class ChangeMaster{ private ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue = new ConcurrentLinkedQueue<List<Map<String,Object>>>(); //存放所有的工作者 private HashMap<String, Thread> workers = new HashMap<String, Thread>(); //存放数据的结果集 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String, Object>(); //构造master public ChangeMaster(Worker worker,int workerCount,String accessToken,ApplicationContext appContext){ worker.setApplicationContext(appContext); worker.setWorkerQueue(this.workerQueue); worker.setAccessToken(accessToken); worker.setResultMap(resultMap); for(int i=0;i<workerCount;i++){ this.workers.put("执行任务worker"+i,new Thread(worker,"子线程"+i)); } } //提交 public void submit(List<Map<String,Object>> list){ this.workerQueue.add(list); } //执行 public void execute(){ for(Map.Entry<String,Thread> me :workers.entrySet()){ me.getValue().start(); } } //获得执行结果集 public long getResult(){ long result = 0l; for(Map.Entry<String,Object> me :resultMap.entrySet()){ result += (Long)me.getValue(); } return result; } //所有子线程是否执行完毕 public boolean isComplate() { for(Map.Entry<String,Thread> me :workers.entrySet()){ if(Thread.State.TERMINATED != me.getValue().getState()){ return false; } } return true; } }
Worker(转换openId核心代码):
package changeOpenId; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.commons.httpclient.HttpStatus; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.message.BasicHeader; import org.apache.http.protocol.HTTP; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.odao.weixin.site.cases2017.change.service.ChangeService; @Service public class ChangeWorker extends Worker{ public static String url = "http://api.weixin.qq.com/cgi-bin/changeopenid?access_token="; //微信提供中的openId转换接口 public static long handle(List<Map<String,Object>> list,String accessToken,ApplicationContext appContext) throws Exception { ChangeService changeService = (ChangeService) appContext.getBean("changeService");//我处理数据的业务类 long end = System.currentTimeMillis(); JSONObject params = new JSONObject(); params.put("from_appid", "xxxx");//此处from_appid为原帐号的appid ArrayList<String> openIds = new ArrayList<String>(); for(int i=0;i<list.size();i++){ openIds.add(list.get(i).get("openId").toString()); } params.put("openid_list", openIds);//需要转换的openid,即第1步中拉取的原帐号用户列表,这些必须是旧账号目前关注的才行,否则会出错;一次最多100个,不能多。格式:["openIdA","openIdB"] String reslut = JsonSMS(params.toString(),accessToken); JSONObject json = (JSONObject) JSONObject.parse(reslut); if("ok".equals(json.getString("errmsg"))){ String result_list = json.get("result_list").toString(); JSONArray arr= JSONObject.parseArray(result_list); List<Map<String,Object>> obj = 
arr.toJavaObject(arr, List.class); if(!obj.get(0).get("err_msg").equals("ori_openid error")){ List<Map<String,Object>> openIdObj = arr.toJavaObject(arr, List.class); changeService.batchUpdateAccountsMapping(openIdObj); } }else{ System.out.println("请求微信转换接口返回异常"); } return System.currentTimeMillis()-end; } public static String JsonSMS(String postData, String token) { String result = ""; try { //发送POST请求 URL urls = new URL(url.concat(token)); HttpURLConnection conn = (HttpURLConnection) urls.openConnection(); conn.setRequestMethod("POST"); conn.setRequestProperty("Content-Type", "application/json"); conn.setRequestProperty("Connection", "Keep-Alive"); conn.setUseCaches(false); conn.setDoOutput(true); conn.setRequestProperty("Content-Length", "" + postData.length()); OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream(), "UTF-8"); out.write(postData); out.flush(); out.close(); //获取响应状态 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { System.out.println("connect failed!"); return ""; } //获取响应内容体 String line; BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8")); while ((line = in.readLine()) != null) { result += line + " "; } in.close(); } catch (IOException e) { e.printStackTrace(System.out); } return result; } }
package changeOpenId;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.context.ApplicationContext;

/**
 * Runnable that drains the shared task queue: each polled batch is passed to
 * {@link ChangeWorker#handle} and the elapsed time it returns is recorded in the
 * shared result map. The thread ends when the queue is empty.
 */
public class Worker implements Runnable {

    private ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue;
    private ConcurrentHashMap<String,Object> resultMap;
    private ApplicationContext appContext;
    private String accessToken;

    /** @param workerQueue queue the batches are polled from */
    public void setWorkerQueue(ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue) {
        this.workerQueue = workerQueue;
    }

    /** @param resultMap map the elapsed times are written to (values are {@code Long}) */
    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /** @param accessToken token forwarded to {@link ChangeWorker#handle} */
    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    /** @param appContext Spring context forwarded to {@link ChangeWorker#handle} */
    public void setApplicationContext(ApplicationContext appContext) {
        this.appContext = appContext;
    }

    /**
     * Processes batches until the queue is empty. A failing batch is reported and
     * skipped so the remaining batches are still processed.
     */
    @Override
    public void run() {
        // One key per thread: only this thread writes it, so read-then-put below
        // cannot race, and no timing is overwritten by another thread. (The former
        // random key in [0,100) collided and silently dropped timings.)
        Thread self = Thread.currentThread();
        String resultKey = self.getName() + "#" + self.getId();

        while (true) {
            List<Map<String,Object>> input = this.workerQueue.poll();
            if (input == null) {
                break;
            }
            try {
                long time = ChangeWorker.handle(input, accessToken, appContext);
                Object recorded = resultMap.get(resultKey);
                long soFar = (recorded instanceof Long) ? ((Long) recorded).longValue() : 0L;
                resultMap.put(resultKey, Long.valueOf(soFar + time));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
处理数据的业务类:
package com.odao.weixin.site.cases2017.change.service; import java.sql.SQLException; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; @Service public class ChangeService { static final Logger logger = LoggerFactory.getLogger(ChangeService.class); @Autowired private JdbcTemplate jdbcTemplateWebSiteActivityDB; /** * 批量更新玩家openId * @param list */ public void batchUpdateAccountsMapping(final List<Map<String,Object>> list) { String sql = "update 表名 set newOpenId=? ,createTime=getdate() where openId=?"; jdbcTemplateWebSiteActivityDB.batchUpdate(sql, new BatchPreparedStatementSetter() { public int getBatchSize() { return list.size(); //这个方法设定更新记录数,通常List里面存放的都是我们要更新的,所以返回list.size(); } public void setValues(java.sql.PreparedStatement ps, int i) throws SQLException { try{ ps.setString(1, list.get(i).get("new_openid").toString()); ps.setString(2, list.get(i).get("ori_openid").toString()); }catch(Exception e){ } } }); System.out.println(Thread.currentThread().getName()+"成功改变条数:"+list.size()); } }
业务类操作的表字段最好加上索引,基本上几秒钟几万数据就跑完了。