因为项目里有许多发送短信的地方,所以我们就把 发送短信的功能单独拿出来,做成一个service给其他app使用。
程序的大概流程是这样的 :
首先 我选择生产者模式创建队列。
@Component public class SmsQueueFactory { private static SmsQueueFactory smsQueueFactory =null; static final BlockingQueue<SmsMessageDto> messageQueue =new ArrayBlockingQueue<SmsMessageDto>(1024); private SmsQueueFactory (){}; private static Logger log = LoggerFactory.getLogger(SmsQueueFactory.class); /** * 获取对象 * @return */ public static SmsQueueFactory getSmsQueueFactory() { if (smsQueueFactory == null) { smsQueueFactory = new SmsQueueFactory(); } return smsQueueFactory; } public BlockingQueue<SmsMessageDto> getMessageQueue(){ return this.messageQueue; } public void setMessageDto(SmsMessageDto messageDto) { try { this.messageQueue.offer(messageDto, 100, TimeUnit.MILLISECONDS); }catch (Exception e){ log.error("setMessageDto出现错误"+e.getMessage()); } } }
一个接收器来 接收请求,并保存到数据库, 我在这里只是 写了一些伪代码,关键的地方我也会写上去的:
@Service("smsAcceptorImpl") public class SmsAcceptorImpl {
public void sendSms(SendMessageReq req) throws Exception{
validation(req); //校验参数
SmsMessageDto sms = genSmsObj(req);//封装成 message 对象
if (sms==null){
return;
}
sms= saveMessage(sms); // 先保存到数据库这样可以拿到 这个消息对象的 ID,方便以后使用。
sendToQueue(sms);// 放到队列
}
/**
* 添加到发送队列
* @param
*/
private void sendToQueue(SmsMessageDto messageDto) {
try {
BlockingQueue<SmsMessageDto> queue =SmsQueueFactory.getSmsQueueFactory().getMessageQueue();
queue.offer(messageDto, 100, TimeUnit.MILLISECONDS);
}catch (Exception e){
log.error("加入队列出错"+e.getMessage());
}
}
}
然后写一个 init() 方法 ,定时到 队列中 看看,有没有可以发送的消息。
@Component public class SmsQueueProcess { @Autowired private SmsSupplierSelector smsSupplierSelector; private static Logger log = LoggerFactory.getLogger(SmsQueueProcess.class); @PostConstruct public void init(){ getMesForQueue(); } //短信量不大 暂时不使用多线程处理 private void process(SmsMessageDto sms){ ISupplier supplier = smsSupplierSelector.getSupplier(sms); supplier.validation(sms); // 验证的方法就不写了 supplier.send(sms); // 重点 把 这发送的代码展示一下 } public void getMesForQueue() { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1); fixedThreadPool.execute(new getQueueList()); } class getQueueList implements Runnable{ //todo 获取 队列中的message 对象 public void getQueueList() throws InterruptedException{ while(true){ BlockingQueue<SmsMessageDto> queue = SmsQueueFactory.getSmsQueueFactory().getMessageQueue(); SmsMessageDto smsMessage=queue.take(); process(smsMessage); } } @Override public void run() { try{ getQueueList(); }catch (Exception e){ log.error("获取队列 中的对象出错"+e.getMessage()); } } }
// 这个类是 运营商选择器,默认是使用 阿里大于,如果发送失败了,就使用其他的(企信通),目前就只选用了两个运营商 @Component public class SmsSupplierSelector { @Autowired private ISupplier smsSupplierAliYu ; @Autowired private ISupplier smsSupplierQxt ; private final Map<String ,ISupplier> suppliers = new HashMap<>(); @PostConstruct private void init(){ suppliers.put("alidayu",smsSupplierAliYu); suppliers.put("qxt",smsSupplierQxt); } public ISupplier getSupplier(SmsMessageDto sms) { if(StringUtil.isBlank(sms.getSupplier())){ sms.setSupplier("alidayu"); return smsSupplierAliYu; } HashMap<String ,ISupplier> cloneSuppliers = new HashMap<>(suppliers); cloneSuppliers.remove(sms.getSupplier()); String[] keys = cloneSuppliers.keySet().toArray(new String[0]); Random random = new Random(); String randomKey = keys[random.nextInt(keys.length)]; sms.setSupplier(randomKey); return cloneSuppliers.get(randomKey); } }
运营商选择器,返回的是运营商对象。两个运营商,每个运营商都有自己的验证和发送的方法,这里展示一下 ,发送的方法。
@Override public void send(SmsMessageDto sms) { //todo 使用企信通的发送 try { String message=sms.getMessage(); String params = "DesMobile="+sms.getPhone()+"&Content=" + URLEncoder.encode(message, "GBK") +"&t="+ new Date().getTime(); String logMes = HttpClientUtil.doGet(GET_URL + params); sms.setSendCount(sms.getSendCount()==null ?0:(sms.getSendCount()+ 1)); //发送的次数加1 String code = XmlUtil.getContentFromXml(logMes, "code"); if (!("01".equals(code) || "03".equals(code))) { sms.setStatus(2); //发送失败 }else{ sms.setStatus(3); //发送成功 } } catch (Exception e) { log.error("短信发送失败!方法名[send]"+e.getMessage()); }finally { SmsMessage smsMessage=new SmsMessage(); smsMessage.setId(sms.getId()); smsMessage.setStatus(sms.getStatus()); smsMessage.setSendCount(sms.getSendCount()); smsMessage.setUpdateTime(new Date()); smsMessage.setSupplier(sms.getSupplier()); smsMessage.setRequestJson(JsonUtil.bean2Json(sms)); messageService.updateStatus(smsMessage); } }
@Override public void send(SmsMessageDto sms) { //todo 使用阿里大于发送 TaobaoClient client = new DefaultTaobaoClient("http://gw.api.taobao.com/router/rest", "app-key", "app-secret"); AlibabaAliqinFcSmsNumSendRequest req = new AlibabaAliqinFcSmsNumSendRequest(); req.setExtend(sms.getId()+"sms"); req.setSmsType("normal"); Map<String,String > stringStringMap=sms.getSmsParamMap(); req.setSmsFreeSignName(stringStringMap.get("product")); req.setSmsParamString(JsonUtil.bean2Json(stringStringMap)); req.setRecNum(sms.getPhone() ); sms.setSendCount(sms.getSendCount()==null ?0:(sms.getSendCount()+ 1)); req.setSmsTemplateCode(sms.getTemplateCode()); try { client.execute(req); sms.setSendCount(sms.getSendCount()==null ?(1):(sms.getSendCount()+ 1)); sms.setStatus(1); SmsMessage smsMessage=new SmsMessage(); smsMessage.setStatus(sms.getStatus()); // 发送状态 smsMessage.setId(sms.getId()); smsMessage.setUpdateTime(new Date()); smsMessage.setSendCount(sms.getSendCount()); smsMessage.setSupplier(sms.getSupplier()); smsMessage.setRequestJson(JsonUtil.bean2Json(sms)); messageService.updateStatus(smsMessage); } catch (ApiException e) { log.error("阿里大于发送消息失败,类名[SmsSupplierAliYu]"+e.getMessage()); } }
企信通的发送是实时接收消息的状态,阿里大鱼是异步的,所以就需要有个守护线程,查询数据库,和阿里服务器,修改消息的状态。
@Scheduled(cron="0/10 * * * * ? ") // 获取消息的返回参数 public void ckMessageStatusTask() throws Exception { try { //todo 守护线程 ,定时查询数据库,未发送成功的加入队列,重新发送 BlockingQueue<SmsMessageDto> queue = SmsQueueFactory.getSmsQueueFactory().getMessageQueue(); List<SmsMessage> smsMessageList = messageService.selectByStatusAndSendCount();// 要按照时间排序 List<SmsMessage> listAl =new ArrayList<>(); for (SmsMessage smsMessage : smsMessageList) { if (aLiDaYu.equals(smsMessage.getSupplier())){ listAl.add(smsMessage); } } if (null !=listAl && listAl.size()>0){ getMessageStatusForAli(listAl); } smsMessageList.removeAll(listAl); for (int j = 0; j < smsMessageList.size(); j++) { SmsMessage smsMessage=smsMessageList.get(j); if (smsMessage.getStatus()==2 || smsMessage.getStatus()==0){ // SmsMessageDto smsMessageDto = JsonUtil.json2Bean(smsMessage.getRequestJson(), SmsMessageDto.class); queue.offer(smsMessageDto, 100, TimeUnit.MILLISECONDS); } } } catch (Exception e) { log.error("短信守护线程出错,方法名[ckMessageStatusTask]" + e.getMessage()); } } private void getMessageStatusForAli( List<SmsMessage> smsMessageList){ //todo 从阿里大鱼 查询消息的状态 BlockingQueue<SmsMessageDto> queue = SmsQueueFactory.getSmsQueueFactory().getMessageQueue(); // 这里需要在阿里大于 网站上注册一个用户,并创建应用。使用上面的app证书 创建 client TaobaoClient client = new DefaultTaobaoClient("http://gw.api.taobao.com/router/rest", "app-key", "app-secret"); AlibabaAliqinFcSmsNumQueryRequest req = new AlibabaAliqinFcSmsNumQueryRequest(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); Date date = new Date(); try { for (SmsMessage smsMessage:smsMessageList){ String extend=smsMessage.getId()+"sms"; req.setRecNum(smsMessage.getPhone()); req.setQueryDate(sdf.format(date)); req.setCurrentPage(1L); // 当前页码 req.setPageSize(10L); AlibabaAliqinFcSmsNumQueryResponse rsp = client.execute(req); List<AlibabaAliqinFcSmsNumQueryResponse.FcPartnerSmsDetailDto> smsDetailDtos=rsp.getValues(); if (null !=smsDetailDtos){ for (AlibabaAliqinFcSmsNumQueryResponse.FcPartnerSmsDetailDto fcPartnerSmsDetailDto : smsDetailDtos) { // 只有一个 消息 if (extend.equals( fcPartnerSmsDetailDto.getExtend())) { Integer state = Integer.parseInt(Long.toString(fcPartnerSmsDetailDto.getSmsStatus())); if (state==3) smsMessageList.remove(smsMessage); if (state==2){ SmsMessageDto smsMessageDto = JsonUtil.json2Bean(smsMessage.getRequestJson(), SmsMessageDto.class); queue.offer(smsMessageDto, 100, TimeUnit.MILLISECONDS); } smsMessage.setStatus(state); messageService.updateMessageStatusById(smsMessage); // 添加日志 SmsLog smsLog=new SmsLog(); smsLog.setCreateTime(new Date()); smsLog.setMessageId(smsMessage.getId()); smsLog.setMessageDetails(smsMessage.getMessage()); smsLog.setMessageType("(ali)修改记录"); smsLog.setMessageStatus(smsMessage.getStatus()); smsLog.setMessageSupplier("ali"); messageLogService.insertSelective(smsLog); } } } } }catch (Exception e){ log.error("查询消息状态出错,方法名[getMessageStatusForAli]"+e.getMessage()); } }
基本上这么多,就已经把这个服务的代码都展示出来了。