  • Writing a Basic Distributed System Framework in One Hour

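    In the Internet era, distributed frameworks of every kind are everywhere. The application layer varies a great deal, but the core stays the same: RPC (remote procedure call). Features such as service registration and discovery, load balancing, data HA, and call-chain tracing are built on top of it.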

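    Distributed systems are very complex. This post offers only a small glimpse: we spend one hour building a basic system. Its components can be summarized as the following three: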

    • Service registry, which manages metadata (Registry)

    • Service provider (Provider)

    • Service consumer (Consumer)

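    0. A look at popular systems

    0.1 HBase

    HBase is the database of Apache Hadoop and provides random, real-time read/write access to large data sets. It is extremely complex, so we will not dig into it and only look at its basic framework.

    • ZooKeeper manages metadata (Registry)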

    • RegionServer, with data stored on HDFS DataNodes (Provider)

    • Client(Consumer)

    0.2 Kafka

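    Kafka is a distributed messaging system developed at LinkedIn and written in Scala. It is widely used for its horizontal scalability and high throughput.

    • ZooKeeper manages metadata (Registry)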

    • Producer,Consumer(Consumer)

    • Broker(Provider)

    0.3 Dubbo

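    Dubbo is a distributed service framework developed by Alibaba. It aims to provide a high-performance, transparent RPC solution together with an SOA service governance solution.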

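    1. Building the basic framework

    We will not dwell on the reasoning behind the architecture for now and simply build the framework outlined below. Netty is used in many distributed systems, so we build on Netty as well. With some modification, the code can serve as a starting point for production use.

    1.0 Overall structure

    The framework is split into four modules, each covered in turn below.

    • common (basic class definitions)

    • registry (the registry)

    • client (service caller, Consumer)

    • server (service provider, Provider)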

    1.1 common

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.0.36.Final</version>
    </dependency>

    1.1.1 Command.java

    Every call is a command.

    package com.parsecode.common;
    
    import java.util.concurrent.atomic.AtomicLong;
    
    public class Command {
        // Register a provider
        public final static int REGISTER_SERVER = 1;
        // Unregister a provider
        public final static int UNREGISTER_SERVER = 2;
        // Invoke a method
        public final static int INVOKE_REQUEST = 3;
        // Method result
        public final static int INVOKE_RESPONSE = 4;
        // Fetch the provider list
        public final static int GET_SERVER_LIST = 5;
        public final static int GET_SERVER_LIST_RESPONSE = 6;
    
        private static final AtomicLong IDS = new AtomicLong(0);
    
        private int type;
        private long requestId;
    
        // Holds the concrete method call information, the call result, and so on
        private byte[] body;
    
        public Command(int type, byte[] body) {
            this.type = type;
            this.body = body;
            this.requestId = IDS.incrementAndGet();
        }
    
        // command length = type(4) + requestId(8) + body.length
        public int length() {
            return 12 + (body == null ? 0 : body.length);
        }
    
        public int getType() {
            return type;
        }
        public void setType(int type) {
            this.type = type;
        }
        public byte[] getBody() { return body; }
        public void setBody(byte[] body) { this.body = body; }
    
        public long getRequestId() {
            return requestId;
        }
    
        public void setRequestId(long requestId) {
            this.requestId = requestId;
        }
    }
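
    Every Command carries a requestId, and the handlers further down copy the request's ID into the response. That is what lets a caller pair the two. A minimal JDK-only sketch of such a pairing table, assuming Java 8 or later; `PendingRequestsSketch` and its method names are hypothetical and not part of the framework:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: pairs a response with the request that caused it.
class PendingRequestsSketch {
    private final AtomicLong ids = new AtomicLong(0);
    private final ConcurrentHashMap<Long, CompletableFuture<byte[]>> pending =
            new ConcurrentHashMap<>();

    // Same idea as Command's IDS counter: a process-wide increasing ID.
    long nextId() {
        return ids.incrementAndGet();
    }

    // Called before the request is written to the channel.
    CompletableFuture<byte[]> register(long requestId) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called when a response arrives; returns false for an unknown ID.
    boolean complete(long requestId, byte[] body) {
        CompletableFuture<byte[]> future = pending.remove(requestId);
        return future != null && future.complete(body);
    }

    int size() {
        return pending.size();
    }
}
```

    Removing the entry on completion keeps the table from growing; a real caller would presumably also time out entries that never get a response.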

    1.1.2 Invocation.java

    Represents one method call: which interface, which method, the parameter types, and the argument values. It is serialized to byte[] as JSON.

    package com.parsecode.common;
    
    import java.io.Serializable;
    
    public class Invocation implements Serializable {
        private String interfaceName;
        private String methodName;
        private String[] parameterTypes;
        private Object[] arguments;
    
        public String getInterfaceName() {
            return interfaceName;
        }
    
        public void setInterfaceName(String interfaceName) {
            this.interfaceName = interfaceName;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        public String[] getParameterTypes() {
            return parameterTypes;
        }
    
        public void setParameterTypes(String[] parameterTypes) {
            this.parameterTypes = parameterTypes;
        }
    
        public Object[] getArguments() {
            return arguments;
        }
    
        public void setArguments(Object[] arguments) {
            this.arguments = arguments;
        }
    }
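
    To illustrate how a call on a local interface could be turned into these four fields, here is a small sketch using a JDK dynamic proxy. `InvocationCaptureSketch`, `Greeter`, and `Captured` are hypothetical names made up for the example. The sketch only records the call instead of sending it, and it uses `Class.getName()` for the type names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

class InvocationCaptureSketch {
    // Hypothetical service interface used only for this illustration.
    interface Greeter {
        String greet(String name, Integer times);
    }

    // Same four fields as Invocation, kept immutable for brevity.
    static final class Captured {
        final String interfaceName;
        final String methodName;
        final String[] parameterTypes;
        final Object[] arguments;

        Captured(String interfaceName, String methodName,
                 String[] parameterTypes, Object[] arguments) {
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }
    }

    // Returns a proxy that records every call made through it into sink.
    static <T> T proxy(final Class<T> iface, final List<Captured> sink) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) {
                Class<?>[] types = method.getParameterTypes();
                String[] typeNames = new String[types.length];
                for (int i = 0; i < types.length; i++) {
                    typeNames[i] = types[i].getName();
                }
                sink.add(new Captured(iface.getName(), method.getName(), typeNames, args));
                // A real consumer would send the call and return the decoded response here.
                return null;
            }
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler));
    }
}
```

    Returning null is only acceptable here because the example method returns a reference type; a method with a primitive return type would need a real value.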

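    1.1.3 Message encoding and decoding

    This is what is usually called the communication protocol. The protocol is kept simple: each frame starts with a 4-byte length field, followed by type (4 bytes), requestId (8 bytes), and the body.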
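
    Going by the encoder and decoder in this section, a frame on the wire is a 4-byte length, then type (4 bytes), requestId (8 bytes), and the body, where the length counts everything after the length field itself. A JDK-only sketch of that layout, using `java.nio.ByteBuffer` instead of Netty; `FrameLayoutSketch` and its methods are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper that mirrors the wire layout with plain java.nio.
class FrameLayoutSketch {
    static final int HEADER = 12; // type(4) + requestId(8)

    // Build one frame: length prefix, type, requestId, body.
    static byte[] encode(int type, long requestId, byte[] body) {
        int bodyLen = body == null ? 0 : body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + HEADER + bodyLen);
        buf.putInt(HEADER + bodyLen); // the length excludes the 4-byte prefix itself
        buf.putInt(type);
        buf.putLong(requestId);
        if (bodyLen > 0) {
            buf.put(body);
        }
        return buf.array();
    }

    static int readLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(0);
    }

    static int readType(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(4);
    }

    static long readRequestId(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(8);
    }

    static byte[] readBody(byte[] frame) {
        return Arrays.copyOfRange(frame, 4 + HEADER, frame.length);
    }
}
```

    ByteBuffer writes big-endian by default, which matches the byte order Netty's `writeInt` and `writeLong` use.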

    Message decoding

    package com.parsecode.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import java.nio.ByteBuffer;
    public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    
        public NettyDecoder() {
            super(65536, 0, 4, 0, 4);
        }
    
        @Override
        public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf frame = null;
            try {
                frame = (ByteBuf) super.decode(ctx, in);
                if (null == frame) {
                    return null;
                }
    
                ByteBuffer byteBuffer = frame.nioBuffer();
                int length = byteBuffer.limit();
                int type = byteBuffer.getInt();
                long requestId = byteBuffer.getLong();
                byte[] bodyData = null;
                if ((length - 12) > 0) {
                    bodyData = new byte[length - 12];
                    byteBuffer.get(bodyData);
                }
                Command cmd = new Command(type, bodyData);
                cmd.setRequestId(requestId);
                return cmd;
            } catch (Exception e) {
                ctx.channel().close().addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        System.out.println("closeChannel");
                    }
                });
            } finally {
                if (null != frame) {
                    frame.release();
                }
            }
            return null;
        }
    }
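
    The decoder relies on `LengthFieldBasedFrameDecoder` to cut the TCP byte stream into whole frames, because one read may contain several frames or only part of one. As a rough JDK-only illustration of that idea (not Netty's implementation), the hypothetical `FrameSplitterSketch` below extracts every complete frame and leaves a partial one in the buffer:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of length-prefixed framing on a raw byte stream.
class FrameSplitterSketch {
    // Expects a buffer in read mode. Returns the payload of every complete
    // frame (without the 4-byte length prefix); bytes of an incomplete
    // trailing frame stay unread in the buffer.
    static List<byte[]> drain(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // not all bytes have arrived yet; wait for more
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }
}
```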

    Message encoding

    package com.parsecode.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class NettyEncoder extends MessageToByteEncoder<Command> {
        @Override
        public void encode(ChannelHandlerContext ctx, Command cmd, ByteBuf out) {
            try {
                int length = cmd.length();
                out.writeInt(length);
                out.writeInt(cmd.getType());
                out.writeLong(cmd.getRequestId());
                // body may be null; length() already counts it as 0 bytes
                if (cmd.getBody() != null) {
                    out.writeBytes(cmd.getBody());
                }
            } catch (Exception e) {
                ctx.channel().close();
            }
        }
    }

    1.2 registry

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <groupId>com.parsecode.framework</groupId>
            <artifactId>distributed-communication-all</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
    
        <modelVersion>4.0.0</modelVersion>
        <packaging>jar</packaging>
        <artifactId>distributed-communication-registry</artifactId>
        <name>distributed-communication-registry ${project.version}</name>
    
    
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.36.Final</version>
            </dependency>
            <dependency>
                <groupId>com.parsecode.framework</groupId>
                <artifactId>distributed-communication-common</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.4</version>
            </dependency>
        </dependencies>
    </project>

     

    Registry.java

    package com.parsecode.registry;
    
    import com.parsecode.common.NettyDecoder;
    import com.parsecode.common.NettyEncoder;
    import com.parsecode.common.Command;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Registry {
        private final ServerBootstrap serverBootstrap;
        private final EventLoopGroup eventLoopGroupWorker;
        private final EventLoopGroup eventLoopGroupBoss;
    
        // Provider addresses per service interface: <interface, servers>
        private final ConcurrentHashMap<String, List<String>> servers = new ConcurrentHashMap<String, List<String>>();
    
        public Registry() {
            this.serverBootstrap = new ServerBootstrap();
    
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
    
            final int threadCount = Runtime.getRuntime().availableProcessors() * 2;
            this.eventLoopGroupWorker = new NioEventLoopGroup(threadCount, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = threadCount;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerWorker_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
    
        public void start() {
            serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024) // Size of the pending-connection queue
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .localAddress("127.0.0.1", 8200)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, 60), // Detects idle connections
                                    new NettyServerHandler());
                        }
                    });
    
            try {
                serverBootstrap.bind().sync().addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        System.out.println("registry started");
                    }
                });
            } catch (InterruptedException e1) {
                throw new RuntimeException("serverBootstrap.bind().sync() InterruptedException", e1);
            }
        }
    
        class NettyServerHandler extends SimpleChannelInboundHandler<Command> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception {
                processMessageReceived(ctx, msg);
            }
        }
    
        // Handle each command according to its type
        public void processMessageReceived(ChannelHandlerContext ctx, Command msg) {
            final Command cmd = msg;
            if (cmd != null) {
                switch (cmd.getType()) {
                    case Command.GET_SERVER_LIST:
                        try {
                            String interfaceName = new String(msg.getBody(), "UTF-8");
                            List<String> ips = servers.get(interfaceName);
                            if (ips == null) {
                                ips = new ArrayList<String>();
                                servers.put(interfaceName, ips);
                            }
                            // Format: ip:port,ip:port
                            String str = StringUtils.join(ips, ",");
                            byte[] body = String.valueOf(str).getBytes("UTF-8");
                            Command response = new Command(Command.GET_SERVER_LIST_RESPONSE, body);
                            response.setRequestId(msg.getRequestId());
                            ctx.channel().writeAndFlush(response);
                        } catch (Exception e) {
                            System.out.println("error" + e.getMessage());
                        }
                        break;
                    case Command.REGISTER_SERVER:
                    case Command.UNREGISTER_SERVER:
                        // Format: interface,ip:port
                        try {
                            String str = new String(msg.getBody(), "UTF-8");
                            System.out.println("Service registration: " + str);
                            String[] aStr = str.split(",");
                            List<String> ips = servers.get(aStr[0]);
                            if (ips == null) {
                                ips = new ArrayList<String>();
                                servers.put(aStr[0], ips);
                            }
                            if (msg.getType() == Command.REGISTER_SERVER) {
                                // Repeated registrations (heartbeats) must not remove the entry
                                if (!ips.contains(aStr[1])) {
                                    ips.add(aStr[1]);
                                }
                            } else {
                                ips.remove(aStr[1]);
                            }
                        } catch (Exception e) {
                            System.out.println("error" + e.getMessage());
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    
        public static void main(String[] args) {
            Registry registry = new Registry();
            registry.start();
        }
    }
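
    The registry's core is a table from interface name to provider addresses that has to tolerate repeated registrations, since providers re-send them as heartbeats. A compact JDK-only sketch of such a table, assuming Java 8 or later; `RegistryTableSketch` is a hypothetical name, and the concurrent set is a substitution for the plain list used above:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical in-memory table: <interface, set of ip:port>.
class RegistryTableSketch {
    private final ConcurrentHashMap<String, Set<String>> servers = new ConcurrentHashMap<>();

    // Idempotent: registering the same address twice keeps a single entry.
    void register(String interfaceName, String address) {
        servers.computeIfAbsent(interfaceName, k -> new CopyOnWriteArraySet<>()).add(address);
    }

    void unregister(String interfaceName, String address) {
        Set<String> addresses = servers.get(interfaceName);
        if (addresses != null) {
            addresses.remove(address);
        }
    }

    // Same reply format as GET_SERVER_LIST_RESPONSE: ip:port,ip:port
    String lookup(String interfaceName) {
        Set<String> addresses = servers.get(interfaceName);
        return addresses == null ? "" : String.join(",", addresses);
    }
}
```

    A set makes the duplicate check implicit, and the copy-on-write variant can be read safely while another worker thread is registering.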

    1.3 server

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <groupId>com.parsecode.framework</groupId>
            <artifactId>distributed-communication-all</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
    
        <modelVersion>4.0.0</modelVersion>
        <packaging>jar</packaging>
        <artifactId>distributed-communication-server</artifactId>
        <name>distributed-communication-server ${project.version}</name>
    
    
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.36.Final</version>
            </dependency>
            <dependency>
                <groupId>com.parsecode.framework</groupId>
                <artifactId>distributed-communication-common</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.12</version>
            </dependency>
        </dependencies>
    </project>

     

    ServiceProvider.java

    package com.parsecode.server;
    
    import com.alibaba.fastjson.JSON;
    import com.parsecode.common.*;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.util.*;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ServiceProvider {
        // Serves incoming calls
        private final ServerBootstrap serverBootstrap;
        // Connects to the registry
        private final Bootstrap registry = new Bootstrap();
    
        private final EventLoopGroup eventLoopGroupWorker;
        private final EventLoopGroup eventLoopGroupBoss;
        private final EventLoopGroup eventLoopGroupRegistry;
    
        // Service objects exposed to callers: <interface, implementation>
        private final Map<String, Object> services = new HashMap<String, Object>();
    
        // Sends periodic heartbeats to the registry
        private final Timer timer = new Timer("registry-heartbeat", true);
    
        public ServiceProvider() {
            this.serverBootstrap = new ServerBootstrap();
    
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
    
            final int threadCount = Runtime.getRuntime().availableProcessors() * 2;
            this.eventLoopGroupWorker = new NioEventLoopGroup(threadCount, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = threadCount;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerWorker_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
    
            eventLoopGroupRegistry = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClientRegistry_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
    
        public void addService(String interfaceName, Object service) {
            services.put(interfaceName, service);
        }
    
        public void start() {
            serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024) // Size of the pending-connection queue
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false) // TCP keepalive; false leaves it disabled
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .localAddress(new InetSocketAddress(8080))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, 60), // Detects idle connections
                                    new NettyServerHandler());
                        }
                    });
            registry.group(this.eventLoopGroupRegistry)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, 60),
                                    new NettyServerHandler());
                        }
                    });
    
            try {
                serverBootstrap.bind().sync();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            // Register with the registry, then keep re-sending the registration as a heartbeat
            timer.scheduleAtFixedRate(new TimerTask() {
                private volatile Channel channel;
                @Override
                public void run() {
                    try {
                        if (channel == null || !channel.isActive()) {
                            // The registry listens on port 8200
                            InetSocketAddress registryAddress = new InetSocketAddress("127.0.0.1", 8200);
                            // syncUninterruptibly() rethrows a failed connect, so channel is assigned only on success
                            channel = registry.connect(registryAddress).syncUninterruptibly().channel();
                        }
                        for (String key : services.keySet()) {
                            // IP and port on which this provider serves requests
                            byte[] body = (key + ",127.0.0.1:8080").getBytes("UTF-8");
                            Command cmd = new Command(Command.REGISTER_SERVER, body);
                            channel.writeAndFlush(cmd);
                            System.out.println("Registering service > " + channel.toString());
                        }
                    } catch (Exception e) {
                        // Drop the connection; the next tick reconnects
                        if (null != channel) {
                            channel.close();
                            channel = null;
                        }
                    }
                }
            }, 10, 1000);
        }
    
        public void shutdown() {
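            // Stop the heartbeat first, then release every event loop group
            timer.cancel();
            eventLoopGroupBoss.shutdownGracefully();
            eventLoopGroupWorker.shutdownGracefully();
            eventLoopGroupRegistry.shutdownGracefully();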
        }
    
        class NettyServerHandler extends SimpleChannelInboundHandler<Command> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception {
                final Command cmd = msg;
                switch (cmd.getType()) {
                    case Command.INVOKE_REQUEST:
                        try {
                            Invocation invoke = JSON.parseObject(cmd.getBody(), Invocation.class);
                            // Look up the service implementation
                            Object service = services.get(invoke.getInterfaceName());
                            Class cls = Class.forName(invoke.getInterfaceName());
                            List<Class> argsTypeList = new ArrayList<Class>(invoke.getParameterTypes().length);
                            for (String s : invoke.getParameterTypes()) {
                                argsTypeList.add(Class.forName(s));
                            }
                            Method method = cls.getMethod(invoke.getMethodName(), argsTypeList.toArray(new Class[argsTypeList.size()]));
                            Object result = method.invoke(service, invoke.getArguments());
    
                            Command response = new Command(Command.INVOKE_RESPONSE, JSON.toJSONBytes(result));
                            response.setRequestId(cmd.getRequestId());
                            ctx.channel().writeAndFlush(response);
                        } catch (Exception e) {
                            // No response is sent in this case, so at least record the failure
                            e.printStackTrace();
                        }
                        break;
                    default:
                        break;
                }
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    IdleStateEvent event = (IdleStateEvent) evt;
                    if (event.state().equals(IdleState.ALL_IDLE)) {
                        System.out.println("NETTY SERVER PIPELINE: IDLE exception");
                        ctx.channel().close();
                    }
                }
                ctx.fireUserEventTriggered(evt);
            }
        }
    
        public static void main(String[] args) {
            ServiceProvider sp = new ServiceProvider();
            sp.addService(IService.class.getCanonicalName(), new ServiceImpl());
            sp.start();
        }
    }
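
    The INVOKE_REQUEST branch above boils down to three steps: find the service object, resolve the method from its name and parameter type names, and invoke it reflectively. The sketch below isolates those steps without Netty or fastjson. `ReflectiveDispatchSketch`, `Calculator`, and `CalculatorImpl` are hypothetical names. The primitive-type lookup is an addition of this sketch, included because `Class.forName` cannot resolve names such as `int`.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ReflectiveDispatchSketch {
    // Hypothetical service used only for this illustration.
    interface Calculator {
        int add(int a, int b);
        String echo(String s);
    }

    static class CalculatorImpl implements Calculator {
        public int add(int a, int b) { return a + b; }
        public String echo(String s) { return s; }
    }

    // Class.forName("int") fails, so primitive names need their own lookup.
    private static final Map<String, Class<?>> PRIMITIVES = new HashMap<String, Class<?>>();
    static {
        PRIMITIVES.put("int", int.class);
        PRIMITIVES.put("long", long.class);
        PRIMITIVES.put("boolean", boolean.class);
        PRIMITIVES.put("double", double.class);
    }

    static Class<?> resolve(String typeName) throws ClassNotFoundException {
        Class<?> primitive = PRIMITIVES.get(typeName);
        return primitive != null ? primitive : Class.forName(typeName);
    }

    // Find the service, resolve the method by name and parameter types, invoke it.
    static Object dispatch(Map<String, Object> services, String interfaceName,
                           String methodName, String[] parameterTypes, Object[] arguments) {
        Object service = services.get(interfaceName);
        if (service == null) {
            throw new IllegalArgumentException("no provider for " + interfaceName);
        }
        try {
            Class<?>[] types = new Class<?>[parameterTypes.length];
            for (int i = 0; i < parameterTypes.length; i++) {
                types[i] = resolve(parameterTypes[i]);
            }
            Method method = Class.forName(interfaceName).getMethod(methodName, types);
            return method.invoke(service, arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed: " + methodName, e);
        }
    }
}
```

    Failing loudly for an unknown interface or method gives the caller something to report back, instead of leaving the request unanswered.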

    1.4 client

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <groupId>com.parsecode.framework</groupId>
            <artifactId>distributed-communication-all</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
    
        <modelVersion>4.0.0</modelVersion>
        <packaging>jar</packaging>
        <artifactId>distributed-communication-client</artifactId>
        <name>distributed-communication-client ${project.version}</name>
    
    
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.36.Final</version>
            </dependency>
            <dependency>
                <groupId>com.parsecode.framework</groupId>
                <artifactId>distributed-communication-common</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.12</version>
            </dependency>
        </dependencies>
    </project>

     

    ServiceConsumer.java

    package com.parsecode.client;
    
    import com.alibaba.fastjson.JSON;
    import com.parsecode.common.*;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ServiceConsumer implements InvocationHandler {
        private final Bootstrap bootstrap = new Bootstrap();
        private final Bootstrap registry = new Bootstrap();
    
        // <ip:port, channel>. This example supports only one interface; production code needs to support multiple services (interfaces)
        private final ConcurrentHashMap<String, Channel> channelTables = new ConcurrentHashMap<String, Channel>();
        // Holds invocation responses
        private final ConcurrentHashMap<Long, Command> responses = new ConcurrentHashMap<Long, Command>();
        private final Timer timer = new Timer("registry-heartbeat", true);
    
        private final EventLoopGroup eventLoopGroupWorker;
        private final EventLoopGroup eventLoopGroupRegistry;
    
        // Remote service interface
        private Class interfaceClass;
    
        public ServiceConsumer(Class interfaceClass) {
            this.interfaceClass = interfaceClass;
            // Thread group for provider connections
            eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClientWorker_%d", this.threadIndex.incrementAndGet()));
                }
            });
            // Thread group for the registry connection
            eventLoopGroupRegistry = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClientRegistry_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
    
        /**
         * A simple JDK dynamic proxy is enough here.
         * Production: generate the class dynamically with a bytecode technique such as javassist or the JDK's own.
         */
        public <T> T getTarget() {
            final Class<?>[] interfaces = new Class[]{interfaceClass};
            return (T) Proxy.newProxyInstance(ServiceConsumer.class.getClassLoader(), interfaces, this);
        }
    
        public void start() {
            // Connection settings for the remote service
            bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, 6),
                                    new NettyClientHandler());
                        }
                    });
            // Connection settings for the registry
            registry.group(this.eventLoopGroupRegistry)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, 60),
                                    new NettyClientHandler());
                        }
                    });
    
            // Periodically pull the provider ip:port list from the registry
            timer.scheduleAtFixedRate(new TimerTask() {
                volatile Channel channel;
    
                @Override
                public void run() {
                    try {
                        if (channel == null || !channel.isActive()) {
                            // The registry listens on port 8200
                            // syncUninterruptibly() rethrows a failed connect, so channel is assigned only on success
                            channel = registry.connect(new InetSocketAddress("127.0.0.1", 8200))
                                    .syncUninterruptibly().channel();
                        }
                        byte[] body = interfaceClass.getCanonicalName().getBytes("UTF-8");
                        Command cmd = new Command(Command.GET_SERVER_LIST, body);
                        channel.writeAndFlush(cmd);
                    } catch (Exception e) {
                        // Drop the connection; the next tick reconnects
                        if (null != channel) {
                            channel.close();
                            channel = null;
                        }
                    }
                }
            }, 10, 1000);
        }
    
        class NettyClientHandler extends SimpleChannelInboundHandler<Command> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception {
                final Command cmd = msg;
                if (cmd != null) {
                    switch (cmd.getType()) {
                        case Command.INVOKE_RESPONSE:
                            responses.put(msg.getRequestId(), msg);
                            break;
                        case Command.GET_SERVER_LIST_RESPONSE:
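                            try {
                                String str = new String(msg.getBody(), "UTF-8");
                                String[] servers = str.split(",");
                                for (String ip : servers) {
                                    System.out.println("Service provider: " + ip);
                                    String[] ipAndPort = ip.split(":");
                                    // Already connected to this provider; skip it.
                                    if (channelTables.containsKey(ip)) {
                                        continue;
                                    }
                                    if (ipAndPort.length == 2) {
                                        final String address = ip;
                                        // Connect asynchronously; calling sync() here would block the I/O thread.
                                        bootstrap.connect(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])))
                                                .addListener(new ChannelFutureListener() {
                                                    @Override
                                                    public void operationComplete(ChannelFuture future) {
                                                        if (future.isSuccess()) {
                                                            channelTables.put(address, future.channel());
                                                        }
                                                    }
                                                });
                                    }
                                }
                            } catch (Exception e) {
                                // Report the failure instead of silently swallowing it.
                                e.printStackTrace();
                            }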
                            break;
                        default:
                            break;
                    }
                }
            }// channelRead0
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Invocation inv = new Invocation();
            inv.setInterfaceName(interfaceClass.getCanonicalName());
            inv.setMethodName(method.getName());
            inv.setArguments(args);
    
            String[] types = new String[method.getParameterTypes().length];
            int index = 0;
            for (Class<?> type : method.getParameterTypes()) {
                types[index] = type.getCanonicalName();
                index++;
            }
            inv.setParameterTypes(types);
    
            Command cmd = new Command(Command.INVOKE_REQUEST, JSON.toJSONBytes(inv));
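            // Wait until at least one provider is connected. This is only an example;
            // production code needs a timeout and a thread pool.
            while(channelTables.isEmpty()) {
                Thread.sleep(2);
            }
            // Note: the request is sent to every connected provider.
            for (String key : channelTables.keySet()) {
                channelTables.get(key).writeAndFlush(cmd);
                System.out.println("Target server: " + channelTables.get(key).toString());
            }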
    
            // Wait for the provider's reply. Production code should use a Future; a fixed sleep keeps this example simple.
            try {
                Thread.sleep(5);
            } catch (Exception e){}
    
            Command response = responses.get(cmd.getRequestId());
            if (response != null) {
                return JSON.parse(response.getBody());
            }
            return null;
        }
    
        public static void main(String[] args) throws Exception{
            ServiceConsumer sc = new ServiceConsumer(IService.class);
            sc.start();
            IService service = sc.getTarget();
            System.out.println("IService.hello(\"world\"): " + service.hello("world"));
        }
    }
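
    The comment in invoke() says production code should use a Future instead of sleeping for 5 ms and then looking the reply up. Below is a minimal sketch of that idea, assuming Java 8+. PendingResponses is a hypothetical helper, not part of the original code. invoke() would call register() before writeAndFlush(), and the INVOKE_RESPONSE branch of NettyClientHandler would call complete() instead of responses.put().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (an assumption, not in the original article): matches
// replies to requests by request id so the caller can wait with a timeout.
class PendingResponses {
    private final ConcurrentMap<Long, CompletableFuture<byte[]>> pending =
            new ConcurrentHashMap<>();

    // Call before the request is written, so a fast reply cannot be missed.
    CompletableFuture<byte[]> register(long requestId) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Call from the I/O handler when a reply arrives. Returns false when no
    // caller is waiting (unknown id, or the caller already gave up).
    boolean complete(long requestId, byte[] body) {
        CompletableFuture<byte[]> future = pending.remove(requestId);
        return future != null && future.complete(body);
    }

    // Waits for the reply; returns null on timeout, failure, or interruption,
    // mirroring the "return null" at the end of invoke(). The entry is always
    // removed so abandoned requests do not accumulate.
    byte[] await(long requestId, CompletableFuture<byte[]> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(requestId);
        }
    }

    int size() {
        return pending.size();
    }
}
```

    With this in place the caller returns as soon as the reply arrives, and a slow or dead provider costs at most the chosen timeout rather than a silently lost result.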

    1.5 Application

    1.5.1 Defining the service
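
    The listing for this step is not present here. Judging from main() above, which calls service.hello("world") on an IService, the contract plausibly looks like the sketch below. Only the interface name, the method name, and the single String argument come from the original; the String return type and the parameter name are assumptions.

```java
// Assumed shape of the service contract. Both the consumer (through the
// dynamic proxy) and the provider would compile against this interface.
interface IService {
    // Return type and parameter name are guesses based on how main() uses it.
    String hello(String name);
}
```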

    1.5.2 Implementing the service
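
    The implementation listing is also missing here. The following is a sketch of what a provider-side implementation could look like. The class name ServiceImpl and the greeting it returns are hypothetical. The interface is repeated only so that the block compiles on its own.

```java
// Assumed contract, repeated so this block is self-contained.
interface IService {
    String hello(String name);
}

// Hypothetical provider-side implementation; the provider would register it
// under IService's canonical name and invoke it when an INVOKE_REQUEST arrives.
class ServiceImpl implements IService {
    @Override
    public String hello(String name) {
        // The exact reply text is an assumption made for illustration.
        return "hello " + name;
    }
}
```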

    Result of the call


  • Original article: https://www.cnblogs.com/parse-code/p/6220088.html