zoukankan      html  css  js  c++  java
  • java socket 模拟im 即时通讯

    自己想了一下怎么实现,就写了,没有深究是否合理.更多处理没有写下去,例如收件人不在线,应该保存在数据库,等下一次连接的时候刷新map,再把数据发送过去,图片发送也没有做,也没有用json格式

    socket很奇怪,我用客户端连接上了服务器,没有发送消息的情况下,断开电脑网络,是不会出现问题,然后在把电脑网络连接上,通讯依然正常,正常断开也不出问题,但是用idea直接按stop键,那么服务端就会出问题了,读取事件会一直为true,造成死循环,消耗CPU,所以必须要判断一下客户端连接是否断开了

    只需要把客户端代码启动几个,修改一些userName以及收件人,就可以测试,实现类似QQ微信即时通讯,聊天功能

    服务端代码

    package serversocketchannel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 
     * @author ZhenWeiLai
     *
     */
    public class ServerSocketChannelNonBlocking {
        private static ServerSocketChannel serverSocketChannel = null;
        private static Charset charset = Charset.forName("GBK");//设置编码集,用于编码,解码
        private static Selector selector = null;
        //保存客户端的map
        private static final ConcurrentHashMap<String,SocketChannel> clientSockets = new ConcurrentHashMap<>();
        static{
            try {
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.socket().setReuseAddress(true);
                serverSocketChannel.socket().bind(new InetSocketAddress(8000));
                serverSocketChannel.configureBlocking(false);//设置为非阻塞
                selector = Selector.open();//实例化一个选择器
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) {
                service();
        }
        
        private static void service(){
            SocketChannel clientChannel = null;
            SelectionKey selectionKey = null;
            SocketChannel targetChannel = null;
            try {
                serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);//服务端监听连接
                while(true){
                    selector.select();//阻塞至有新的连接就开始处理
                    Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
                    while(selectionKeys.hasNext()){
                        selectionKey = selectionKeys.next();
                        if(selectionKey.isAcceptable()){//如果事件是连接事件
                            ServerSocketChannel serverChannel = (ServerSocketChannel)selectionKey.channel();//获取事件绑定的channel
                            clientChannel = serverChannel.accept();//连接获取带客户端信息的socketChannel
                            clientChannel.configureBlocking(false);//客户设置为非阻塞,因为非阻塞才支持选择器.避免盲等浪费资源
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//作为每一个客户端的附件缓冲器
                            /**
                             * 只监听读事件,这里千万别监听写事件,因为只要连接有效,那么写事件会一直为true,导致死循环,很耗资源
                             * 可以跟serverSocket用同一个选择器,因为绑定的channel不同
                             */
                            clientChannel.register(selector,SelectionKey.OP_READ,byteBuffer);
                        }else if(selectionKey.isReadable()){//只要有客户端写入,那么就可以处理
                            //获取客户端附件,也就是写入的数据
                            ByteBuffer byteBuffer = (ByteBuffer)selectionKey.attachment();
                            //从selectionKey获取客户端的channel
                            SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                            //把附件读出,解码为字符串
                            String msg = read(socketChannel,byteBuffer);
                            //这里用了->分割收件人,->后面跟着的字符串是收件人
                            if(msg.indexOf("->")!=-1){
                                //内容
                                String content = msg.substring(0,msg.lastIndexOf("->"));
                                //从map里获取收件人的socket
                                targetChannel = clientSockets.get(msg.substring(msg.lastIndexOf("->")+2));
                                //实例化一个缓冲区,用来写出到收件人的socketChannel
                                ByteBuffer temp = ByteBuffer.allocate(1024);
                                temp.put(charset.encode(content));
                                //写出
                                handleWrite(targetChannel,temp);
                            }else{
                                //如果内容没有收件人,那么视为第一次连接,客户端发过来的userName,作为KEY存入MAP
                                clientSockets.put(msg,socketChannel);
                            }
                        }
                        selectionKeys.remove();
                    }
                }
            } catch (IOException e) {
                try {
                    if(selectionKey!=null)selectionKey.cancel();
                    if(clientChannel!=null){
                        clientChannel.shutdownInput();
                        clientChannel.shutdownOutput();
                        clientChannel.close();
                    }
                    if(targetChannel!=null){
                        targetChannel.shutdownInput();
                        targetChannel.shutdownOutput();
                        targetChannel.close();
                    }
                } catch (IOException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
                e.printStackTrace();
            }
    
        }
        
        private static String read(SocketChannel socketChannel,ByteBuffer byteBuffer){
            //重置position limit为写入做准备
            byteBuffer.clear();
            try {
                int flag =socketChannel.read(byteBuffer);
                //判断客户端是否断开连接
                if(flag==-1){
                    //如果客户端无故断开,一定要关闭,否则读取事件一直为true造成死循环,非常耗资源
                    socketChannel.close();
                }
            } catch (IOException e) {
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                e.printStackTrace();
            }
            //position =0 limit等于有效下标,为写出做准备
            byteBuffer.flip();
            return charset.decode(byteBuffer).toString();
        }
        
        //写出
        private static void handleWrite(SocketChannel socketChannel,ByteBuffer byteBuffer){
            synchronized (byteBuffer) {
                byteBuffer.flip();
                try {
                    socketChannel.write(byteBuffer);
                } catch (IOException e) {
                    try {
                        socketChannel.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                    e.printStackTrace();
                }
            }
        }
    }

    客户端代码

    package socketchannel;
    
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    
    /**
     * Created by lzw on 17-2-28.
     */
    public class SocketChannelNonBlockingClient {
        private static Charset charset = Charset.forName("GBK");
        private static ByteBuffer receiveBuffer = ByteBuffer.allocate(10240);
        private static ByteBuffer sendBuffer = ByteBuffer.allocate(10240);
        private static SocketChannel socketChannel = null;
        private static Selector selector = null;
        private static String userName = "client1";//客户端名
        private static String targetName = "client2";//收件人名
    
        public static void main(String[] args) {
            try {
                socketChannel = SocketChannel.open();
                //连接到服务端
                SocketAddress socketAddress = new InetSocketAddress("19.95.103.112",8000);
                selector = Selector.open();//实例化一个选择器
                socketChannel.configureBlocking(false);//设置为非阻塞
                //先监听一个连接事件
                socketChannel.register(selector,SelectionKey.OP_CONNECT);
                //连接
                socketChannel.connect(socketAddress);
                //jdk 1.8的lambda表达式,用一个线程监控控制台输入
                new Thread(()->{
                        try {
                            receiveFromUser();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                }).start();
    
                talk();
    
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        private static void talk(){
            try {
                while(true){
                    selector.select();//阻塞直到连接事件
                    Iterator<SelectionKey> readyKeys = selector.selectedKeys().iterator();
                   while(readyKeys.hasNext()){
                        SelectionKey key =readyKeys.next();
                        if(key.isConnectable()){
                            //非阻塞的情况下可能没有连接完成,这里调用finishConnect阻塞至连接完成
                            socketChannel.finishConnect();
                            //连接完成以后,先发送自己的userName以便保存在服务端的客户端map里面
                            synchronized (sendBuffer){
                                SocketChannel socketChannel1 = (SocketChannel)key.channel();
                                sendBuffer.clear();
                                sendBuffer.put(charset.encode(userName));
                                send(socketChannel1);
                                socketChannel.register(selector,SelectionKey.OP_READ);//仅监听一个读取事件
                            }
    
                        }else if(key.isReadable()){
                            //处理读事件
                            receive(key);
                        }
                        readyKeys.remove();
                    }
                }
            } catch (ClosedChannelException e) {
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
        /**
         * 从控制台获取用户输入
         * @throws IOException
         */
        private static void receiveFromUser() throws IOException{
            //阻塞直到控制台有输入
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for(String msg = br.readLine();msg!=null&&!msg.equals("bye");msg = br.readLine()){
                //同步锁避免线程竞争
                synchronized (sendBuffer) {
                    sendBuffer.clear();
                    //编码
                    sendBuffer.put(charset.encode(msg));
                    //分割副
                    sendBuffer.put(charset.encode("->"));
                    //目标名
                    sendBuffer.put(charset.encode(targetName));
                    send(socketChannel);
                }
            }
        }
        /**
         * 接收服务端的数据
         * @param key
         */
        private static void receive(SelectionKey key) throws IOException {
            //获取服务端的channel
            SocketChannel channel = (SocketChannel) key.channel();
            //为写入缓冲器做准备position=0,limit=capacity
                receiveBuffer.clear();
                //从服务端的channel把数据读入缓冲器
                channel.read(receiveBuffer);
                //position=0,limit=有效下标最后一位
                receiveBuffer.flip();
                //解码
                String msg = charset.decode(receiveBuffer).toString();
                //输出到控制台
                System.out.println(msg);
        }
    
        /**
         * 发送到服务端
         */
        private static void send(SocketChannel sendChannel) throws IOException {
                if(sendBuffer.remaining()!=0){
                    synchronized (sendBuffer){
                        sendBuffer.flip();
                        sendChannel.write(sendBuffer);
                    }
                }
        }
    }
  • 相关阅读:
    [fw]error: aggregate value used where an integer was expected
    [fw]awk求和
    [fw]谈EXPORT_SYMBOL使用
    [fw]用Kprobes调试(debug)内核
    [FW]使用kprobes查看内核内部信息
    linux缺頁異常處理--內核空間[v3.10]
    用C语言给指定的内存地址赋值(通过指针)
    [fw]Linux 的 time 指令
    how to prevent lowmemorykiller from killing processes
    Android成长日记-Noification实现状态栏通知
  • 原文地址:https://www.cnblogs.com/sweetchildomine/p/6517228.html
Copyright © 2011-2022 走看看