客户端:
package com.client; import java.net.InetSocketAddress; import java.util.Scanner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; /** * netty客户端入门 */ public class Client { public static void main(String[] args) { //服务类 ClientBootstrap bootstrap = new ClientBootstrap(); //2个线程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); //设置niosocket工厂,传入2个线程池, bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker)); //设置管道工厂 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { //工厂返回ChannelPipeline,是管道,管道里面装的是过滤器, @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); //传的是字节流,不是字符 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("hiHandler", new ClientHandler()); return pipeline; } }); //连接服务端 ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 10101)); Channel channel = connect.getChannel(); System.out.println("client start"); Scanner scanner = new Scanner(System.in); while(true){ System.out.println("请输入"); channel.write(scanner.next()); } } }
package com.client; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; /** * 消息接受处理类 */ public class ClientHandler extends SimpleChannelHandler { /** * 接收消息 */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String s = (String) e.getMessage(); System.out.println(s); super.messageReceived(ctx, e); } /** * 捕获异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { System.out.println("exceptionCaught"); super.exceptionCaught(ctx, e); } /** * 新连接 */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelConnected"); super.channelConnected(ctx, e); } /** * 必须是链接已经建立,关闭通道的时候才会触发。服务端关闭了也调用。 */ @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelDisconnected"); super.channelDisconnected(ctx, e); } /** * channel关闭的时候触发,服务端关闭了也调用。 */ @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); super.channelClosed(ctx, e); } }
服务端:
package com.server; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; /** * netty服务端入门 */ /* netty版本大致版本分为 netty3.x 和 netty4.x、netty5.x netty可以运用的领域: 1分布式进程通信 hadoop、dubbo、akka等具有分布式功能的框架,只要是java的,底层RPC通信都是基于netty实现的,这些框架使用的版本通常都还在用netty3.x 2、游戏服务器开发 页游手游都是java,服务器是netty或者mina,最新的游戏服务器有部分公司可能已经开始采用netty4.x 或 netty5.x */ public class Server { public static void main(String[] args) { //服务类 ServerBootstrap bootstrap = new ServerBootstrap(); //boss线程监听端口,worker线程负责数据读写 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); //设置niosocket工厂,boss和worker线程都会分配一个selector,boss的selector负责监听端口,worker的selector负责读写任务。 bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker)); //设置管道的工厂 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { //管道里面装的是过滤器, @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); //传的不是字符,是字节流。 //StringEncoder/StringDecoder就不用在接收消息的地方把ChannelBuffer转成string了。 pipeline.addLast("decoder", new StringDecoder());//StringDecoder是上行的管道。 pipeline.addLast("encoder", new StringEncoder());//StringEncoder是下行的管道。 pipeline.addLast("helloHandler", new ServerHandler());//ServerHandler是上行和下行管道。 return pipeline; } }); bootstrap.bind(new InetSocketAddress(10101));//接收消息在HelloHandler System.out.println("start!!!"); } }
package com.server; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; /** * 消息接受处理类 */ public class ServerHandler extends SimpleChannelHandler { /** * 接收消息 */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String s = (String) e.getMessage(); /*ChannelBuffer message = (ChannelBuffer)e.getMessage(); String s = new String(message.array());*/ System.out.println(s); //回写数据 ctx.getChannel().write("hi"); super.messageReceived(ctx, e); } /** * 捕获异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { System.out.println("exceptionCaught"); super.exceptionCaught(ctx, e); } /** * 新连接,通常用来检测IP是否是黑名单 */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelConnected"); super.channelConnected(ctx, e); } /** * 必须是链接已经建立成功,关闭通道的时候才会触发,先触发channelDisconnected在触发channelClosed。 * * 可以再用户断线的时候清楚用户的缓存数据等 */ @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelDisconnected"); super.channelDisconnected(ctx, e); } /** * 就算没有建立成功,channel关闭的时候触发 * * 可以再用户断线的时候清楚用户的缓存数据等 */ @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); super.channelClosed(ctx, e); } }