这篇博客主要就铺代码吧,Mina的一些基础知识可以参考:
http://www.cnblogs.com/huangfox/p/3458272.html
场景假设:
1.客户端发送用户信息,服务端根据用户名查询用户年龄。(模拟查询)
2.同步请求
3.协议:直接采用字段类型编码解码。
具体代码结构:
codec负责编码解码,TCPAcceptor服务端,TCPConnector客户端,User业务对象。
User
package com.fox.mina.base.c2; /** * @author huangfox * @date 2013年12月3日 上午11:23:55 * */ public class User { String name; int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "User [name=" + name + ", age=" + age + "]"; } }
编码、解码工厂
DefaultMinaCodecFactory
package com.fox.mina.base.c2.codec; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; /** * @author huangfox * @date 2013年12月5日 下午7:48:47 * */ public class DefaultMinaCodecFactory implements ProtocolCodecFactory { ProtocolEncoder encoder; ProtocolDecoder decoder; public DefaultMinaCodecFactory() { } public DefaultMinaCodecFactory(ProtocolEncoder encoder, ProtocolDecoder decoder) { super(); this.encoder = encoder; this.decoder = decoder; } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { // TODO Auto-generated method stub return encoder; } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { // TODO Auto-generated method stub return decoder; } public ProtocolEncoder getEncoder() { return encoder; } public void setEncoder(ProtocolEncoder encoder) { this.encoder = encoder; } public ProtocolDecoder getDecoder() { return decoder; } public void setDecoder(ProtocolDecoder decoder) { this.decoder = decoder; } }
编码器
FEncoder
package com.fox.mina.base.c2.codec; import org.apache.commons.lang.ArrayUtils; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import com.fox.mina.base.c2.User; /** * @author huangfox * @date 2013年12月5日 下午7:49:21 * */ public class FEncoder extends ProtocolEncoderAdapter { public static final NumberCodec numberCodec = DefaultNumberCodecs .getBigEndianNumberCodec(); @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { byte[] bytes = null; if (message instanceof User) { User user = (User) message; byte[] name = user.getName().getBytes(); bytes = ArrayUtils.addAll(bytes, numberCodec.int2Bytes(name.length, 4)); bytes = ArrayUtils.addAll(bytes, name); bytes = ArrayUtils.addAll(bytes, numberCodec.int2Bytes(user.getAge(), 4)); out.write(IoBuffer.wrap(bytes)); } else { System.out.println("encoder error!"); } } }
编码的协议:
用户姓名的长度(int 4字节)
用户姓名(byte[])
用户年龄(int 4字节)
解码器
FDecoder
package com.fox.mina.base.c2.codec; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import com.fox.mina.base.c2.User; /** * @author huangfox * @date 2013年12月3日 上午11:35:46 * */ public class FDecoder extends CumulativeProtocolDecoder { public static final NumberCodec numberCodec = DefaultNumberCodecs .getBigEndianNumberCodec(); @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { int len = 0; Object lenAttr = session.getAttribute("len"); if (lenAttr == null) { if (in.remaining() < 4) { return false; } byte[] buffer = new byte[4]; in.get(buffer); len = numberCodec.bytes2Int(buffer, 4); session.setAttribute("len", len); } else { len = (Integer) lenAttr; } // String name = ""; Object nameAttr = session.getAttribute("name"); if (nameAttr == null) { if (in.remaining() < len) { return false; } byte[] buffer = new byte[len]; in.get(buffer); name = new String(buffer); session.setAttribute("name", name); } else { name = (String) nameAttr; } if (in.remaining() < 4) { return false; } byte[] buffer = new byte[4]; in.get(buffer); int age = numberCodec.bytes2Int(buffer, 4); User u = new User(); u.setName(name); u.setAge(age); // out.write(u); // session.removeAttribute("len"); session.removeAttribute("name"); return true; } }
编码、解码使用的工具类:
package com.fox.mina.base.c2.codec; public interface NumberCodec { String convertCharset(String charset); byte[] short2Bytes(short value, int byteLength); byte[] int2Bytes(int value, int byteLength); byte[] long2Bytes(long value, int byteLength); byte[] float2Bytes(float value, int byteLength); byte[] double2Bytes(double value, int byteLength); short bytes2Short(byte[] bytes, int byteLength); int bytes2Int(byte[] bytes, int byteLength); long bytes2Long(byte[] bytes, int byteLength); float bytes2Float(byte[] bytes, int byteLength); double bytes2Double(byte[] bytes, int byteLength); }
package com.fox.mina.base.c2.codec; public class DefaultNumberCodecs { private static int b2ui(byte b) { return (int) (b + 256) % 256; } private static long b2ul(byte b) { return (long) (b + 256) % 256; } private static NumberCodec littleEndianCodec = new NumberCodec() { public int bytes2Int(byte[] bytes, int byteLength) { int value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ui(bytes[i]) << (i * 8); } return value; } public long bytes2Long(byte[] bytes, int byteLength) { long value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ul(bytes[i]) << (i * 8); } return value; } public short bytes2Short(byte[] bytes, int byteLength) { short value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ui(bytes[i]) << (i * 8); } return value; } @Override public float bytes2Float(byte[] bytes, int byteLength) { int value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ui(bytes[i]) << (i * 8); } return Float.intBitsToFloat(value); } @Override public double bytes2Double(byte[] bytes, int byteLength) { long value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ul(bytes[i]) << (i * 8); } return Double.longBitsToDouble(value); } public byte[] int2Bytes(int value, int byteLength) { byte[] bytes = new byte[byteLength]; for (int i = 0; i < byteLength; i++) { int shiftCount = i * 8; bytes[i] = (byte) ((value & (0x000000ff << shiftCount)) >> shiftCount); } return bytes; } public byte[] long2Bytes(long value, int byteLength) { byte[] bytes = new byte[byteLength]; for (int i = 0; i < byteLength; i++) { int shiftCount = i * 8; bytes[i] = (byte) ((value & (0x00000000000000ffL << shiftCount)) >> shiftCount); } return bytes; } public byte[] short2Bytes(short value, int byteLength) { byte[] bytes = new byte[byteLength]; for (int i = 0; i < byteLength; i++) { int shiftCount = i * 8; bytes[i] = (byte) ((value & (0x00ff << shiftCount)) >> shiftCount); } return bytes; } @Override public byte[] float2Bytes(float value, int byteLength) { byte[] bytes = new 
byte[byteLength]; // parse the bits that represent the floating-point number // floatToRawIntBits gives the raw float bits without normalization // using floatToRawIntBits is over 5 times as fast as // floatToIntBits. int x = Float.floatToRawIntBits(value); for (int i = 0; i < byteLength; i++) { int shiftCount = i * 8; bytes[i] = (byte) ((x & (0x000000ff << shiftCount)) >> shiftCount); } return bytes; } @Override public byte[] double2Bytes(double value, int byteLength) { byte[] bytes = new byte[byteLength]; // parse the the bits that represent the floating-point number long x = Double.doubleToRawLongBits(value); for (int i = 0; i < byteLength; i++) { int shiftCount = i * 8; bytes[i] = (byte) ((x & (0x00000000000000ffL << shiftCount)) >> shiftCount); } return bytes; } public String convertCharset(String charset) { if (charset.equals("UTF-16")) { return "UTF-16LE"; } else { return charset; } } }; private static NumberCodec bigEndianCodec = new NumberCodec() { public int bytes2Int(byte[] bytes, int byteLength) { int value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ui(bytes[i]) << ((byteLength - 1 - i) * 8); } return value; } public long bytes2Long(byte[] bytes, int byteLength) { long value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ul(bytes[i]) << ((byteLength - 1 - i) * 8); } return value; } public short bytes2Short(byte[] bytes, int byteLength) { short value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ui(bytes[i]) << ((byteLength - 1 - i) * 8); } return value; } @Override public float bytes2Float(byte[] bytes, int byteLength) { int value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ui(bytes[i]) << ((byteLength - 1 - i) * 8); } return Float.intBitsToFloat(value); } @Override public double bytes2Double(byte[] bytes, int byteLength) { long value = 0; for (int i = 0; i < byteLength; i++) { value |= b2ul(bytes[i]) << ((byteLength - 1 - i) * 8); } return Double.longBitsToDouble(value); } public byte[] int2Bytes(int value, int 
byteLength) { byte[] bytes = new byte[byteLength]; for (int i = 0; i < byteLength; i++) { int shiftCount = ((byteLength - 1 - i) * 8); bytes[i] = (byte) ((value & (0x000000ff << shiftCount)) >> shiftCount); } return bytes; } public byte[] long2Bytes(long value, int byteLength) { byte[] bytes = new byte[byteLength]; for (int i = 0; i < byteLength; i++) { int shiftCount = ((byteLength - 1 - i) * 8); bytes[i] = (byte) ((value & (0x00000000000000ffL << shiftCount)) >> shiftCount); } return bytes; } public byte[] short2Bytes(short value, int byteLength) { byte[] bytes = new byte[byteLength]; for (int i = 0; i < byteLength; i++) { int shiftCount = ((byteLength - 1 - i) * 8); bytes[i] = (byte) ((value & (0x00ff << shiftCount)) >> shiftCount); } return bytes; } @Override public byte[] float2Bytes(float value, int byteLength) { byte[] bytes = new byte[byteLength]; // parse the bits that represent the floating-point number // floatToRawIntBits gives the raw float bits without normalization // using floatToRawIntBits is over 5 times as fast as // floatToIntBits. int x = Float.floatToRawIntBits(value); for (int i = 0; i < byteLength; i++) { int shiftCount = ((byteLength - 1 - i) * 8); bytes[i] = (byte) ((x & (0x000000ffL << shiftCount)) >> shiftCount); } return bytes; } @Override public byte[] double2Bytes(double value, int byteLength) { byte[] bytes = new byte[byteLength]; // parse the the bits that represent the floating-point number long x = Double.doubleToRawLongBits(value); for (int i = 0; i < byteLength; i++) { int shiftCount = ((byteLength - 1 - i) * 8); bytes[i] = (byte) ((x & (0x00000000000000ffL << shiftCount)) >> shiftCount); } return bytes; } public String convertCharset(String charset) { if (charset.equals("UTF-16")) { return "UTF-16BE"; } else { return charset; } } }; public static NumberCodec getBigEndianNumberCodec() { return bigEndianCodec; } public static NumberCodec getLittleEndianNumberCodec() { return littleEndianCodec; } }
服务端
TCPAcceptor
package com.fox.mina.base.c2; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.session.IoSessionConfig; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.fox.mina.base.c2.codec.DefaultMinaCodecFactory; import com.fox.mina.base.c2.codec.FDecoder; import com.fox.mina.base.c2.codec.FEncoder; /** * @author huangfox * @date 2013年12月3日 上午11:15:53 * */ public class TCPAcceptor { IoAcceptor acceptor = null; String addr = "127.0.0.1"; int port = 9999; public TCPAcceptor() { } public void start() { acceptor = new NioSocketAcceptor(); // IoSessionConfig sessionConf = acceptor.getSessionConfig(); sessionConf.setReadBufferSize(2048); // filter chain(codec) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new DefaultMinaCodecFactory( new FEncoder(), new FDecoder()))); // handler acceptor.setHandler(new IOHandler()); // try {// 可做重连 acceptor.bind(new InetSocketAddress(addr, port)); } catch (IOException e) { e.printStackTrace(); } System.out.println("server stated ... "); } private class IOHandler extends IoHandlerAdapter { @Override public void messageReceived(IoSession session, Object message) throws Exception { User u = (User) message; System.out.println("[server]" + u.toString()); //模拟业务处理 u.setAge(u.hashCode()%100); // send msg to client session.write(u); } } public static void main(String[] args) { TCPAcceptor server = new TCPAcceptor(); server.start(); } }
客户端
TCPConnector
package com.fox.mina.base.c2; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.util.Random; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.ReadFuture; import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.fox.mina.base.c2.codec.DefaultMinaCodecFactory; import com.fox.mina.base.c2.codec.FDecoder; import com.fox.mina.base.c2.codec.FEncoder; /** * @author huangfox * @date 2013年12月3日 上午11:15:46 * */ public class TCPConnector { IoConnector connector = null; IoSession session = null; String ip = "127.0.0.1"; int port = 9999; public TCPConnector() { connector = new NioSocketConnector(); connector.getSessionConfig().setUseReadOperation(true); connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new DefaultMinaCodecFactory( new FEncoder(), new FDecoder()))); connector.setHandler(new IOHander()); ConnectFuture connectF = connector.connect(new InetSocketAddress(ip, port)); connectF.awaitUninterruptibly(); session = connectF.getSession(); } Random r = new Random(); public void sendMsg(User u) { WriteFuture writeF = session.write(u); writeF.awaitUninterruptibly(); if (writeF.getException() != null) { System.out.println(writeF.getException().getMessage()); } else if (writeF.isWritten()) { System.out.println("msg was sent!"); // 发送、接受 ReadFuture readF = session.read(); readF.awaitUninterruptibly(1000); if (readF.getException() != null) { System.out.println(readF.getException().getMessage()); } else { System.out.println("[client]"+readF.getMessage().toString()); } } else { System.out.println("error!"); } } public void close() { this.connector.dispose(); } private 
class IOHander extends IoHandlerAdapter { } public static void main(String[] args) { TCPConnector client = new TCPConnector(); while (true) { BufferedReader r = new BufferedReader(new InputStreamReader( System.in)); try { System.out.println("输入:"); String msg = r.readLine(); User u = new User(); u.setName(msg); client.sendMsg(u); } catch (IOException e) { e.printStackTrace(); } } } }
注意:以上代码在多线程环境下是有问题的!!!
(最大字体的提示了!)
在TCPConnector中,只有一个session,如果多线程同时对这个session进行读写操作将发生问题(线程B可能拿到线程A的响应结果)。
最简单的处理方式就是,加入一个连接池,在sendMsg方法中先获取一个连接,用完以后再归还到连接池。可以理解为“一个线程独占一个连接”。
代码如下:
package com.fox.mina.base.c2; import java.net.InetSocketAddress; import java.util.Date; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.ReadFuture; import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.fox.mina.base.c2.codec.DefaultMinaCodecFactory; import com.fox.mina.base.c2.codec.FDecoder; import com.fox.mina.base.c2.codec.FEncoder; /** * @author huangfox * @date 2013年12月3日 上午11:15:46 * */ public class TCPConnector { IoConnector connector = null; // IoSession session = null; String ip = "127.0.0.1"; int port = 9999; int poolSize = 1; LinkedBlockingQueue<IoSession> pool; public TCPConnector() { connector = new NioSocketConnector(10); connector.getSessionConfig().setUseReadOperation(true); connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new DefaultMinaCodecFactory( new FEncoder(), new FDecoder()))); connector.setHandler(new IOHander()); // pool pool = new LinkedBlockingQueue<IoSession>(poolSize); for (int i = 0; i < poolSize; i++) { ConnectFuture connectF = connector.connect(new InetSocketAddress( ip, port)); connectF.awaitUninterruptibly(); IoSession session = connectF.getSession(); try { pool.put(session); } catch (InterruptedException e) { e.printStackTrace(); } } } Random r = new Random(); public void sendMsg(User u) { IoSession session = null; try { session = pool.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (session == null) return; try { WriteFuture writeF = session.write(u); writeF.awaitUninterruptibly(); if (writeF.getException() != null) { 
System.out.println(writeF.getException().getMessage()); } else if (writeF.isWritten()) { // System.out.println("msg was sent!"); // 发送、接受 ReadFuture readF = session.read(); readF.awaitUninterruptibly(); if (readF.getException() != null) { System.out.println(readF.getException().getMessage()); } else { Date d = new Date(System.currentTimeMillis()); // System.out.println(Thread.currentThread().getId() + "-" // + d.getSeconds() + "-" + session.hashCode()); // System.out.println("[client]" // + readF.getMessage().toString()); } } else { System.out.println("error!"); } } finally { try { pool.put(session); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void close() { this.connector.dispose(); } private class IOHander extends IoHandlerAdapter { } }
扩展:
1.加入连接池(见上文)
2.加入连接、读写超时