java我用了java.nio,没有用nettiy,mina等框架,因为这些框架让我看起来更难理解原理。
偶封装的只是用来玩滴,没有经过实际项目的磨练,还需要不断润色和加工的,
后续需要研究的:传输对象,或者至少有类型,
类型、顺序、值
--------------------------------
java代码:
View Code
1 package my;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.ByteBuffer;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.Selector;
8 import java.nio.channels.ServerSocketChannel;
9 import java.nio.channels.SocketChannel;
10 import java.nio.channels.spi.SelectorProvider;
11 import java.nio.charset.Charset;
12 import java.util.Iterator;
13
14 /**
15 * @author elvisjiang
16 */
17 public class SocketServer implements Runnable {
18
19 private int port1 = 8090;
20 private Selector selector;
21 private ServerSocketChannel serverChannel;
22 private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
23 private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
24 private SocketChannel socketChannel;
25 private Charset charset = Charset.forName("utf-8");
26
27 public SocketServer(){
28 initSelector();
29 }
30 public void initSelector() {
31 try {
32 selector = Selector.open();
33 serverChannel = ServerSocketChannel.open();
34 serverChannel.configureBlocking(false);
35 InetSocketAddress isa = new InetSocketAddress("localhost", this.port1);
36 serverChannel.socket().bind(isa);
37 serverChannel.register(selector, SelectionKey.OP_ACCEPT);
38 } catch (IOException e) {
39 e.printStackTrace();
40 }
41 }
42
43 @Override
44 public void run() {
45 while (true) {
46 try {
47 //在Selector对象中保存了selectedKeys,而该列表是一个HashSet。Selector每次在监听到有消息时,
48 //会将与之对应的SocketChannel加入到该HashSet中,并将update,并且会唤醒selector.select(0)阻塞。
49 selector.select();
50 Iterator iter = selector.selectedKeys().iterator();
51 while (iter.hasNext()) {
52 SelectionKey key = (SelectionKey) iter.next();
53 iter.remove();
54
55 if (!key.isValid()) {
56 continue;
57 }
58
59 if (key.isAcceptable()) {//新的连接
60 this.accept(key);
61 } else if (key.isReadable()) {//可读
62 this.read(key);
63 } else if (key.isWritable()) {
64 this.write(key);
65 }
66 }
67 } catch (IOException e) {
68 e.printStackTrace();
69 }
70 try {
71 Thread.sleep(100);
72 } catch (InterruptedException e) {
73 e.printStackTrace();
74 }
75 }
76 }
77
78 public void accept(SelectionKey key) throws IOException {
79 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
80 if (serverSocketChannel.equals(serverChannel)) {
81 socketChannel = serverSocketChannel.accept();
82 socketChannel.configureBlocking(false);
83 socketChannel.register(this.selector, SelectionKey.OP_READ);
84 System.out.println("[server]有一个客户端连接到服务器了");
85 }
86 }
87
88 public void read(SelectionKey key) throws IOException {
89 SocketChannel socketChannel = (SocketChannel) key.channel();
90
91 this.readBuffer.clear();
92
93 // Attempt to read off the channel
94 int numRead;
95 try {
96 numRead = socketChannel.read(this.readBuffer);
97 } catch (IOException e) {
98 // The remote forcibly closed the connection, cancel
99 // the selection key and close the channel.
100 key.cancel();
101 socketChannel.close();
102 return;
103 }
104
105 if (numRead == -1) {
106 // Remote entity shut the socket down cleanly. Do the
107 // same from our end and cancel the channel.
108 key.channel().close();
109 key.cancel();
110 return;
111 }
112
113 readBuffer.flip();
114 // System.out.println(readBuffer.remaining());
115
116
117 // readBuffer.compact();
118 int size = readBuffer.getInt();
119 int request = readBuffer.getInt();
120
121 // System.out.println(readBuffer.position());
122 // System.out.println(readBuffer.remaining());
123
124 String content = decode(readBuffer);
125 System.out.println("<<<[commandId="+ request +",size="+ size + ",content:" + content + "]");
126
127 key.interestOps(SelectionKey.OP_WRITE);
128 key.attach(request);
129
130 }
131 public void write(SelectionKey key) {
132 SocketChannel socketChannel = (SocketChannel) key.channel();
133 if(readBuffer.array().length > 0)
134 {
135 readBuffer.flip();
136 int size = readBuffer.getInt();
137 int request = readBuffer.getInt();
138 int id = readBuffer.getInt();
139 String name = decode(readBuffer);
140
141 writeBuffer.clear();
142 if(request == Commands.REQ_TEST)
143 {
144 writeBuffer.putInt(size);
145 writeBuffer.putInt(request);
146 writeBuffer.putInt(id);
147 writeBuffer.put(name.getBytes());
148 }
149 writeBuffer.flip();
150 try{
151 System.out.println(">>>[commandId="+ request +",size="+ size + ",content:]");
152
153 socketChannel.write(writeBuffer);
154 key.interestOps(SelectionKey.OP_READ);
155 }catch(IOException io){
156 io.printStackTrace();
157 }
158 }
159
160 }
161 private String decode(ByteBuffer byteBuffer) {
162 return charset.decode(byteBuffer).toString();
163 }
164
165 private ByteBuffer encode(String str) {
166 return charset.encode(str);
167 }
168 public static void main(String[] args) {
169 SocketServer server = new SocketServer();
170 Thread t = new Thread(server);
171 t.start();
172 }
173
174 }
AS3代码:
View Code
1 package baseSocket
2 {
3 import flash.events.Event;
4 import flash.events.EventDispatcher;
5 import flash.events.IOErrorEvent;
6 import flash.events.ProgressEvent;
7 import flash.events.SecurityErrorEvent;
8 import flash.net.Socket;
9 import flash.system.Security;
10 import flash.utils.ByteArray;
11
12 public class BinarySocket extends EventDispatcher
13 {
14 private var _host:String;
15 private var _port:uint;
16 private var _socket:Socket;
17
18 public function BinarySocket(host:String,port:uint = 80)
19 {
20 this._host = host;
21 this._port = port;
22 this._socket = new Socket();
23 Security.loadPolicyFile("xmlsocket://" + this.host + ":" + this.port);
24 this._socket.addEventListener(Event.CONNECT, handler);
25 this._socket.addEventListener(Event.CLOSE, handler);
26 this._socket.addEventListener(IOErrorEvent.IO_ERROR, handler);
27 this._socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, handler);
28 this._socket.addEventListener(ProgressEvent.SOCKET_DATA, handler);
29 }
30 private function handler(event:Event):void
31 {
32 switch(event.type)
33 {
34 case Event.CLOSE:
35 this.dispatchEvent(new BaseSocketEvent(BaseSocketEvent.CLOSE));
36 break;
37 case Event.CONNECT:
38 case IOErrorEvent.IO_ERROR:
39 case SecurityErrorEvent.SECURITY_ERROR:
40 this.dispatchEvent(new BaseSocketEvent(event.type));
41 break;
42 case ProgressEvent.SOCKET_DATA:
43 this.received();
44 break;
45
46 }
47 }
48 private function received():void{
49 var bytes:ByteArray = new ByteArray();
50 while(_socket.bytesAvailable > 0)
51 {
52 _socket.readBytes(bytes,0,_socket.bytesAvailable);
53 }
54 var packLength:uint = bytes.readUnsignedInt();
55 var commandId:uint = bytes.readUnsignedInt();
56 bytes.position = 0;
57 trace("<<<[commandId="+ commandId +",size="+ packLength + ",content:]");
58 this.dispatchEvent(new BaseSocketEvent(BaseSocketEvent.RESPONSE,bytes));
59 }
60 public function send(params:ByteArray):void
61 {
62 if(!this.connected || params == null)
63 {
64 return;
65 }
66 _socket.writeBytes(params);
67 _socket.flush();
68 this.dispatchEvent(new BaseSocketEvent(BaseSocketEvent.REQUEST, params));
69 }
70 /**
71 * 发包
72 * @param commandId 协议号
73 * @param content 协议内容
74 * @return 包结构:[包长、协议号、内容]
75 *
76 */
77 public function request(commandId:uint,content:ByteArray):void
78 {
79 if(commandId == 0 )
80 {
81 throw new Error("请注意协议号为0");
82 }
83
84 var packLength:uint = 4+4 + content.length;
85 var output:ByteArray = new ByteArray();
86 output.writeUnsignedInt(packLength);//包长
87 output.writeUnsignedInt(commandId);
88
89 if(content)
90 {
91 output.writeBytes(content);
92 }
93 trace(">>>[commandId="+ commandId +",size="+ packLength + ",content:]");
94 send(output);
95 }
96 public function get host():String
97 {
98 return _host;
99 }
100
101 public function get port():uint
102 {
103 return _port;
104 }
105 public function get connected():Boolean {
106 return this._socket.connected;
107 }
108
109 public function connect():void {
110 this._socket.connect(host, port);
111 }
112 public function close():void {
113 this._socket.close();
114 }
115 }
116 }
View Code
1 package baseSocket{
2 import flash.events.*;
3
4 public class BaseSocketEvent extends Event {
5
6 public static const SECURITY_ERROR:String = SecurityErrorEvent.SECURITY_ERROR;
7 public static const IO_ERROR:String = IOErrorEvent.IO_ERROR;
8 public static const DECODE_ERROR:String = "decode_error";
9 public static const RESPONSE:String = "response";
10 public static const REQUEST:String = "request";
11 public static const CLOSE:String = Event.CLOSE;
12 public static const CONNECT:String = Event.CONNECT;
13
14 private var _data:Object;
15
16 public function BaseSocketEvent(type:String, data:Object = null) {
17 super(type, true);
18 this._data = data;
19 }
20
21 public function get data():Object {
22 return _data;
23 }
24 }
25
26 }
View Code
1 package
2 {
3 import baseSocket.BaseSocketEvent;
4 import baseSocket.BinarySocket;
5
6 import flash.display.Sprite;
7 import flash.events.Event;
8 import flash.utils.ByteArray;
9
10 public class TestBinarySocket extends Sprite
11 {
12 public static var REQ_TEST:uint = 1001;
13 private var bs:BinarySocket;
14 public function TestBinarySocket()
15 {
16 bs = new BinarySocket("localhost",8090);
17 bs.addEventListener(BaseSocketEvent.CONNECT,handler);
18 bs.addEventListener(BaseSocketEvent.RESPONSE,res_handler);
19 bs.connect();
20
21 }
22
23 protected function handler(event:Event):void
24 {
25 bs.removeEventListener(BaseSocketEvent.CONNECT,handler);
26 var byteArray:ByteArray = new ByteArray();
27 byteArray.writeInt(1);//id
28 byteArray.writeUTFBytes("name");//name
29 bs.request(REQ_TEST,byteArray);
30
31 }
32 private function res_handler(evt:BaseSocketEvent):void
33 {
34 var resByteArray:ByteArray = evt.data as ByteArray;
35 var packLength:uint = resByteArray.readUnsignedInt();
36 var commandId:uint = resByteArray.readUnsignedInt();
37
38 switch(commandId)
39 {
40 case REQ_TEST:
41 trace("收到"+REQ_TEST+"的回包");
42 break;
43 }
44 }
45 }
46 }