zoukankan      html  css  js  c++  java
  • android中非堵塞socket通信

    1、什么是同步与异步。堵塞与非堵塞

    首先我们要明确搞明确:同步就等于堵塞?异步就等于非堵塞?这是不正确的,同步不等于阻 塞。而异步也不等于非堵塞。

    1)那什么是同步编程?

    什么是同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。

    依据这个定义,android中绝大多数函数都是同步调用。

    可是一般而言,我们在谈论同步、异步的时候,特指那些须要其它部件协作或者须要一定时间完毕的任务。在android中,因为主线程(UI线程的不安全性),我们常常会用到handler的SendMessage函数,就是一个同步线程,它将数据传送给某个窗体后。在对方处理完消息后,这个函数是不会返回的,当处理完毕的时候才返回对应的返回值。

    2)那什么是异步编程?

    异步的概念和同步相反的。

    当一个调用者异步发出一个功能调用时,调用者不能立马得到结果。实际处理这个调用的部件在完毕后,通过状态、通知和回调来通知调用者。以 android中AsyncTask类为例,顾名思义异步运行任务,在doInBackground 运行完毕后,onPostExecute 方法将被UI 线程调用。后台的计算结果将通过该方法传递到UI 线程。而且在界面上展示给用户.。在android或者java异步编程中须要注意下面几个知识点:回调。监听者模式,观察者模式。

    这几点在之后另外几篇文章中会提及。

    3)什么是堵塞式编程?

    堵塞调用是指调用结果返回之前,当前线程会被挂起。

    函数仅仅有在 得到结果之后才会返回。由于这点定义跟同步编程的定义非常相像。所以非常多人觉得同步编程就等堵塞式编程。

    对于同步调用来说,非常多时候当前线程还是激活的。仅仅是从逻辑上当前函数没有返回而已。比如,我们在 socket编程中调用Receive函数。假设缓冲区中没有数据,这个函数就会一直等待,直到有数据才返回。而此时。当前线程还会继续处理各种各样的消 息。

    假设主窗体和调用函数在同一个线程中,除非你在特殊的界面操作函数中调用。事实上主界面还是应该能够刷新。可是在android中,因为主线程(UI线程)的不安全性,特别到4.0版本号后,系统已经不同意在主线程中进行耗时的同步编程。所以android才出现了AsyncTask类用于异步编程。


    4)什么是非堵塞式编程?

    非堵塞和堵塞的概念相相应,指在不能立马得到结果之前,该函数不会堵塞当前线程。而会立马返回。

    从这个定义上来说,非堵塞编程能够说是异步编程的一种,可是异步编程并不等于非堵塞式编程。

    5)差别大概

    我们用买票的案例去理解它。当我们去买票的时候。假设还在排队,一直排着。直到买到票再离开,这个就是同步编程(所谓同步就是当一个进程发起一个函数(任务)调用的时候。一直会到函数(任务)完毕)。那还有另外一方式。你能够叫一个人(监听者。观察者)帮你看着,直接你买票了,再通知你,你能够先去别的事情(而异步这不会这样,异步情况下是当一个进程发 起一个函数(任务)调用的时候,不会等函数返回)。堵塞是就是等排队。非堵塞就是直接走开。

    2、几个关键知识点

    1)java.net.InetSocketAddress

    此类实现 IP 套接字地址(IP 地址 + port号)。

    它还能够是一个对(主机名 + port号)。在此情况下,将尝试解析主机名。

    假设解析失败,则该地址将被视为未解析 地址,可是其在某些情形下仍然能够使用。比方通过代理连接。

    需注意接口:
    public InetSocketAddress(InetAddress addr,int port)
    依据 IP 地址和port号创建套接字地址。
    有效port值介于 0 和 65535 之间。

    port号 zero 同意系统在 bind 操作中挑选临时的port。

    2)java.nio.channels.Selector

    可通过调用此类的 open 方法创建选择器。该方法将使用系统的默认选择器提供者创建新的选择器。也可通过调用自己定义选择器提供者的 openSelector 方法来创建选择器。通过选择器的 close 方法关闭选择器之前,它一直保持打开状态。
    需注意接口:
    public static Selector open()throws IOException
    打开一个选择器。


    public abstract void close()throws IOException
    关闭此选择器。


    假设某个线程眼下正堵塞在此选择器的某个选择方法中。则中断该线程。如同调用该选择器的 wakeup 方法那样。
    全部仍与此选择器关联的未取消键已无效、其通道已注销,而且与此选择器关联的全部其它资源已释放。
    假设此选择器已经关闭,则调用此方法无效。
    关闭选择器后,除了调用此方法或 wakeup 方法外,以不论什么其它方式继续使用它都将导致抛出 ClosedSelectorException。

    注:选择器的关闭是关键点。特别须要注意上述第二条

    3)java.nio.channels.SocketChannel

    针对面向流的连接套接字的可选择通道。


    套接字通道不是连接网络套接字的完整抽象。

    必须通过调用 socket 方法所获得的关联 Socket 对象来完毕对套接字选项的绑定、关闭和操作。不可能为随意的已有套接字创建通道,也不可能指定与套接字通道关联的套接字所使用的 SocketImpl 对象。


    通过调用此类的某个 open 方法创建套接字通道。新创建的套接字通道已打开。但尚未连接。

    试图在未连接的通道上调用 I/O 操作将导致抛出 NotYetConnectedException。可通过调用套接字通道的 connect 方法连接该通道;一旦连接后,关闭套接字通道之前它会一直保持已连接状态。

    可通过调用套接字通道的 isConnected 方法来确定套接字通道是否已连接。


    套接字通道支持非堵塞连接:可创建一个套接字通道,而且通过 connect 方法能够发起到远程套接字的连接,之后通过 finishConnect 方法完毕该连接。

    可通过调用 isConnectionPending 方法来确定是否正在进行连接操作。


    可单独地关闭 套接字通道的输入端和输出端,而无需实际关闭该通道。

    调用关联套接字对象的 shutdownInput 方法来关闭某个通道的输入端将导致该通道上的兴许读取操作返回 -1(指示流的末尾)。调用关联套接字对象的 shutdownOutput 方法来关闭通道的输出端将导致该通道上的兴许写入操作抛出 ClosedChannelException。


    套接字通道支持异步关闭。这与 Channel 类中所指定的异步 close 操作类似。

    假设一个线程关闭了某个套接字的输入端。而同一时候还有一个线程被堵塞在该套接字通道上的读取操作中,那么处于堵塞线程中的读取操作将完毕,而不读取不论什么字节且返回 -1。I假设一个线程关闭了某个套接字的输出端。而同一时候还有一个线程被堵塞在该套接字通道上的写入操作中,那么堵塞线程将收到 AsynchronousCloseException。


    多个并发线程可安全地使用套接字通道。虽然在随意给定时刻最多仅仅能有一个线程进行读取和写入操作,但数据报通道支持并发的读写。connect 和 finishConnect 方法是相互同步的,假设正在调用当中某个方法的同一时候试图发起读取或写入操作。则在该调用完毕之前该操作被堵塞。


    3、实例代码演示

    连接核心代码:

    Selector mSelector = null;
    ByteBuffer sendBuffer = null;
    SocketChannel client = null;
    InetSocketAddress isa = null;
    SocketEventListener mSocketEventListener = null;
    private boolean Connect(String site, int port)
    {
            if (mSocketEventListener != null)
            {
                    mSocketEventListener.OnSocketPause();
            }
            boolean ret = false;
            try
            {
                    mSelector = Selector.open();
                    client = SocketChannel.open();
                    client.socket().setSoTimeout(5000);
                    isa = new InetSocketAddress(site, port);
                    boolean isconnect = client.connect(isa);
                    // 将客户端设定为异步
                    client.configureBlocking(false);
                    // 在轮讯对象中注冊此客户端的读取事件(就是当server向此客户端发送数据的时候)
                    client.register(mSelector, SelectionKey.OP_READ);
                    
                    long waittimes = 0;
    
                    if(!isconnect)
                    {
                        while (!client.finishConnect())
                        {
                                EngineLog.redLog(TAG,  "等待非堵塞连接建立....");
                                Thread.sleep(50);
                                if(waittimes < 100)
                                {
                                        waittimes++;
                                }
                                else
                                {
                                        break;
                                }
                        }
                    }
                    Thread.sleep(500);
                    haverepaired();
                    startListener();
                    ret = true;
            }
            catch (Exception e)
            {
                    EngineLog.redLog(TAG + " - Connect error", e != null ? e.toString() : "null");
                    try
                    {
                            Thread.sleep(1000 * 10);
                    }
                    catch (Exception e1)
                    {
                            EngineLog.redLog(TAG + " - Connect error", e1 != null ? e1.toString() : "null");
                    }
                    ret = false;
            }
            return ret;
    }
    在上述代码中。我们能够看到有一个SocketEventListener监听接口,这个接口用于监听socket事件,将其回调给调用者
    SocketEventListener接口:
    public interface SocketEventListener
            {
                    /**
                     * Socket正在接收数据
                     * */
                    public void OnStreamRecive();
                    /**
                     * Socket接收数据完毕
                     * */
                    public void OnStreamReciveFinish();
                    /**
                     * Socket有新的消息返回
                     * */
                    public void OnStreamComing(byte[] aStreamData);
                    /**
                     * Socket出现异常
                     * */
                    public void OnSocketPause();
                    /**
                     * Socket已修复,可用
                     * */
                    public void OnSocketAvaliable();
            }
    监听接口的使用:
    rivate void startListener()
            {
                    if (mReadThread == null || mReadThread.isInterrupted())
                    {
                            mReadThread = null;
                            mReadThread = new Thread()
                            {
                                    @Override
                                    public void run()
                                    {
                                            while (!this.isInterrupted() && mRunRead)
                                            {
                                                    MyLineLog.redLog(TAG,"startListener:" + mSendMsgTime);
                                                    try
                                                    {
                                                         // 假设client连接没有打开就退出循环
                                                            if (!client.isOpen())
                                                                    break;
                                                            // 此方法为查询是否有事件发生假设没有就堵塞,有的话返回事件数量
                                                            int eventcount = mSelector.select();
                                                            // 假设没有事件返回循环
                                                            if (eventcount > 0)
                                                            {
                                                            	starttime = CommonClass.getCurrentTime();
                                                                    // 遍例全部的事件
                                                                    for (SelectionKey key : mSelector.selectedKeys())
                                                                    {
                                                                            // 删除本次事件
                                                                            mSelector.selectedKeys().remove(key);
                                                                            // 假设本事件的类型为read时,表示server向本client发送了数据
                                                                            if (key.isValid() && key.isReadable())
                                                                            {
                                                                                    if (mSocketEventListener != null)
                                                                                    {
                                                                                            mSocketEventListener.OnStreamRecive();
                                                                                    }
                                                                                    boolean readresult = ReceiveDataBuffer((SocketChannel) key.channel());
    
                                                                                    if (mSocketEventListener != null)
                                                                                    {
                                                                                            mSocketEventListener.OnStreamReciveFinish();
                                                                                    }
                                                                                    
                                                                                    if(readresult)
                                                                                    {
                                                                                            key.interestOps(SelectionKey.OP_READ);
                                                                                            sleep(200);
                                                                                    }
                                                                                    else
                                                                                    {
                                                                                            throw new Exception();
                                                                                    }
                                                                            }
                                                                            key = null;
                                                                    }
                                                                    mSelector.selectedKeys().clear();
                                                            }
                                                    }
                                                    catch (Exception e)
                                                    {
                                                            mRunRead = false;
                                                            mReadThread = null;
                                                            if(e instanceof InterruptedException)
                                                            {
                                                                    MyLineLog.redLog(TAG, "startListener:" + e.toString());
                                                            }
                                                            else
                                                            {
                                                                    break;
                                                            }
                                                    }
                                            }
                                    }
                            };
                            mReadThread.setName(TAG + " Listener, " + CommonClass.getCurrentTime());
                            mRunRead = true;
                            mReadThread.start();
                    }
            }

    连接完之后就是发送数据和接收数据,以下是发送数据的核心代码:

    public boolean SendSocketMsg(byte[] aMessage) throws IOException
            {
                    boolean ret = false;
                    try
                    {
                            sendBuffer.clear();
                            sendBuffer = ByteBuffer.wrap(aMessage);
                            int sendsize = client.write(sendBuffer);
                            sendBuffer.flip();
                            sendBuffer.clear();
                            mSendMsgTime = CommonClass.getCurrentTime();
                            MyLineLog.redLog(TAG, "SendSocketMsg:" + mSendMsgTime + ", sendsize:" + sendsize);
                            ret = true;
                    }
                    catch (Exception e)
                    {
                            MyLineLog.redLog(TAG,  "发送数据失败。

    "); if (mSocketEventListener != null) { mSocketEventListener.OnSocketPause(); } // crash(); } return ret; }


    由于实际工作须要,我们须要常常会碰到两个问题。无效数据和大数据。怎样去解决问题呢,无效数据用过滤,大数据用分块接收,以下是接收数据的方法:
    private boolean ReceiveDataBuffer(SocketChannel aSocketChannel)
            {
    //              n 有数据的时候返回读取到的字节数。
    //              0 没有数据而且没有达到流的末端时返回0。
    //              -1 当达到流末端的时候返回-1。

    boolean ret = false; ByteArrayBuffer bab = new ByteArrayBuffer(8*1024); while(true) { try { ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 1); readBuffer.clear(); int readsize = aSocketChannel.read(readBuffer); if(readsize > 0) { MyLineLog.redLog(TAG, "aSocketChannel.read=>" + readsize); byte[] readbytes = readBuffer.array(); bab.append(readbytes, 0, readsize); readBuffer.clear(); readBuffer.flip(); ret = true; } else if(readsize == 0) { int buffersize = bab.length(); byte[] readdata = bab.buffer(); int readdataoffset = 0; boolean parsedata = true; while(readdataoffset < buffersize && parsedata) { byte datatype = readdata[readdataoffset]; if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA) { byte[] blockdata = new byte[] { datatype }; ReceiveData(blockdata); readdataoffset += 1; blockdata = null; } else { byte[] blocklength = new byte[4]; System.arraycopy(readdata, readdataoffset + 5, blocklength, 0, 4); int blocksize = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(blocklength)); blocklength = null; int blockdatasize = 5 + blocksize + 4; if(blockdatasize <= buffersize) { MyLineLog.redLog(TAG, "块数据大小:" + blockdatasize); byte[] blockdata = new byte[blockdatasize]; System.arraycopy(readdata, readdataoffset, blockdata, 0, blockdatasize); long starttime = CommonClass.getCurrentTime(); ReceiveData(blockdata); long endtime = CommonClass.getCurrentTime(); MyLineLog.redLog(TAG, "解析数据用时:" + (endtime - starttime) + "ms"); readdataoffset += blockdatasize; blockdata = null; } else if(blockdatasize < 10240) {//小于10k,则属于正常包 MyLineLog.redLog(TAG, "块数据大小:" + blockdatasize + ",小于10k,说明数据不完整,继续获取。

    "); //将未解析数据存到暂时buffer int IncompleteSize = buffersize - readdataoffset; if(IncompleteSize > 0) { byte[] Incompletedata = new byte[IncompleteSize]; System.arraycopy(readdata, readdataoffset, Incompletedata, 0, IncompleteSize); bab.clear(); bab.append(Incompletedata, 0, IncompleteSize); parsedata = false; Incompletedata = null; } } else {//异常包 MyLineLog.yellowLog(TAG, "块数据错误大小:" + blockdatasize); MyLineLog.redLog(TAG,"blockdatasize error:" + blockdatasize); ret = true; break; } } } if(parsedata) { ret = true; break; } } else if(readsize == -1) { ret = false; break; } else { ret = true; break; } } catch (IOException e) { MyLineLog.redLog(TAG, "aSocketChannel IOException=>" + e.toString()); ret = false; break; } } bab.clear(); bab = null; return ret; }


    假设数据量过大的话,还会使用压缩方法进行传输,那应该怎样接收呢,以下是一段接收压缩数据的方法:
    private void ReceiveData(byte[] aDataBlock)
            {
                    try
                    {
                            MyLineLog.redLog(TAG, "ReceiveData:" + mSendMsgTime);
                            if (mSendMsgTime != 0)
                            {
                                    mSendMsgTime = 0;
                            }
                            
                            byte[] ret = null;
                            
                            int offset = 0;
    
                            byte datatype = aDataBlock[offset];
                            offset += 1;
    
                            if (datatype != -1)
                            {
                                    if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT)
                                    {
                                            ret = new byte[] { datatype };
                                    }
                                    else if (datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA)
                                    {
                                            ret = new byte[] { datatype };
                                    }
                                    else if (datatype == PushUtils.PACKAGETYPE_NORMAL || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_HAVEDATA)
                                    {
                                            byte[] databytelength = new byte[4];
                                            System.arraycopy(aDataBlock, offset, databytelength, 0, 4);
                                            offset += 4;
                                            int header = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(databytelength));
                                            databytelength = null;
    
                                            if (header == PushUtils.PACKAGEHEADER)
                                            {
                                                    byte[] datalengthbyte = new byte[4];
                                                    System.arraycopy(aDataBlock, offset, datalengthbyte, 0, 4);
                                                    offset += 4;
    
                                                    int datalength = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(datalengthbyte));
                                                    datalengthbyte = null;
    
                                                    if (datalength > 4)
                                                    {
                                                            // compressed bit 临时不压缩
                                                            byte compressed = aDataBlock[offset];
                                                            offset += 1;
    
                                                            if (compressed == 1)
                                                            {//解压缩
                                                                    //跳过头4个字节。此处用于解压缩后的数据大小,临时不须要
                                                                    offset += 4;
                                                                    int contentlength = datalength - 1 - 4;
                                                                    byte[] datacontentbyte = new byte[contentlength];
                                                                    System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength);
                                                                    offset += contentlength;                                                               
    
                                                                    byte[] compressdata = new byte[contentlength - 4];
                                                                    System.arraycopy(datacontentbyte, 0, compressdata, 0, contentlength - 4);
    
                                                                    long starttime = CommonClass.getCurrentTime();
                                                                    byte[] decompressdatacontentbyte = CommonClass.decompress(compressdata);
                                                                    long endtime = CommonClass.getCurrentTime();
                                                                    MyLineLog.redLog(TAG, "解压缩数据用时:" + (endtime - starttime) + "ms");
                                                                    int decompressdatacontentbytelength = decompressdatacontentbyte.length;
                                                                    compressdata = null;
                                                                    int footer = PushUtils.getInt(datacontentbyte, contentlength - 4);
    
                                                                    if (footer == PushUtils.PACKAGEFOOTER)
                                                                    {
                                                                            ret = new byte[decompressdatacontentbytelength + 1];
                                                                            ret[0] = datatype;
                                                                            System.arraycopy(decompressdatacontentbyte, 0, ret, 1, decompressdatacontentbytelength);
                                                                            datacontentbyte = null;
                                                                            decompressdatacontentbyte = null;
                                                                    }
                                                            }
                                                            else
                                                            {//数据未压缩
                                                                    int contentlength = datalength - 1;
                                                                    byte[] datacontentbyte = new byte[contentlength];
                                                                    System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength);
                                                                    offset += contentlength;
    
                                                                    int footer = PushUtils.getInt(datacontentbyte, contentlength - 4);
    
                                                                    if (footer == PushUtils.PACKAGEFOOTER)
                                                                    {
                                                                            ret = new byte[contentlength + 1 - 4];
                                                                            ret[0] = datatype;
                                                                            System.arraycopy(datacontentbyte, 0, ret, 1, contentlength - 4);
                                                                            datacontentbyte = null;
                                                                    }                                                                
                                                            }
                                                    }
                                            }
                                    }
    
                                    if (mSocketEventListener != null)
                                    {
                                            mSocketEventListener.OnStreamComing(ret);
                                    }
                            } 
                    }
                    catch (Exception e)
                    {
                            MyLineLog.redLog(TAG + " - ReceiveData error", e.toString());
                    }
            }

    在介绍SocketChannel的时候,api提到关闭须要注意事项,以下一段关闭SocketChannel的演示样例代码:
    public void closeSocket()
            {
                    mRunRead = false;
                    if (mReadThread != null)
                    {
                            if (!mReadThread.isInterrupted())
                            {
                                    mReadThread.interrupt();
                                    mReadThread = null;
                            }
                    }
    
                    if (mSelector != null && mSelector.isOpen())
                    {
                            try
                            {
                                    mSelector.close();
                            }
                            catch (IOException e)
                            {
                                    MyLineLog.redLog(TAG + " - closeSocket error", e.toString());
                            }
                            mSelector = null;
                    }
                    
                    if (client != null)
                    {
                            try
                            {
                                    client.close();
                                    client = null;
                            }
                            catch (IOException e)
                            {
                                    MyLineLog.redLog(TAG + " - closeSocket2 error", e.toString());
                            }
                    }
    
                    System.gc();
            }

    这篇文章解说部分大量參照JavaApi。事实上非常多问题的答案就在Api里面,当你不知道怎样去做的时候,回头看一下Api,细致思考一下,就能解决大部分问题。
    Ps:感谢我大学舍友阿钢为我提供的代码





  • 相关阅读:
    FICOON
    Mezzanine
    BIOS
    基于ftp的自动传输脚本
    主机存活检测、端口检测
    基于ssh的服务器基础信息搜集
    Spring Boot aop使用指南
    Java动态代理
    Spring中的声明式事务管理
    Spring Boot注解使用指南
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8341784.html
Copyright © 2011-2022 走看看