zoukankan      html  css  js  c++  java
  • socket服务端和客户端互发和心跳检测实例

    基础版

    在网上找到了一份简单的 socket 服务端和客户端监听代码,并且已经验证可以运行。下面直接上代码。

    服务端:

    package com.whalecloud.uip.server.socket;
    
    import java.io.DataOutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * Minimal demo server: accepts a single client on {@link #PORT} and pushes one
     * random coordinate string per second using {@link DataOutputStream#writeUTF(String)}.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 16:07
     */
    public class SocketServer {

        private static final int PORT = 5209;

        /**
         * Opens the server socket, waits for one client and streams random messages to it
         * until the connection breaks or the thread is interrupted.
         *
         * <p>All resources are opened in a try-with-resources statement so the listening
         * port and the client connection are released on every exit path.
         */
        public static void test() {
            try (ServerSocket server = new ServerSocket(PORT);
                 Socket socket = server.accept();
                 DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
                while (true) {
                    Thread.sleep(1000);
                    out.writeUTF(getRandomStr());
                    out.flush();
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Builds a message of the form {@code ID:<0-29>/x:<0-199>/y:<0-299>/z:<0-9>}
         * from freshly drawn random integers.
         *
         * @return the formatted message
         */
        private static String getRandomStr() {
            int id = (int) (Math.random() * 30);
            int x = (int) (Math.random() * 200);
            int y = (int) (Math.random() * 300);
            int z = (int) (Math.random() * 10);
            return "ID:" + id + "/x:" + x + "/y:" + y + "/z:" + z;
        }

        public static void main(String[] args) {
            test();
        }
    }

    客户端:

    package com.whalecloud.uip.server.socket;
    
    import java.io.DataInputStream;
    import java.io.InputStream;
    import java.net.Socket;
    
    /**
     * Minimal demo client: connects to {@link #HOST}:{@link #PORT} and prints every
     * message the server sends with {@code writeUTF}.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 16:10
     */
    public class SocketClient {
        private static final String HOST = "127.0.0.1";
        private static final int PORT = 5209;

        /**
         * Connects to the server and prints received messages until the stream ends or
         * an I/O error occurs.
         *
         * <p>The socket and its streams are opened in a try-with-resources statement so
         * they are closed on every exit path.
         */
        private static void test() {
            try (Socket socket = new Socket(HOST, PORT);
                 InputStream is = socket.getInputStream();
                 DataInputStream dis = new DataInputStream(is)) {
                while (true) {
                    System.out.println("receive_msg:" + dis.readUTF());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            test();
        }
    }

    启动两个项目就可以在控制台看到接收到的信息了

    完整进阶版

    客户端:

    ClientMain---启动类
    package com.whalecloud.uip.client.socket;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Entry point of the demo client: creates a {@code SocketClient} with a
     * console-logging callback and sends a test message every 20 seconds.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/11 16:57
     */
    public class ClientMain {

        public static void main(String[] args) {
            SocketClientResponseInterface consoleCallback = new ConsoleCallback();
            try {
                SocketClient client = new SocketClient(consoleCallback);
                for (;;) {
                    client.sendData("客户端11发送测试数据");
                    Thread.sleep(20000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

        /**
         * Formats the current time as {@code yyyy-MM-dd HH:mm:ss}. A new formatter is
         * created per call because {@link SimpleDateFormat} is not thread-safe and the
         * callbacks are invoked from the client's worker threads.
         */
        private static String timestamp() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        }

        /** Callback that prints each client event to standard output with a timestamp. */
        private static final class ConsoleCallback implements SocketClientResponseInterface {
            @Override
            public void onSocketConnect() {
                System.out.println(timestamp() + "连接成功");
            }

            @Override
            public void onSocketReceive(Object socketResult, int code) {
                System.out.println(timestamp() + "拿到消息:" + socketResult);
            }

            @Override
            public void onSocketDisable(String msg, int code) {
                System.out.println(timestamp() + "断开连接");
            }
        }
    }
    View Code

    SocketClientResponseInterface---返回信息接口类
    package com.whalecloud.uip.client.socket;
    
    
    /**
     * Callback contract through which the socket client reports connection events
     * and received messages to its owner.
     *
     * @param <T> type of the payload handed to {@link #onSocketReceive}
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 16:42
     */
    public interface SocketClientResponseInterface<T> {
        /**
         * Invoked once the client has connected to the server.
         */
        void onSocketConnect();
    
        /**
         * Invoked when the client receives a message from the server.
         *
         * @param socketResult the received message
         * @param code status code accompanying the message
         */
        void onSocketReceive(T socketResult, int code);
    
        /**
         * Invoked when the connection attempt fails or the connection is closed.
         *
         * @param msg human-readable reason
         * @param code status code accompanying the reason
         */
        void onSocketDisable(String msg, int code);
    }
    View Code

    SocketCloseInterface---连接关闭信息接口类
    package com.whalecloud.uip.client.socket;
    
    /**
     * Callback contract the worker threads use to ask the owner of the socket to
     * shut down its input side or to handle a lost connection.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 17:24
     */
    public interface SocketCloseInterface {
    
        /**
         * Requests that the input side of the socket be shut down; the receive
         * thread calls this while it is being closed.
         */
        void onSocketShutdownInput();
    
        /**
         * Signals that the socket is no longer connected; the heartbeat thread
         * calls this when its connection check fails.
         */
        void onSocketDisconnection();
    }
    View Code

    SocketClient---客户端调用类
    package com.whalecloud.uip.client.socket;
    
    import org.apache.http.util.TextUtils;
    
    /**
     * Facade used by application code: starts the connection thread on construction
     * and exposes send/stop operations.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 17:05
     */
    public class SocketClient {

        private static final String TAG = SocketClient.class.getSimpleName();

        private final SocketClientThread socketClientThread;

        /**
         * Creates the client and immediately starts connecting in a background thread.
         *
         * @param socketClientResponseInterface callback notified of connect, receive and
         *                                      disconnect events
         */
        public SocketClient(SocketClientResponseInterface socketClientResponseInterface) {
            socketClientThread = new SocketClientThread("socketClientThread", socketClientResponseInterface);
            socketClientThread.start();
            //ThreadPoolUtil.getInstance().addExecuteTask(socketClientThread);
        }

        /**
         * Queues a message for sending. The payload is converted with
         * {@link String#valueOf(Object)}, so a non-String argument no longer fails with a
         * {@link ClassCastException}; {@code null} and empty payloads are rejected.
         *
         * @param data the message to send
         * @param <T>  payload type
         */
        public <T> void sendData(T data) {
            // String.valueOf(null) would yield the text "null", so handle null explicitly.
            String s = (data == null) ? null : String.valueOf(data);
            if (s == null || s.isEmpty()) {
                System.out.print(TAG+"sendData: 消息不能为空");
                return;
            }
            if (socketClientThread != null) {
                socketClientThread.sendMsg(s);
            }
        }

        /**
         * Disables reconnection and tears the connection down. The work runs on a
         * separate thread because closing the socket performs blocking I/O.
         */
        public void stopSocket() {
            new Thread(() -> {
                socketClientThread.setReConnect(false);
                socketClientThread.stopThread();
            }).start();
        }
    }
    View Code
    SocketClientThread---客户端线程类
    package com.whalecloud.uip.client.socket;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.ConnectException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.net.SocketAddress;
    
    import javax.net.SocketFactory;
    
    /**
     * Connection thread of the socket client. It opens the socket, starts the receive,
     * send and (for long-lived connections) heartbeat threads, and tears everything down
     * and optionally reconnects when the connection is lost.
     *
     * <p>{@link #stopThread()} may be called from several threads (this thread, the
     * heartbeat thread, or the thread spawned by {@code SocketClient.stopSocket()}), so the
     * flags read across threads are {@code volatile}.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 16:10
     */
    public class SocketClientThread extends Thread implements SocketCloseInterface {

        private static final String TAG = SocketClientThread.class.getSimpleName();

        /** Pause before each reconnect attempt, in milliseconds. */
        private static final long RECONNECT_DELAY_MILLIS = 15000;

        private volatile String name;

        private boolean isLongConnection = true;
        private volatile boolean isReConnect = true;
        private volatile SocketSendThread mSocketSendThread;
        private SocketReceiveThread mSocketReceiveThread;
        private SocketHeartBeatThread mSocketHeartBeatThread;
        private Socket mSocket;

        private volatile boolean isSocketAvailable;

        private SocketClientResponseInterface socketClientResponseInterface;

        /**
         * @param name                          label used in the worker thread name
         * @param socketClientResponseInterface callback for connect/receive/disconnect events
         */
        public SocketClientThread(String name, SocketClientResponseInterface socketClientResponseInterface) {
            this.name = name;
            this.socketClientResponseInterface = socketClientResponseInterface;
        }

        @Override
        public void run() {
            final Thread currentThread = Thread.currentThread();
            final String oldName = currentThread.getName();
            currentThread.setName("Processing-" + name);
            try {
                initSocket();
                System.out.print(TAG + "run: SocketClientThread end");
            } finally {
                currentThread.setName(oldName);
            }
        }

        /**
         * Performs the first connection attempt; on failure hands over to
         * {@link #stopThread()}, which cleans up and drives the reconnect loop.
         */
        private void initSocket() {
            if (!tryConnect()) {
                stopThread();
            }
        }

        /**
         * Opens the socket and starts the worker threads.
         *
         * @return {@code true} when the connection is up, {@code false} when it failed
         *         (the failure has already been reported through the callback)
         */
        private boolean tryConnect() {
            try {
                mSocket = SocketFactory.getDefault().createSocket();
                SocketAddress socketAddress = new InetSocketAddress(SocketUtil.ADDRESS, SocketUtil.PORT);
                mSocket.connect(socketAddress, 10000);

                isSocketAvailable = true;

                // Start the receive thread.
                mSocketReceiveThread = new SocketReceiveThread("SocketReceiveThread",
                    new BufferedReader(new InputStreamReader(mSocket.getInputStream(), "UTF-8")),
                    socketClientResponseInterface, this);
                mSocketReceiveThread.start();

                // Start the send thread. The writer is created with an explicit UTF-8
                // encoder: both ends decode with UTF-8, so relying on the platform
                // default charset would garble non-ASCII text on non-UTF-8 systems.
                PrintWriter printWriter = new PrintWriter(
                    new java.io.BufferedWriter(
                        new java.io.OutputStreamWriter(mSocket.getOutputStream(), "UTF-8")),
                    true);
                System.out.print(TAG+"initSocket: " + printWriter);
                mSocketSendThread = new SocketSendThread("SocketSendThread", printWriter);
                mSocketSendThread.setCloseSendTask(false);
                mSocketSendThread.start();

                // Start the heartbeat thread; it shares the writer with the send thread.
                if (isLongConnection) {
                    mSocketHeartBeatThread = new SocketHeartBeatThread("SocketHeartBeatThread",
                        printWriter, mSocket, this);
                    mSocketHeartBeatThread.start();
                }

                if (socketClientResponseInterface != null) {
                    socketClientResponseInterface.onSocketConnect();
                }
                return true;
            } catch (ConnectException e) {
                failedMessage("服务器连接异常,请检查网络", SocketUtil.FAILED);
                e.printStackTrace();
            } catch (IOException e) {
                failedMessage("网络发生异常,请稍后重试", SocketUtil.FAILED);
                e.printStackTrace();
            }
            return false;
        }

        /**
         * Queues a message on the send thread; ignored while no send thread exists.
         *
         * @param data message to send
         */
        public void sendMsg(String data) {
            SocketSendThread sender = mSocketSendThread;
            if (sender != null) {
                sender.sendMsg(data);
            }
        }

        /**
         * Tears the connection down and, while reconnection is enabled, retries every
         * {@value #RECONNECT_DELAY_MILLIS} ms until a connection is established.
         *
         * <p>The retry is a loop rather than mutual recursion with the connect routine, so
         * a long outage cannot exhaust the stack. The reconnect flag is checked again after
         * the wait because the wait releases this object's monitor, which lets
         * {@link #setReConnect(boolean) setReConnect(false)} followed by another
         * {@code stopThread()} run in the meantime.
         */
        public synchronized void stopThread() {
            while (true) {
                // Stop the receive thread.
                closeReceiveTask();
                // Wake the send thread so that it can terminate.
                wakeSendTask();
                // Stop the heartbeat thread.
                closeHeartBeatTask();
                // Close the socket.
                closeSocket();
                // Drop messages that were still queued.
                clearData();
                failedMessage("断开连接", SocketUtil.FAILED);

                if (!isReConnect) {
                    return;
                }
                SocketUtil.toWait(this, RECONNECT_DELAY_MILLIS);
                if (!isReConnect) {
                    return;
                }
                boolean connected = tryConnect();
                System.out.print(TAG + "stopThread: " + Thread.currentThread().getName());
                if (connected) {
                    return;
                }
            }
        }

        /**
         * Asks the send thread to wake up and close itself.
         */
        private void wakeSendTask() {
            if (mSocketSendThread != null) {
                mSocketSendThread.wakeSendTask();
            }
        }

        /**
         * Closes the receive thread and forgets the reference.
         */
        private void closeReceiveTask() {
            if (mSocketReceiveThread != null) {
                mSocketReceiveThread.close();
                mSocketReceiveThread = null;
            }
        }

        /**
         * Closes the heartbeat thread.
         */
        private void closeHeartBeatTask() {
            if (mSocketHeartBeatThread != null) {
                mSocketHeartBeatThread.close();
            }
        }

        /**
         * Closes the socket if it is still open. A socket whose connect attempt failed is
         * closed as well so that it does not linger.
         */
        private void closeSocket() {
            if (mSocket != null) {
                if (!mSocket.isClosed()) {
                    try {
                        mSocket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                isSocketAvailable = false;
                mSocket = null;
            }
        }

        /**
         * Clears the pending messages of the send thread.
         */
        private void clearData() {
            if (mSocketSendThread != null) {
                mSocketSendThread.clearData();
            }
        }

        /**
         * Reports a failure or disconnect through the client callback.
         */
        private void failedMessage(String msg, int code) {
            if (socketClientResponseInterface != null) {
                socketClientResponseInterface.onSocketDisable(msg, code);
            }
        }

        @Override
        public void onSocketShutdownInput() {
            if (isSocketAvailable) {
                SocketUtil.inputStreamShutdown(mSocket);
            }
        }

        @Override
        public void onSocketDisconnection() {
            isSocketAvailable = false;
            stopThread();
        }

        /**
         * Enables or disables automatic reconnection after a disconnect.
         *
         * @param reConnect {@code true} to reconnect automatically
         */
        public void setReConnect(boolean reConnect) {
            isReConnect = reConnect;
        }

    }
    View Code
    SocketHeartBeatThread---心跳监听类
    package com.whalecloud.uip.client.socket;
    
    import java.io.PrintWriter;
    import java.net.Socket;
    
    /**
     * Heartbeat thread: writes the line {@code "ping"} to the server every
     * {@link #REPEAT_TIME} milliseconds (two minutes) for as long as the local socket
     * looks connected.
     *
     * <p>NOTE(review): {@link PrintWriter} swallows I/O errors, so a failed ping is not
     * detected here; only the local socket state checked in {@link #isConnected()} ends
     * the loop.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 17:01
     */
    public class SocketHeartBeatThread extends Thread {

        private static final String TAG = SocketHeartBeatThread.class.getSimpleName();

        private volatile String name;

        /** Interval between two pings, in milliseconds. */
        private static final int REPEAT_TIME = 120000;
        /** Written by {@link #close()} from other threads, hence volatile. */
        private volatile boolean isCancel = false;
        private final PrintWriter printWriter;
        private Socket mSocket;

        private SocketCloseInterface socketCloseInterface;

        /**
         * @param name                 label used in the worker thread name
         * @param printWriter          writer on the socket output, shared with the send thread
         * @param mSocket              socket whose state is checked before each ping
         * @param socketCloseInterface notified when the socket is found disconnected
         */
        public SocketHeartBeatThread(String name, PrintWriter printWriter,
                                     Socket mSocket, SocketCloseInterface socketCloseInterface) {
            this.name = name;
            this.printWriter = printWriter;
            this.mSocket = mSocket;
            this.socketCloseInterface = socketCloseInterface;
        }

        @Override
        public void run() {
            final Thread currentThread = Thread.currentThread();
            final String oldName = currentThread.getName();
            currentThread.setName("Processing-" + name);
            try {
                while (!isCancel) {
                    if (!isConnected()) {
                        break;
                    }

                    if (printWriter != null) {
                        // The writer is shared with the send thread; serialize access.
                        synchronized (printWriter) {
                            SocketUtil.write2Stream("ping", printWriter);
                        }
                    }
                    try {
                        Thread.sleep(REPEAT_TIME);
                    } catch (InterruptedException e) {
                        // close() interrupts the sleep; keep the flag set and stop.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } finally {
                // The loop is over: release the output writer.
                if (printWriter != null) {
                    synchronized (printWriter) {
                        SocketUtil.closePrintWriter(printWriter);
                    }
                }
                currentThread.setName(oldName);
                System.out.print(TAG+"SocketHeartBeatThread finish");
            }
        }

        /**
         * Checks the local state of the socket and notifies the close callback when it
         * is closed, unconnected, or has either direction shut down.
         *
         * @return {@code true} if the socket still looks usable
         */
        private boolean isConnected() {
            if (mSocket.isClosed() || !mSocket.isConnected() ||
                mSocket.isInputShutdown() || mSocket.isOutputShutdown()) {
                if (socketCloseInterface != null) {
                    socketCloseInterface.onSocketDisconnection();
                }
                return false;
            }
            return true;
        }

        /**
         * Stops the heartbeat and closes the writer. When called from another thread the
         * heartbeat thread is interrupted so that it does not sit in its two-minute sleep;
         * it is not interrupted when it closes itself, because the caller may still need
         * to block (for example for the reconnect delay).
         */
        public void close() {
            isCancel = true;
            if (Thread.currentThread() != this) {
                interrupt();
            }
            if (printWriter != null) {
                synchronized (printWriter) {
                    SocketUtil.closePrintWriter(printWriter);
                }
            }
        }

    }
    View Code
    SocketReceiveThread---消息接收类
    package com.whalecloud.uip.client.socket;
    
    import java.io.BufferedReader;
    
    /**
     * Receive thread: reads lines from the server and forwards each one to the client
     * callback until it is cancelled or the stream yields no more data.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 16:48
     */
    public class SocketReceiveThread extends Thread {
        private static final String TAG = SocketReceiveThread.class.getSimpleName();

        private volatile String name;

        private volatile boolean isCancel = false;

        private BufferedReader bufferedReader;

        private SocketCloseInterface socketCloseInterface;

        private SocketClientResponseInterface socketClientResponseInterface;

        /**
         * @param name                          label used in the worker thread name
         * @param bufferedReader                reader on the socket input
         * @param socketClientResponseInterface receives every line that is read
         * @param socketCloseInterface          asked to shut the socket input down on close
         */
        public SocketReceiveThread(String name, BufferedReader bufferedReader,
                                   SocketClientResponseInterface socketClientResponseInterface,
                                   SocketCloseInterface socketCloseInterface) {
            this.name = name;
            this.bufferedReader = bufferedReader;
            this.socketClientResponseInterface = socketClientResponseInterface;
            this.socketCloseInterface = socketCloseInterface;
        }

        @Override
        public void run() {
            final Thread self = Thread.currentThread();
            final String previousName = self.getName();
            self.setName("Processing-" + name);
            try {
                while (!isCancel) {
                    if (bufferedReader == null) {
                        continue;
                    }
                    final String line = SocketUtil.readFromStream(bufferedReader);
                    if (line == null) {
                        // No line was returned: leave the loop.
                        System.out.print(TAG + "run: receiverData==null");
                        break;
                    }
                    // Hand the received line to the client callback.
                    if (socketClientResponseInterface != null) {
                        socketClientResponseInterface.onSocketReceive(line, SocketUtil.SUCCESS);
                    }
                }
            } finally {
                // The loop is over: release the input reader.
                SocketUtil.closeBufferedReader(bufferedReader);
                self.setName(previousName);
                System.out.print(TAG + "SocketReceiveThread finish");
            }
        }

        /**
         * Cancels the loop, interrupts the thread and, if a reader is still held, asks the
         * owner to shut the socket input down before closing and dropping the reader.
         */
        public void close() {
            isCancel = true;
            this.interrupt();
            if (bufferedReader == null) {
                return;
            }
            if (socketCloseInterface != null) {
                socketCloseInterface.onSocketShutdownInput();
            }
            SocketUtil.closeBufferedReader(bufferedReader);
            bufferedReader = null;
        }
    }
    View Code
    SocketSendThread---发送线程类
    package com.whalecloud.uip.client.socket;
    
    import java.io.PrintWriter;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * Send thread: drains a queue of outgoing messages to the socket writer and sleeps
     * on the queue's monitor while there is nothing to send.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 16:59
     */
    public class SocketSendThread extends Thread {
        private static final String TAG = SocketSendThread.class.getSimpleName();

        private volatile String name;

        private volatile boolean isCancel = false;
        /** Set from other threads by {@link #wakeSendTask()}, hence volatile. */
        private volatile boolean closeSendTask;
        private final PrintWriter printWriter;

        protected volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();

        /**
         * @param name        label used in the worker thread name
         * @param printWriter writer on the socket output (shared with the heartbeat thread)
         */
        public SocketSendThread(String name, PrintWriter printWriter) {
            this.name = name;
            this.printWriter = printWriter;
        }

        @Override
        public void run() {
            final Thread currentThread = Thread.currentThread();
            final String oldName = currentThread.getName();
            currentThread.setName("Processing-" + name);
            try {
                while (!isCancel) {

                    String dataContent = dataQueue.poll();
                    if (dataContent == null) {
                        // Nothing to send: sleep until a message or a close request arrives.
                        if (!awaitWork()) {
                            break;
                        }
                        if (closeSendTask) {
                            close();
                        }
                    } else if (printWriter != null) {
                        synchronized (printWriter) {
                            SocketUtil.write2Stream(dataContent, printWriter);
                        }
                    }
                }
            } finally {
                // The loop is over: release the output writer.
                if (printWriter != null) {
                    synchronized (printWriter) {
                        SocketUtil.closePrintWriter(printWriter);
                    }
                }
                currentThread.setName(oldName);
                System.out.print(TAG+"SocketSendThread finish");
            }
        }

        /**
         * Blocks until the queue is non-empty, a close was requested, or the thread was
         * cancelled. The condition is evaluated while holding the queue's monitor, the
         * same monitor the notifying side takes, so a notification issued between the
         * failed {@code poll()} and the wait can no longer be lost.
         *
         * @return {@code false} if the wait was interrupted (the interrupt flag is
         *         restored), {@code true} otherwise
         */
        private boolean awaitWork() {
            synchronized (dataQueue) {
                while (dataQueue.isEmpty() && !closeSendTask && !isCancel) {
                    try {
                        dataQueue.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
            return true;
        }

        /**
         * Queues a message and wakes the send thread.
         *
         * @param data message to send
         */
        public void sendMsg(String data) {
            dataQueue.add(data);
            // New data is pending: wake the send thread.
            SocketUtil.toNotifyAll(dataQueue);
        }

        /**
         * Discards all queued messages.
         */
        public void clearData() {
            dataQueue.clear();
        }

        /**
         * Cancels the loop, interrupts the thread and closes the writer. The writer is
         * closed under its lock so that a write in progress completes first.
         */
        public void close() {
            isCancel = true;
            this.interrupt();
            if (printWriter != null) {
                synchronized (printWriter) {
                    SocketUtil.closePrintWriter(printWriter);
                }
            }
        }

        /**
         * Requests termination: the send thread closes itself once its queue is empty.
         */
        public void wakeSendTask() {
            closeSendTask = true;
            SocketUtil.toNotifyAll(dataQueue);
        }

        public void setCloseSendTask(boolean closeSendTask) {
            this.closeSendTask = closeSendTask;
        }
    }
    View Code
    SocketUtil---客户端socket工具类
    package com.whalecloud.uip.client.socket;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.net.Inet6Address;
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.Socket;
    import java.net.SocketException;
    import java.util.Enumeration;
    
    /**
     * Static helpers shared by the client socket threads: connection constants,
     * line-based read/write, null-safe close routines and monitor wait/notify wrappers.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 16:39
     */
    public class SocketUtil {
        private static final String TAG = SocketUtil.class.getSimpleName();
        public static String ADDRESS = "127.0.0.1";
        public static int PORT = 10086;

        public static final int SUCCESS = 100;
        public static final int FAILED = -1;

        /**
         * Reads one line from the reader.
         *
         * @param bufferedReader source to read from; may be {@code null}
         * @return the line without its terminator, or {@code null} at end of stream, on an
         *         I/O error, or when {@code bufferedReader} is {@code null}
         */
        public static String readFromStream(BufferedReader bufferedReader) {
            if (bufferedReader == null) {
                return null;
            }
            try {
                return bufferedReader.readLine();
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }

        /**
         * Writes {@code data} followed by a line terminator; does nothing when either
         * argument is {@code null}.
         *
         * @param data        text to write
         * @param printWriter destination writer
         */
        public static void write2Stream(String data, PrintWriter printWriter) {
            if (data == null || printWriter == null) {
                return;
            }
            printWriter.println(data);
        }


        /**
         * Shuts down the input side of the socket unless it is {@code null}, already
         * closed, or already shut down for input.
         *
         * @param socket socket whose input should be shut down; may be {@code null}
         */
        public static void inputStreamShutdown(Socket socket) {
            if (socket == null) {
                return;
            }
            try {
                if (!socket.isClosed() && !socket.isInputShutdown()) {
                    socket.shutdownInput();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Closes the reader, ignoring {@code null}.
         *
         * @param br reader to close
         */
        public static void closeBufferedReader(BufferedReader br) {
            try {
                if (br != null) {
                    br.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Closes the writer, ignoring {@code null}.
         *
         * @param pw writer to close
         */
        public static void closePrintWriter(PrintWriter pw) {
            if (pw != null) {
                pw.close();
            }
        }

        /**
         * Blocks the calling thread on {@code o}'s monitor for up to {@code millis}
         * milliseconds; {@code 0} waits until the object is notified. If the thread is
         * interrupted the method returns early and leaves the interrupt flag set so the
         * caller can react to it.
         *
         * @param o      object to wait on
         * @param millis maximum wait in milliseconds, {@code 0} for no limit
         */
        public static void toWait(Object o, long millis) {
            synchronized (o) {
                try {
                    o.wait(millis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Wakes every thread waiting on {@code o}'s monitor. The woken threads resume
         * only after this method has left its synchronized block and released the monitor.
         *
         * @param o object whose waiters should be notified
         */
        public static void toNotifyAll(Object o) {
            synchronized (o) {
                o.notifyAll();
            }
        }

    }
    View Code

    服务端:

    ServerMain--服务端测试类
    package com.whalecloud.uip.server.socket;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    /**
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 20:05
     */
    public class ServerMain {
    
        private static boolean isStart = true;
        private static ServerReceiveThread serverReceiveThread;
    
        public static void main(String[] args) {
            ServerSocket serverSocket = null;
            ExecutorService executorService = Executors.newCachedThreadPool();
            System.out.println("服务端 " + SocketUtil.getIP() + " 运行中...
    ");
            try {
                serverSocket = new ServerSocket(SocketUtil.PORT);
                while (isStart) {
                    Socket socket = serverSocket.accept();
    
                    //设定输入流读取阻塞超时时间(180秒收不到客户端消息判定断线)
                    socket.setSoTimeout(180000);
                    serverReceiveThread = new ServerReceiveThread(socket,
                        new SocketServerResponseInterface() {
                            @Override
                            public void clientOffline() {// 对方不在线
                                System.out.println("offline");
                            }
    
                            @Override
                            public void clientOnline(String clientIp) {
                                System.out.println(clientIp + " is online");
                                System.out.println("-----------------------------------------");
                            }
                        });
    
                    if (socket.isConnected()) {
                        executorService.execute(serverReceiveThread);
                    }
                }
    
                serverSocket.close();
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (serverSocket != null) {
                    try {
                        isStart = false;
                        serverSocket.close();
                        serverReceiveThread.stop();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    View Code
    
    
    SocketServerResponseInterface--回调接口类
    package com.whalecloud.uip.server.socket;
    
    /**
     * Callback contract through which the server side reports that a client went
     * offline or came online.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 19:34
     */
    public interface SocketServerResponseInterface {
        /**
         * Callback for a client going offline.
         */
        void clientOffline();
    
        /**
         * Callback for a client coming online.
         *
         * @param clientIp presumably the client's IP address, as the name suggests; the call site is not shown here
         */
        void clientOnline(String clientIp);
    }
    View Code


    ServerReceiveThread--消息接收类
    package com.whalecloud.uip.server.socket;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.Socket;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/10 19:46
     */
    public class ServerReceiveThread implements Runnable {
        private ReceiveThread receiveThread;
        private static ServerSendThread serverSendThread;
        private Socket socket;
        private SocketServerResponseInterface socketServerResponseInterface;
    
        private volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();
        private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();
    
        private long lastReceiveTime = System.currentTimeMillis();
    
        private String userIP;
    
        public String getUserIP() {
            return userIP;
        }
    
        public ServerReceiveThread(Socket socket, SocketServerResponseInterface socketServerResponseInterface) {
            this.socket = socket;
            this.socketServerResponseInterface = socketServerResponseInterface;
            this.userIP = socket.getInetAddress().getHostAddress();
            onLineClient.put(userIP, socket);
            System.out.println("用户:" + userIP
                + " 加入了聊天室,当前在线人数:" + onLineClient.size());
        }
    
        @Override
        public void run() {
            try {
                //开启接收线程
                receiveThread = new ReceiveThread();
                receiveThread.bufferedReader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), "UTF-8"));
                receiveThread.start();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 断开socket连接
         */
        public void stop() {
            try {
                System.out.println("stop");
                if (receiveThread != null) {
                    receiveThread.isCancel = true;
                    receiveThread.interrupt();
                    if (receiveThread.bufferedReader != null) {
                        SocketUtil.inputStreamShutdown(socket);
                        System.out.println("before closeBufferedReader");
                        SocketUtil.closeBufferedReader(receiveThread.bufferedReader);
                        System.out.println("after closeBufferedReader");
                        receiveThread.bufferedReader = null;
                    }
                    receiveThread = null;
                    System.out.println("stop receiveThread");
                    //停止消息发送线程
                    serverSendThread.stop();
                }
                onLineClient.remove(userIP);
                System.out.println("用户:" + userIP
                    + " 退出,当前在线人数:" + onLineClient.size());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Looks up the socket registered for a client.
         *
         * @param clientID key into the online-client registry (the constructor of
         *                 this class registers clients under their IP address)
         * @return whatever the registry holds for that key
         */
        public Socket getConnectdClient(String clientID) {
            Socket registered = onLineClient.get(clientID);
            return registered;
        }
    
        /**
         * Prints the key of every client currently in the online registry, one
         * {@code client:<key>} line per entry. Does nothing when the registry
         * reference is {@code null}.
         */
        public static void printAllClient() {
            if (onLineClient != null) {
                for (String clientKey : onLineClient.keySet()) {
                    System.out.println("client:" + clientKey);
                }
            }
        }
    
        /**
         * Blocks the calling thread on {@code o}'s monitor until another thread
         * calls {@code notify()}/{@code notifyAll()} on it, or until the waiting
         * thread is interrupted. There is no timeout.
         *
         * <p>If the wait is interrupted, the stack trace is printed and the
         * thread's interrupt status is restored so that callers can still detect
         * the interruption.
         *
         * @param o object whose monitor is waited on; must not be {@code null}
         */
        public void toWaitAll(Object o) {
            synchronized (o) {
                try {
                    o.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // wait() clears the interrupt status when it throws; re-assert
                    // it instead of silently discarding the interruption request.
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        /**
         * Wakes every thread waiting on {@code obj}'s monitor.
         *
         * <p>The woken threads do not proceed immediately: the monitor is only
         * released once the {@code synchronized} block below has finished.
         *
         * @param obj object whose waiters are notified
         */
        public void toNotifyAll(Object obj) {
            final Object monitor = obj;
            synchronized (monitor) {
                monitor.notifyAll();
            }
        }
    
        /**
         * Checks the local view of the socket's connection state.
         *
         * <p>When the socket is closed or was never connected, the client is
         * removed from the online registry, this handler is stopped and
         * {@code false} is returned.
         *
         * @return {@code true} if the socket is open and connected
         */
        private boolean isConnected() {
            boolean usable = !socket.isClosed() && socket.isConnected();
            if (usable) {
                return true;
            }
            onLineClient.remove(userIP);
            stop();
            System.out.println("socket closed...");
            return false;
        }
    
        /**
         * Receive loop for one client connection.
         *
         * <p>Repeatedly reads a message via {@code SocketUtil.readFromStream}:
         * <ul>
         *   <li>{@code "ping"} is treated as a heartbeat: {@code lastReceiveTime}
         *       is refreshed and the client is reported as online;</li>
         *   <li>any other non-null message is logged, triggers
         *       {@link #sendMsg(String)} and also reports the client as online;</li>
         *   <li>{@code null} is treated as the client having gone offline: the
         *       enclosing handler is stopped and {@code clientOffline()} is
         *       reported.</li>
         * </ul>
         */
        public class ReceiveThread extends Thread {

            private BufferedReader bufferedReader;
            /**
             * Stop flag. Volatile because the enclosing {@code stop()} sets it,
             * potentially from a thread other than the one running the loop.
             */
            private volatile boolean isCancel;

            @Override
            public void run() {
                try {
                    try {
                        while (!isCancel) {
                            if (!isConnected()) {
                                isCancel = true;
                                break;
                            }

                            String msg = SocketUtil.readFromStream(bufferedReader);
                            if (msg != null) {
                                if ("ping".equals(msg)) {
                                    System.out.println("收到心跳包");
                                    lastReceiveTime = System.currentTimeMillis();
                                    socketServerResponseInterface.clientOnline(userIP);
                                } else {
                                    msg = "用户" + userIP + " : " + msg;
                                    System.out.println("我是服务端,我收到数据:"+msg);
                                    // A real message arrived: kick off the reply.
                                    this.sendMsg(msg);
                                    socketServerResponseInterface.clientOnline(userIP);
                                }
                            } else {
                                System.out.println("client is offline...");
                                ServerReceiveThread.this.stop();
                                socketServerResponseInterface.clientOffline();
                                break;
                            }
                            System.out.println("ReceiveThread");
                        }
                    } finally {
                        // Release the input side even when the loop ends with an
                        // exception; previously this clean-up was skipped then.
                        SocketUtil.inputStreamShutdown(socket);
                        SocketUtil.closeBufferedReader(bufferedReader);
                        System.out.println("ReceiveThread is finish");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            /**
             * Creates a sender for this socket, queues the fixed test reply and
             * runs the sender on a short-lived executor.
             *
             * <p>NOTE(review): {@code msg} is currently unused; the reply text is a
             * fixed test string. Confirm whether the reply should depend on it.
             *
             * <p>NOTE(review): every call replaces {@code serverSendThread} without
             * stopping the previous sender. Confirm whether a single sender should
             * be reused per connection.
             *
             * @param msg the message that was just received
             */
            public void sendMsg(String msg) {
                ServerSendThread sender = new ServerSendThread(socket);
                serverSendThread = sender;
                sender.addMessage("服务端发送测试数据");

                if (socket.isConnected()) {
                    ExecutorService executorService = Executors.newCachedThreadPool();
                    try {
                        executorService.execute(sender);
                    } finally {
                        // shutdown() still runs the task submitted above but lets
                        // the pool terminate afterwards, so one pool is not left
                        // behind for every received message.
                        executorService.shutdown();
                    }
                }
            }
        }
    
    
    
    }
    View Code
    ServerSendThread--消息发送类
    package com.whalecloud.uip.server.socket;
    
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * Sends queued text messages to one client socket on a dedicated thread.
     *
     * <p>{@link #run()} creates and starts a {@link SendThread} that drains the
     * internal queue and writes each message through
     * {@code SocketUtil.write2Stream}. Producers enqueue with
     * {@link #addMessage(String)}; {@link #stop()} cancels the sender and closes
     * its writer.
     *
     * @author lin.hongwen2@iwhalecloud.com
     * @date 2020/3/12 15:12
     */
    public class ServerSendThread implements Runnable  {
        /** Sender thread; volatile because run() and stop() may execute on different threads. */
        private volatile SendThread sendThread;
        private final Socket socket;

        /** Pending outgoing messages; also the monitor the sender waits on while idle. */
        private final ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();
        /**
         * Client registry of this class.
         * NOTE(review): nothing in this class ever adds an entry, so the lookups
         * and removals below only see an empty map. Confirm whether this should
         * share the registry kept by the receiving side.
         */
        private static final ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();

        private final String userIP;

        /**
         * @return the remote host address captured when this sender was created
         */
        public String getUserIP() {
            return userIP;
        }

        /**
         * @param socket socket to write to; its remote address is read immediately
         */
        public ServerSendThread(Socket socket) {
            this.socket = socket;
            this.userIP = socket.getInetAddress().getHostAddress();
        }

        /**
         * Creates the sender thread with an auto-flushing writer over the socket's
         * output stream and starts it. Failures are printed and otherwise ignored.
         */
        @Override
        public void run() {
            try {
                SendThread worker = new SendThread();
                worker.printWriter = new PrintWriter(socket.getOutputStream(), true);
                // Publish the worker only once it is fully set up.
                sendThread = worker;
                worker.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Cancels the sender thread, closes its writer and removes this client
         * from the registry. Exceptions are printed and not propagated.
         */
        public void stop() {
            try {
                System.out.println("stop");
                SendThread worker = sendThread;
                if (worker != null) {
                    worker.isCancel = true;
                    // The sender waits on dataQueue, so that is the monitor to
                    // notify. Notifying the thread object itself woke nobody.
                    toNotifyAll(dataQueue);
                    worker.interrupt();
                    PrintWriter writer = worker.printWriter;
                    if (writer != null) {
                        // Hold the writer's lock so an in-flight write finishes
                        // before the writer is closed.
                        synchronized (writer) {
                            SocketUtil.closePrintWriter(writer);
                            worker.printWriter = null;
                        }
                    }
                    sendThread = null;
                    System.out.println("stop sendThread");
                }
                onLineClient.remove(userIP);
                System.out.println("用户:" + userIP
                    + " 退出,当前在线人数:" + onLineClient.size());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Queues a message for sending and wakes the sender thread. The message
         * is dropped when the socket is no longer connected.
         *
         * @param data text to send
         */
        public void addMessage(String data) {
            if (!isConnected()) {
                return;
            }

            dataQueue.offer(data);
            // New data is pending: wake the sender if it is waiting.
            toNotifyAll(dataQueue);
        }

        /**
         * Looks up a socket in this class's client registry.
         *
         * @param clientID registry key
         * @return the mapped socket, or {@code null} if there is no entry
         */
        public Socket getConnectdClient(String clientID) {
            return onLineClient.get(clientID);
        }

        /**
         * Prints one {@code client:<key>} line per entry in this class's registry.
         */
        public static void printAllClient() {
            for (String clientKey : onLineClient.keySet()) {
                System.out.println("client:" + clientKey);
            }
        }

        /**
         * Blocks the calling thread on {@code o}'s monitor until it is notified
         * or interrupted. There is no timeout. On interruption the stack trace is
         * printed and the interrupt status is restored.
         *
         * @param o object whose monitor is waited on; must not be {@code null}
         */
        public void toWaitAll(Object o) {
            synchronized (o) {
                try {
                    o.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // wait() clears the interrupt status when it throws; restore it.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Wakes every thread waiting on {@code obj}'s monitor. Waiters only
         * proceed after the {@code synchronized} block below has released it.
         *
         * @param obj object whose waiters are notified
         */
        public void toNotifyAll(Object obj) {
            synchronized (obj) {
                obj.notifyAll();
            }
        }

        /**
         * Checks the local view of the socket's connection state. When the socket
         * is closed or was never connected, this sender is stopped.
         *
         * @return {@code true} if the socket is open and connected
         */
        private boolean isConnected() {
            if (socket.isClosed() || !socket.isConnected()) {
                onLineClient.remove(userIP);
                ServerSendThread.this.stop();
                System.out.println("socket closed...");
                return false;
            }
            return true;
        }

        /**
         * Sender loop: writes queued messages and waits while the queue is empty.
         */
        public class SendThread extends Thread {

            /** Volatile: assigned before start, cleared by stop() from another thread. */
            private volatile PrintWriter printWriter;
            /** Volatile: set by stop() from another thread, read by the loop. */
            private volatile boolean isCancel;

            @Override
            public void run() {
                try {
                    try {
                        while (!isCancel) {
                            if (!isConnected()) {
                                isCancel = true;
                                break;
                            }

                            String msg = dataQueue.poll();
                            if (msg == null) {
                                awaitMessage();
                            } else {
                                // Snapshot: stop() may null the field between the
                                // null check and the synchronized block.
                                PrintWriter writer = printWriter;
                                if (writer != null) {
                                    synchronized (writer) {
                                        SocketUtil.write2Stream(msg, writer);
                                    }
                                }
                            }
                            System.out.println("SendThread");
                        }
                    } finally {
                        // Release the output side even if the loop threw.
                        SocketUtil.outputStreamShutdown(socket);
                        SocketUtil.closePrintWriter(printWriter);
                        System.out.println("SendThread is finish");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            /**
             * Waits until a message is queued or the thread is cancelled.
             *
             * <p>The emptiness check and the wait happen under the same monitor
             * that {@code addMessage} notifies under. A message offered between
             * {@code poll()} and the wait can therefore no longer be missed.
             */
            private void awaitMessage() {
                synchronized (dataQueue) {
                    try {
                        while (dataQueue.isEmpty() && !isCancel) {
                            dataQueue.wait();
                        }
                    } catch (InterruptedException e) {
                        // Interruption is how stop() wakes this thread: treat it
                        // as a cancel request and keep the interrupt status set.
                        isCancel = true;
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    View Code
    
    

    测试后发现,服务端既可以收到心跳包,也可以收到客户端发送过来的数据。

    客户端发送的信息:

     我这里尝试了启动两个client。两个client发不同的信息。然后server端根据接收到的信息发送不同的值。

    测试后发现,发送的数据不会在不同的socket之间串线,只有对应的客户端会收到信息。

    GitHub项目地址:  https://github.com/linHongWenGithub/socketLearn

  • 相关阅读:
    谨慎的覆盖clone()方法
    siverlight 实现神奇罗盘
    Java读取一个目录(文件夹)下的文件例子
    关于 Web 字体:现状与未来
    logistic回归
    泸沽湖的介绍
    with grant option与with admin option区别
    sqlldr日期格式
    对List顺序,逆序,随机排列实例代码
    索引组织表(IOT)
  • 原文地址:https://www.cnblogs.com/linhongwenBlog/p/12456590.html
Copyright © 2011-2022 走看看