zoukankan      html  css  js  c++  java
  • 大数据入门第四天——基础部分之轻量级RPC框架的开发

    一、概述

    1、掌握RPC原理
    2、掌握nio操作
    3、掌握netty简单的api
    4、掌握自定义RPC框架
    主要内容

      1.RPC是什么

      RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

      // 来自百度百科

      更加通俗一些的RPC的解释与介绍,参考https://www.zhihu.com/question/25536695

      TCP传输控制协议的通俗解释,参考这里,术语式的解释,参考这里

      2.自定义RPC框架思路图解

    二、先导知识准备

      由上图可以知道,需要自定义RPC框架,需要的知识主要有:spring,socket(由nio为原理的netty替代),动态代理等

      1.nio先导知识

        参考基础随笔篇:http://www.cnblogs.com/jiangbei/p/7499444.html

       2.nio框架——netty

        netty推荐书籍:netty权威指南

        netty是什么?

      是一个网络编程框架(就是一个封装了NIO的框架)

      Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

        netty起步

          由于暂时不对netty进行深入了解(后续应该会专门有针对netty的随笔),这里仅作到“能用”,点击查看:get started

          依赖:

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.24.Final</version>
            </dependency>        

      //如果不使用maven,请去maven仓库下载jar即可!

             模板代码:

    package cn.itcast_03_netty.sendstring.client;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.net.InetSocketAddress;
    
    import cn.itcast_03_netty.sendobject.coder.PersonEncoder;
    
    /**
     * • 连接服务器 • 写数据到服务器 • 等待接受服务器返回相同的数据 • 关闭连接
     * 
     * @author wilson
     *
     */
    public class EchoClient {
    
        private final String host;
        private final int port;
    
        public EchoClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void start() throws Exception {
            EventLoopGroup nioEventLoopGroup = null;
            try {
                // 客户端引导类
                Bootstrap bootstrap = new Bootstrap();
                // EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据
                nioEventLoopGroup = new NioEventLoopGroup();
                bootstrap.group(nioEventLoopGroup)//多线程处理
                        .channel(NioSocketChannel.class)//指定通道类型为NioServerSocketChannel,一种异步模式,OIO阻塞模式为OioServerSocketChannel
                        .remoteAddress(new InetSocketAddress(host, port))//地址
                        .handler(new ChannelInitializer<SocketChannel>() {//业务处理类
                                    @Override
                                    protected void initChannel(SocketChannel ch)
                                            throws Exception {
                                        ch.pipeline().addLast(new EchoClientHandler());//注册handler
                                    }
                                });
                // 链接服务器
                ChannelFuture channelFuture = bootstrap.connect().sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                nioEventLoopGroup.shutdownGracefully().sync();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new EchoClient("localhost", 20000).start();
        }
    }
    EchoClient
    package cn.itcast_03_netty.sendstring.client;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import cn.itcast_03_netty.sendobject.bean.Person;
    
    public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
        // 客户端连接服务器后被调用
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客户端连接服务器,开始发送数据……");
            byte[] req = "QUERY TIME ORDER".getBytes();//消息
            ByteBuf firstMessage = Unpooled.buffer(req.length);//发送类
            firstMessage.writeBytes(req);//发送
            ctx.writeAndFlush(firstMessage);//flush
        }
    
        // • 从服务器接收到数据后调用
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                throws Exception {
            System.out.println("client 读取server数据..");
            // 服务端返回消息后
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("服务端数据为 :" + body);
        }
    
        // • 发生异常时被调用
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            System.out.println("client exceptionCaught..");
            // 释放资源
            ctx.close();
        }
    }
    EchoClientHandler
    package cn.itcast_03_netty.sendstring.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import cn.itcast_03_netty.sendobject.coder.PersonDecoder;
    
    /**
     * • 配置服务器功能,如线程、端口 • 实现服务器处理程序,它包含业务逻辑,决定当有一个请求连接或接收数据时该做什么
     * 
     * @author wilson
     *
     */
    public class EchoServer {
    
        private final int port;
    
        public EchoServer(int port) {
            this.port = port;
        }
    
        public void start() throws Exception {
            EventLoopGroup eventLoopGroup = null;
            try {
                //server端引导类
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                //连接池处理数据
                eventLoopGroup = new NioEventLoopGroup();
                //装配bootstrap
                serverBootstrap.group(eventLoopGroup)
                .channel(NioServerSocketChannel.class)//指定通道类型为NioServerSocketChannel,一种异步模式,OIO阻塞模式为OioServerSocketChannel
                .localAddress("localhost",port)//设置InetSocketAddress让服务器监听某个端口已等待客户端连接。
                .childHandler(new ChannelInitializer<Channel>() {//设置childHandler执行所有的连接请求
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new EchoServerHandler());//注册handler
                    }
                        });
                // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
                ChannelFuture channelFuture = serverBootstrap.bind().sync();
                System.out.println("开始监听,端口为:" + channelFuture.channel().localAddress());
                channelFuture.channel().closeFuture().sync();
            } finally {
                eventLoopGroup.shutdownGracefully().sync();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new EchoServer(20000).start();
        }
    }
    EchoServer
    package cn.itcast_03_netty.sendstring.server;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.util.Date;
    
    import cn.itcast_03_netty.sendobject.bean.Person;
    
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("server 读取数据……");
            //读取数据
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("接收客户端数据:" + body);
            //向客户端写数据
            System.out.println("server向client发送数据");
            String currentTime = new Date(System.currentTimeMillis()).toString();
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.write(resp);
            
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server 读取数据完毕..");
            ctx.flush();//刷新后才将数据发出到SocketChannel
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    EchoServerHandler

        以上就是发送字符串的模板代码,基本上,总体的架构都不需要进行改动,只需要根据业务逻辑调整handler的代码即可!

        接下来介绍多个handler的处理方式:

      Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,
      通过Handler可以完成通讯报文的解码编码、拦截指定的报文、

      统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的。   Netty中的所有handler都实现自ChannelHandler接口。按照输出输出来分,
    分为ChannelInboundHandler、ChannelOutboundHandler两大类。

    ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;
    ChannelOutboundHandler对从服务器发往客户端的报文进行处理,

    一般用来进行编码、发送报文到客户端。   Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;
    ChannelOutboundHandler按照注册的先后顺序逆序执行,

        给出handler的顺序的结论:

          多个IN OUT的handler的时候,最后一个addList必须是OUT,而且处理顺序为先IN 的正序以及后OUT的逆序!

    1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;
      调用ctx.write(msg) 将传递到ChannelOutboundHandler。
    2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。 3、流水线pipeline中outhandler不能放在最后,否则不生效 4、Handler的消费处理放在最后一个处理。

        使用POJO代替上文中的string也是本质上相同的,故这里不再赘述,完整的代码,请查看这里,更多概念与示例,可以参见网友随笔

        3.Spring的IOC与AOP

          参考之前随笔:http://www.cnblogs.com/jiangbei/p/6790251.html

          spring的自定义注解的扫描读取参考Java基础注解部分

     三、自定义RPC框架

       整个框架架构如下:

        

        //其实dubbo也有这个功能...

       由于源码比较多,这里给出下载地址:https://pan.baidu.com/s/1htQcnEs

       其余相关后续补充

    四、JVM技术

      1.JVM内存模型

        参考基础篇:http://www.cnblogs.com/jiangbei/p/7813748.html

             :https://www.cnblogs.com/peak-c/p/6647381.html

        推荐阅读:深入理解JVM虚拟机

        可视化工具jconsole的使用:https://www.cnblogs.com/tina-smile/p/5327057.html

                        https://www.cnblogs.com/baihuitestsoftware/articles/6405580.html

        VisualVM的使用待补充

    五、hadoop中的RPC框架封装

       详细讲解,参考https://www.cnblogs.com/edisonchou/p/4285817.html

       参考https://www.cnblogs.com/junneyang/p/5841135.

      

  • 相关阅读:
    leetcode 763 划分字母区间
    leetcode 392 判断子序列
    Leetcode 665 修改一个数成为非递减数组 (Easy)
    leetcode 605 种花问题 贪心算法
    leetcode 452 用最少数量的箭引爆气球 贪心算法
    leetcode 455 分发饼干 贪心算法
    delphi中的 CLX Application
    delphi 之DCOM应用服务器定义函数
    SqlServer 之 sp_executesql系统存储过程的介绍和使用
    delphi 之调用WinSock的API获取本机的机器名称和IP地址
  • 原文地址:https://www.cnblogs.com/jiangbei/p/8321030.html
Copyright © 2011-2022 走看看