后端用的是c#,所以长链接这块用的是signalR。公司的前端是用flutter的,也有线程的signalR的插件。可惜会出现一些问题,决定自己封装一个。这里就简单介绍一下android原生封装signalR吧
这边实现了,心跳机制,断线重连,消息去重发送,连接状态等。
先封装了hubConnection,然后在这层实现了心跳。这一块必须得扯上后端,后端实现了一个方法,收到什么参数,马上就把这个参数传回来。然后就用这个方法实现心跳,发送一个消息给服务器,服务器收到这个消息。记录下发出时间和接收时间,不小于自己设定的时间间隔,则认定网络状态有效。当心跳无效的时候就把连接状态置为false,表示连接断开。其实他提供了一个回调oncloed。当连接关闭的时候会调用这个回调。但是不能太依赖这个,所以自己写了心跳来确保连接。下为心跳的逻辑。
while(isRunning){ long ping = System.currentTimeMillis()/1000; //发送心跳包 try{ hubConnection.send("Echo",String.valueOf(ping)); }catch (Exception e){ connectStatus = false; } //心跳延时 try { Thread.sleep(heartDelay); } catch (InterruptedException e) { e.printStackTrace(); } //最后一次接收消息时间小于发送心跳时间, //起码在心跳时间内,没有收到包。 if(lastRecvTime < ping){ long delay = System.currentTimeMillis()/1000 - ping; //时间差大于重连时间的时候,判定为超时,连接状态置为false if(delay > KeepAliveTimeOutSecond){ connectStatus = false; }else { connectStatus = true; } }else { connectStatus = true; } }
这个isRunning则表明需不需要进行心跳检测,当连接断开的时候当然是不必要的啦。(ps,来自后端大佬的一个建议,死循环线程里要加一个try,避免他因为错误而中断循环)。
然后开放了三个方法,开始连接,断开连接,发送消息。
/** * 开放的三个方法 * */ public void send(String method,Object... message){try{ hubConnection.send(method,message); }catch (Exception e){ connectStatus = false; } } public void stopConnect(){ isRunning = false; connectStatus = false; hubConnection.stop(); } public void startConnect(){ Log.i(TAG,"start connect this message from SignalRSession"); hubConnection = HubConnectionBuilder.create(url) .build(); setOn(); hubConnection.start().blockingAwait(); heartCheck(); isRunning = true; }
发送消息就不多说了,就是包一下。这里加try是为了保证特殊原因连接丢失的情况下,调用send方法不会出错。
断开连接的时候把心跳循环停掉,连接状态也是理所当然的变成false,然后是hubConnection的stop。
建立连接的话,就是把url传入,这里的url是在这个类初始化的时候拿到的。setOn是我自己写的建立监听的函数,发送过来消息都会在setOn中收到,然后通过handler发出去。然后开始的时候要建立心跳连接。当然这块可以放到初始化里。可以优化下。
public SignalRChannel(String url1, android.os.Handler handler) { this.url = url1; this.receiveHandler = handler; }
这是这个类的构造器,url用来建立连接就不多说。这个handler是为了发送消息以及更上层接收消息。
到此为止,第一层封装完了。
接下来是第二层,实现了断线重连,消息去重,记录数据库等操作。数据库选用的框架用的是room。
这一块操作比较多,可能会讲的有点乱。到时候可以看看我的demo消化下。
public ReliableClient(String url1, Context context) { this.url = url1; this.context = context; //创建数据库,如果存在不会重复创建 db = Room.databaseBuilder(context, AppDatabase.class, "database-name").build(); recordDb = Room.databaseBuilder(context, recordDatabase.class,"database-name1").build(); loadData(); logFile = new LogFile(context); Thread t = new Thread(runnableSend); t.start(); }
这个是构造器,第一个数据库用来存收到的数据,第二个数据库用来处理进度(处理到第几个数据了) 。loadData是获取进度,即刚刚的数据库。logFile是我自己写的类,用于写日志。然后这个线程启动的是短线重连。这里一个ReliableClient可以用单例来实现。
private void loadData() { Runnable runnable = new Runnable() { @Override public void run() { if(recordDb.recordDao().databaseCount()<1){ //数据库没有数据,设置为默认值 curRecvSeq = -1; authMessage = null; Log.i(TAG,"load <1 "); }else if(recordDb.recordDao().databaseCount() == 1){ //数据库一条数据,取这条数据 recordData messageData = recordDb.recordDao().getRecord(); curRecvSeq = messageData.curRecvSeq; authMessage = new AuthRequest(messageData.ClientType,messageData.Token,messageData.UserId,messageData.Version); if(authMessage.ClientType == -1){ authMessage = null; } Log.i(TAG,"load = 1 "+curRecvSeq); }else { Log.i(TAG,"qweq: "+recordDb.recordDao().databaseCount()); //数据库很多数据,取最后一条的数据 recordData messageData = recordDb.recordDao().getRecord(); curRecvSeq = messageData.curRecvSeq; authMessage = new AuthRequest(messageData.ClientType,messageData.Token,messageData.UserId,messageData.Version); recordDb.recordDao().deleteAll(); recordData record1 = new recordData(); record1.Token = authMessage.Token; record1.curRecvSeq = messageData.curRecvSeq; record1.Version = authMessage.version; record1.ClientType = authMessage.ClientType; record1.UserId = authMessage.UserId; recordDb.recordDao().insertAll(record1); if(authMessage.ClientType == -1){ authMessage = null; } Log.i(TAG,"load > 1 "+curRecvSeq); } } }; new Thread(runnable).start(); if(curRecvSeq != -1){ //如果有操作记录,那么查询数据库,取出未处理的数据,发给flutter。 List<MessageData> messageDataList = db.userDao().getAll(); for(MessageData messageData : messageDataList){ //未操作数据压入哈希表 hTable.put(messageData.seq,messageData); curRecvSeq ++; } } Log.i(TAG,"load msg :"+curRecvSeq); }
这个数据库理论上只能存在一条数据,因为是记录嘛,然后这里的逻辑是,当数据库没有数据时,给他一个默认值,标记为初次启动。当一条数据的时候读取这条数据。当出现不可抗力时,出现了多条数据,取出最后一条数据,然后删库不跑路。把这最后一条记录插进数据库。这个记录是为了获取登录信息的,账号,token等。这样他从后台启动起来的时候,还是处于连接状态。
下面是断线重连机制以及消息发送队列机制
private Runnable runnableSend = new Runnable() { @Override public void run() { while(isRunning){ try {//刷新连接状态 if(signalRChannel == null || !signalRChannel.isConnected()){ try{ reConnect(); }catch (Exception e){ e.printStackTrace(); continue; } } while(!sendMessageQueue.isEmpty()){ //发送消息 SendMessage sendMessage = sendMessageQueue.poll(); signalRChannel.send(sendMessage.method, sendMessage.message); } if(!logFile.fileStatus){ logFile.openLog(); } Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } } };
这里还是跑一个死循环线程,反复确认连接状态,如果断开连接的话,执行重连。消息发送也比较简单,放在一个队列里,顺便发送出去。
下面是重连,比较简单就这样看吧。
private void reConnect() { if(signalRChannel != null){ signalRChannel.stopConnect(); } signalRChannel = new SignalRChannel(url,receiveHandler); signalRChannel.startConnect(); if(authMessage!=null){ signalRChannel.send("Auth",authMessage); } }
下面是三个开放给外部的方法
//发送消息 public void send(String method,Object... messages){ /** * queue * */ if(method.equals("Echo")){ long time = System.currentTimeMillis()/1000; signalRChannel.send(method,String.valueOf(time/1000)); }else { SendMessage sendMessage = new SendMessage(method,messages); sendMessageQueue.offer(sendMessage); } } //登录 public void LogIn(AuthRequest authRequest){ this.authMessage = authRequest; //todo: write file // signalRChannel.send("Auth",authMessage); } //登出 public void LogOut(){ authMessage = null; if(signalRChannel != null){ signalRChannel.stopConnect(); } if(logFile.fileStatus){ logFile.closeLog(); } }
发送消息的时候给他压进消息队列里,等一段时间发送。当然我这里设置时间是4秒,有点不合理,这个需要自己改一下。
然后这里的登录登出只是状态登出了,长链接是一直存在的。
登出是先把authMessage清空,然后断开重连一下,就断开了。
登录只是记录下他的登录信息,因为我们登录走的是另外的方法。
这里大致是这样了,其他很多代码都是跟我们自己的业务相关,我会觉得不具有参考性,就不列出来了。
最后贴一下demo地址: https://github.com/libo1223/signalR