zoukankan      html  css  js  c++  java
  • Zookeeper实现服务注册/发现

    what that?

    Zookeeper在分布式开发中使用频繁,但许多框架都对其进行了封装,初学者可能无法较好的理解其工作原理,该文章演示了使用Zookeeper实现服务注册,服务发现的简单demo,希望能达到抛砖引玉的效果;

    why need RegisterCenter?

    之所以需要访问注册和服务发现是因为分布式系统中,服务之间需要相互调用,但若每个服务自己维护一份依赖的服务信息的话,就显得很麻烦,且自身维护的数据无法保证其实时性,当依赖的服务信息发生变更时,无法及时获取更新,解决方案就是引入一个注册中心,服务提供方将自己的信息写入到注册中心,服务使用方从注册中心来获取服务信息; 如下图:

    client表示服务使用方,server表示服务提供方

    image-20200609100056798

    实现的效果: 客户端可自动发现服务信息,当服务状态发生变化时(上线,下线,更换地址),客户端可以及时响应变化,效果如下图:

    效果演示

    实现

    1. 首先保证Zookeeper以安装启动,且可以正常访问

    2. 创建Maven项目并添加Zookeeper的Java客户端依赖(注意版本号需>3.6)

      				<dependency>
                  <groupId>org.apache.zookeeper</groupId>
                  <artifactId>zookeeper</artifactId>
                  <version>3.6.1</version>
              </dependency>
      
    3. 编写服务提供方

      package com.jerry;
      
      import org.apache.zookeeper.CreateMode;
      import org.apache.zookeeper.KeeperException;
      import org.apache.zookeeper.ZooDefs;
      import org.apache.zookeeper.ZooKeeper;
      import org.apache.zookeeper.data.ACL;
      
      import java.io.IOException;
      import java.io.InputStream;
      import java.net.*;
      import java.nio.charset.StandardCharsets;
      import java.util.ArrayList;
      import java.util.Enumeration;
      
      import static java.net.InetAddress.getLocalHost;
      
      public class UserService {
      
          public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
              new UserService().serving();
          }
      
          public void serving() throws IOException, KeeperException, InterruptedException {
              //获取本机ip地址
              String ip = null;
              Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
              while (networkInterfaces.hasMoreElements()) {
                  NetworkInterface ni = (NetworkInterface) networkInterfaces.nextElement();
                  Enumeration<InetAddress> nias = ni.getInetAddresses();
                  while (nias.hasMoreElements()) {
                      InetAddress ia = (InetAddress) nias.nextElement();
                      if (!ia.isLinkLocalAddress() && !ia.isLoopbackAddress() && ia instanceof Inet4Address) {
                          ip = ia.getHostAddress();
                      }
                  }
              }
              int port = 8988;
      
              //启动服务
              ServerSocket socket = new ServerSocket(port);
              System.out.println("服务器已启动...");
              //注册服务
              serverRegister(ip, port);
              //处理请求
              clientHandler(socket);
          }
      
          private void clientHandler(ServerSocket socket) throws IOException {
              while (true) {
                  Socket accept = socket.accept();
                  InputStream inputStream = accept.getInputStream();
                  byte[] barr = new byte[1024];
                  while (true) {
                      int size = inputStream.read(barr);
                      if (size == -1) {
                          //System.out.println("客户端已关闭..");
                          accept.close();
                          break;
                      }
                      String s = new String(barr, 0, size);
                      //输出客户端消息
                      System.out.println(accept.getInetAddress().getHostAddress() + ": " + s);
                  }
              }
      
          }
      
          private void serverRegister(String ip, int port) throws IOException, KeeperException, InterruptedException {
              //注册服务
              ZooKeeper zooKeeper = new ZooKeeper("10.211.55.4: 2181",3000, null);
              try {
                  ArrayList<ACL> acl = new ArrayList<>();
                  acl.add(new ACL(31, ZooDefs.Ids.ANYONE_ID_UNSAFE));
                  zooKeeper.create("/userServer", (ip + ":" + port).getBytes(StandardCharsets.UTF_8), acl, CreateMode.EPHEMERAL);
                  System.out.println("服务发布成功!");
              } catch (KeeperException | InterruptedException e) {
                  e.printStackTrace();
                  throw e;
              }
          }
      }
      
    4. 编写服务服务使用方

      package com.yyh;
      
      import org.apache.zookeeper.*;
      
      import java.io.IOException;
      import java.io.OutputStream;
      import java.net.InetSocketAddress;
      import java.net.Socket;
      import java.util.Scanner;
      
      public class UserClient implements Watcher {
          String node = "/userServer"; //服务信息所在的节点 服务提供方和服务消费方一致
          private ZooKeeper zooKeeper;
          String server_ip;
          int server_port;
      
          public static void main(String[] args) throws Exception {
              //开始服务监听
              UserClient userClient = new UserClient();
              userClient.run();
              //当访问可用时与服务交互
              Scanner scanner = new Scanner(System.in);
              while (true){
                  System.out.println("输入要发送的信息(e:退出)");
                  String text = scanner.next();
                  if (text.equals("e"))System.exit(-1);
                  if (userClient.server_ip == null){
                      System.err.println("没有可用的服务...");
                  }else {
                      userClient.sendToServer(text);
                  }
              }
          }
          
          private void run() throws Exception {
              //连接zookeeper
              zooKeeper = new ZooKeeper("10.211.55.4:2181", 3000, null);
              //尝试获取服务信息
              getServerInfo();
              //添加对服务信息的永久监听
              zooKeeper.addWatch(node,this,AddWatchMode.PERSISTENT);
          }
      
          //获取服务信息
          private void getServerInfo()  {
              try {
                  byte[] data = zooKeeper.getData(node, false, null);
                  String[] infos = new String(data).split(":");
                  server_ip = infos[0];
                  server_port = Integer.parseInt(infos[1]);
                  System.out.println("获取服务信息成功!");
                  System.out.println(server_ip+":"+ server_port);
              } catch (KeeperException e) {
                  System.err.println("服务信息不存在! 等待服务上线........");
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      
          //当节点状态发送变化时将执行该方法(通知处理)
          @Override
          public void process(WatchedEvent event) {
              if (event.getPath().equals(node)) {
                  //根据具体逻辑处理不同的事件类型,此处只关心节点的创建删除和更新
                  if (event.getType() == Event.EventType.NodeCreated) {
                      System.err.println("服务上线了");
                      getServerInfo();
                  } else if (event.getType() == Event.EventType.NodeDataChanged) {
                      System.err.println("服务更新了");
                      getServerInfo();
                  }else if (event.getType()== Event.EventType.NodeDeleted){
                      server_ip = null;
                      server_port = 0;
                      System.err.println("服务下线了");
                  }
              }
          }
      
          public void sendToServer(String text) {
              InetSocketAddress server_address = new InetSocketAddress(server_ip, server_port);
              Socket socket = new Socket();
              try {
                  socket.connect(server_address);
                  //System.out.println("连接服务器成功!");
                  OutputStream outputStream = socket.getOutputStream();
                  outputStream.write(text.getBytes());
                  System.out.println("消息发送成功!");
              } catch (IOException e) {
                  e.printStackTrace();
              }
              try {
                  socket.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }
      
    5. 打包服务端代码,该步骤可忽略,仅为了测试客户端正确性, 为了在打包时附带其全部依赖,此处借助Spring的打包插件,在pom中添加以下内容:

      		<build>
              <plugins>
                  <plugin>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-maven-plugin</artifactId>
                      <version>1.5.6.RELEASE</version>
                      <executions>
                          <execution>
                              <goals>
                                  <goal>repackage</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      

      注意:Spring-boot打包插件会自动获取项目中的主函数,必须保证主函数只有一个,所以需要暂时注释客户端的主函数,最后执行maven的package,得到jar包

    6. 将jar上传至虚拟机并运行

      java -jar ZookeeperTest-1.0-SNAPSHOT.jar
      

      若没有其他问题则客户端依然可以正常连接服务器发送消息;

    以上便是使用Zookeeper实现服务注册和服务发现的具体步骤,在实际开发中,我们可能还会将提供的服务部署为集群,这时可将集群中的各个服务信息作为子节点注册到指定节点下,客户端监听该节点变化,获取子节点列表从而获取到服务列表,还可以在此基础上加上负载均衡算法实现对服务列表的合理访问; 如图:

    image-20200609103431764

  • 相关阅读:
    C# 把带有父子关系的数据转化为------树形结构的数据 ,以及 找出父子级关系的数据中里面的根数据Id
    基于角色的菜单按钮权限的设计及实现
    基于记忆性的中值滤波O(r)与O(1)复杂度的算法实现
    Canny算法检测边缘
    图像平滑去噪之高斯滤波器
    运动元素提取,基于帧间差分与背景差分
    基于RGB与HSI颜色模型的图像提取法
    基于阈值的灰度图像提取法
    C语言深入学习
    大津法实现图像二值化
  • 原文地址:https://www.cnblogs.com/yangyuanhu/p/13071166.html
Copyright © 2011-2022 走看看