以前上Java课的时候,老师要求,自行组队来做一个即时网络通信的课程设计。具体要求:使用Socket套接字和ServerSocket来开发一个基于c/s架构的小项目,服务器和客户端的UI采用Swing编程,具体业务逻辑采用多线程开发。现在过去这么久了,想去回忆一下,记录一下当时的点滴,作为一点点积累。我正在努力回忆..
我主要负责,服务器的设计开发,下面是我的部分代码。
一,UI部分是模仿别人写的,可自行设计。
二,业务部分(多线程处理)
1.线程管理类

package com.haoyudian.server.service; import java.io.IOException; import java.net.Inet4Address; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.haoyudian.common.util.Constants; import com.haoyudian.common.util.DateHelper; /** * 服务器管理类 接受用户登录、离线、转发消息 * * @author Scherrer * */ public class ServerManager { private ExecutorService executorService;// 线程池 private ServerSocket serverSocket = null; private Socket socket; private Boolean isStart = true; public ServerManager() { try { // 创建线程池,池中具有(cpu个数*50)条线程 executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors() * 50); serverSocket = new ServerSocket(Constants.SERVER_PORT); System.out.println("服务器IP=" + Inet4Address.getLocalHost().getHostAddress()); } catch (Exception e) { e.printStackTrace(); exit(); } } /** * 退出方法 */ private void exit() { try { this.isStart = false; serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } public void start() { System.out.println(DateHelper.getDateByCN() + " 服务器已启动..."); try { while(isStart) { socket = serverSocket.accept(); String ip = socket.getRemoteSocketAddress().toString(); System.out.println(DateHelper.getDateByCN() + " 用户:" + ip + " 已建立连接"); // 为支持多用户并发访问,采用线程池管理每一个用户的连接请求 if (socket.isConnected()) { executorService.execute(new SocketTask(socket));// 添加到线程池 } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (socket != null) socket.close(); if (serverSocket != null) serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } private final class SocketTask implements Runnable { private Socket socket = null; private InputThread inThread; private OutputThread outThread; private OutputThreadMap threadMap; public SocketTask(Socket socket) { this.socket = socket; threadMap = OutputThreadMap.getInstance(); } @Override public void run() { outThread = new OutputThread(socket);// // 先实例化写消息线程,(把对应用户的写线程存入map缓存器中) inThread = new InputThread(socket, outThread, threadMap);// 再实例化读消息线程 outThread.setStart(true); inThread.setStart(true); inThread.start(); outThread.start(); } } public static void main(String[] args) { new ServerManager().start(); } }
2.发送消息的线程

package com.haoyudian.server.service; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.Socket; import com.haoyudian.common.bean.trans.TransObject; /** * 发送消息的线程 * * @author Scherrer * */ public class OutputThread extends Thread{ private ObjectOutputStream oos;//对象输出流 private TransObject<?> object;//传输对象 private boolean isStart = true;//循环标志 private Socket socket;//套接字 //private OutputThreadMap outMap;//发送现场缓存对象 public OutputThread(Socket s) { try { this.socket = s; //构造器里实例化对象输出流 oos = new ObjectOutputStream(this.socket.getOutputStream()); } catch (IOException e) { e.printStackTrace(); } } /** * 调用写消息线程,设置了消息之后,唤醒run方法,可以节约资源 * @param object */ public void setMessage(TransObject<?> object) { this.object = object; synchronized(this) { notify(); } } public void setStart(boolean isStart) { this.isStart = isStart; } @Override public void run() { try { while(isStart) { //没有消息写的时候,线程等待 synchronized (this) { wait(); } if (object != null) { oos.writeObject(object); oos.flush(); } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (oos != null) { oos.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
3.接受消息的线程

package com.haoyudian.server.service; import java.io.IOException; import java.io.ObjectInputStream; import java.net.Socket; import java.util.List; import com.haoyudian.common.bean.TextMessage; import com.haoyudian.common.bean.User; import com.haoyudian.common.bean.trans.TransObject; import com.haoyudian.common.bean.trans.TransObjectType; import com.haoyudian.common.util.DateHelper; import com.haoyudian.server.dao.UserDao; import com.haoyudian.server.dao.UserDaoFactory; public class InputThread extends Thread{ private ObjectInputStream ois;//对象读入流 private Socket socket;//socket对象 private OutputThread outThread;//把接收的消息发送给用户 private OutputThreadMap map;//写消息的缓存器 private boolean isStart = true; public InputThread(Socket socket,OutputThread out,OutputThreadMap map) { try { this.socket = socket; this.outThread = out; this.map = map; this.ois = new ObjectInputStream(socket.getInputStream()); } catch (Exception e) { e.printStackTrace(); } } public void setStart(boolean isStart) {// 提供接口给外部关闭读消息线程 this.isStart = isStart; } @Override public void run() { try { while(isStart) { //读消息 try { readMessage(); } catch (Exception e) { //e.printStackTrace(); break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (ois != null) { ois.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { e.printStackTrace(); } } } private void readMessage() throws IOException, ClassNotFoundException { Object readObj = ois.readObject();//读取消息对象 UserDao dao = UserDaoFactory.getInstance();// 通过dao模式管理后台 if (readObj != null && readObj instanceof TransObject) { TransObject<?> transObj = (TransObject<?>) readObj;//转换成传输对象 switch (transObj.getType()) { case REGISTER://注册 User user = (User) transObj.getObject(); int regAccout = dao.register(user); System.out.println(DateHelper.getDateByCN() + "新用户注册: " + regAccout); //回复用户 TransObject<User> regResult = new TransObject<User>(TransObjectType.REGISTER); User u = new User(); u.setAccount(regAccout); regResult.setObject(u); System.out.println("验证一下账号: " + u.getAccount()); outThread.setMessage(regResult); break; case LOGIN: User loginUser = (User) transObj.getObject(); List<User> list = dao.login(loginUser); System.out.println("好友列表: " + list.size()); //返回 list TransObject<List<User>> msg = new TransObject<>(TransObjectType.LOGIN); if (list != null ) {//登陆成功 TransObject<User> userOnlineMsg = new TransObject<User>(TransObjectType.LOGIN); //此处 new 一个User ,只广播用户账号,如果用loginUser,则可能泄露密码 User tempUser = new User(); tempUser.setAccount(loginUser.getAccount()); //tempUser.setNickname(nickname)//考虑广播昵称 userOnlineMsg.setObject(tempUser); for(OutputThread out : map.getAll()) {//拿到所有用户的发送线程 out.setMessage(userOnlineMsg); } //记录当前用户的发送线程 map.add(loginUser.getAccount(), outThread); //设置消息 msg.setObject(list); } //发送 outThread.setMessage(msg); System.out.println(DateHelper.getDateByCN() + " 用户:" + loginUser.getAccount() + " 上线了"); break; case MESSAGE: // 如果是转发消息(可添加群发) // 获取消息中要转发的对象id,然后获取缓存的该对象的写线程 int toAccount = transObj.getToUser().getAccount();//获取账号 OutputThread ot = map.getById(toAccount);//获取发送线程 if (ot != null) { ot.setMessage(transObj);//把接收到的消息对象直接发送给另一个用户 } else {//用户的缓存发送线程为空,表示表示用户已下线,回复用户 TextMessage text = new TextMessage(); text.setTextMessage("对方离线,您的消息暂时保存在服务器"); TransObject<TextMessage> msgTip = new TransObject<>(TransObjectType.MESSAGE); msgTip.setObject(text); User tempU = new User(); tempU.setAccount(0); msgTip.setFromUser(tempU); outThread.setMessage(msgTip); } break; case LOGOUT://下线处理 // 如果是退出,更新数据库在线状态,同时群发告诉所有在线用户 User logoutUser = (User) transObj.getObject(); System.out.println(DateHelper.getDateByCN() + "用户: " + logoutUser.getNickname() + "下线了哈"); dao.logout(logoutUser); //结束自己的读消息的线程 isStart = false; //移除自己的缓存线程 map.remove(logoutUser.getAccount()); outThread.setMessage(null);// 先要设置一个空消息去唤醒写线程 outThread.setStart(false);// 再结束写线程循环 TransObject<User> offObject = new TransObject<User>( TransObjectType.LOGOUT); User logout2User = new User(); logout2User.setAccount(logoutUser.getAccount()); offObject.setObject(logout2User); for (OutputThread offOut : map.getAll()) {// 广播用户下线消息 offOut.setMessage(offObject); } break; case Refresh_FRIEND_LIST://更新好友 List<User> refreshList = dao.refreshFriendList(transObj.getFromUser().getAccount()); TransObject<List<User>> refreshMsg = new TransObject<>(TransObjectType.Refresh_FRIEND_LIST); refreshMsg.setObject(refreshList); outThread.setMessage(refreshMsg); break; } } } }
其中,线程池那部分的使用,我觉得很有用,我要特别留意一下

public ServerManager() { try { // 创建线程池,池中具有(cpu个数*50)条线程 executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors() * 50); serverSocket = new ServerSocket(Constants.SERVER_PORT); System.out.println("服务器IP=" + Inet4Address.getLocalHost().getHostAddress()); } catch (Exception e) { e.printStackTrace(); exit(); } } try { while(isStart) { socket = serverSocket.accept(); String ip = socket.getRemoteSocketAddress().toString(); System.out.println(DateHelper.getDateByCN() + " 用户:" + ip + " 已建立连接"); // 为支持多用户并发访问,采用线程池管理每一个用户的连接请求 if (socket.isConnected()) { executorService.execute(new SocketTask(socket));// 添加到线程池 } } }