zoukankan      html  css  js  c++  java
  • SpringCloud-Bus组件的使用

    1.什么是Bus

    • https://spring.io/projects/spring-cloud-bus

    • springcloudbus使用轻量级消息代理将分布式系统的节点连接起来。然后,可以使用它来广播状态更改(例如配置更改)或其他管理指令。AMQP和Kafka broker实现包含在项目中。或者,在类路径上找到的任何springcloudstream绑定器都可以作为传输使用。

    • 通俗定义: bus称之为springcloud中消息总线,主要用来在微服务系统中实现远端配置更新时通过广播形式通知所有客户端刷新配置信息,避免手动重启服务的工作

    2.实现配置刷新原理

    3.搭建RabbitMQ服务

    可以看之前写的文章,在linux系统中安装rabbitmq

    4.实现自动配置刷新

    继续在上一个写的configserver项目中进行使用

    1.在所有项目中引入bus依赖

    <!--引入bus依赖-->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>
    

    2.配置统一配置中心连接到mq

    spring.rabbitmq.host=192.168.52.130											#连接主机
    spring.rabbitmq.port=5672														#连接mq端口
    spring.rabbitmq.username=guest												#连接mq用户名
    spring.rabbitmq.password=guest										#连接mq密码
    

    3.远端配置中加入连接mq配置

    在上一个项目configserver得基础之上,看一看上篇文章,和在上面得配置一样,

    4.启动统一配置中心服务

    • 正常启动

    5.启动客户端服务

    • 加入bus组件之后客户端启动报错
    • 原因springcloud中默认链接不到远程服务器不会报错,但是在使用bus消息总线时必须开启连接远程服务失败报错

    spring.cloud.config.fail-fast=true
    

    在configclient项目中添加,记得先引入bus依赖

    6.修改远程配置后在配置中心服务通过执行post接口刷新配置

    当修改远程配置之后不进行刷新配置,那么访问得还是原来得数据,必须进行post接口刷新配置

    首先再configserver7878得配置文件中添加

    management.endpoints.web.exposure.include=*
    

    r然后重启再进行刷新

    此时是通过配置server来进行得刷新,配置server连接得所有节点都得到了刷新,

    可以看到在配置文件中修改之后得数据了

    7.通过上述配置就实现了配置统一刷新

    5.指定服务刷新配置

    1.说明

    • 默认情况下使用curl -X POST http://localhost:7878/actuator/bus-refresh
      这种方式刷新配置是全部广播形式,也就是所有的微服务都能接收到刷新配置通知,但有时我们修改的仅仅是某个服务的配置,这个时候对于其他服务的通知是多余的,因此就需要指定服务进行通知

    2.指定服务刷新配置实现

    5.集成webhook实现自动刷新

    1.配置webhooks

    • 添加webhooks
    • 在webhooks中添加刷新配置接口

    使用natapp来进行内网穿透,直接看官网教程就可以了

    https://natapp.cn/

    使用natapp作为公网地址之后会出现400错误

    2.解决400错误问题

    • 在配置中心服务端加入过滤器进行解决(springcloud中一个坑)
    @Component
    public class UrlFilter  implements Filter {
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
     
        }
     
        @Override
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
            HttpServletRequest httpServletRequest = (HttpServletRequest)request;
            HttpServletResponse httpServletResponse = (HttpServletResponse)response;
     
            String url = new String(httpServletRequest.getRequestURI());
     
            //只过滤/actuator/bus-refresh请求
            if (!url.endsWith("/bus-refresh")) {
                chain.doFilter(request, response);
                return;
            }
     
            //获取原始的body
            String body = readAsChars(httpServletRequest);
     
            System.out.println("original body:   "+ body);
     
            //使用HttpServletRequest包装原始请求达到修改post请求中body内容的目的
            CustometRequestWrapper requestWrapper = new CustometRequestWrapper(httpServletRequest);
     
            chain.doFilter(requestWrapper, response);
     
        }
     
        @Override
        public void destroy() {
     
        }
     
        private class CustometRequestWrapper extends HttpServletRequestWrapper {
            public CustometRequestWrapper(HttpServletRequest request) {
                super(request);
            }
     
            @Override
            public ServletInputStream getInputStream() throws IOException {
                byte[] bytes = new byte[0];
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
     
                return new ServletInputStream() {
                    @Override
                    public boolean isFinished() {
                        return byteArrayInputStream.read() == -1 ? true:false;
                    }
     
                    @Override
                    public boolean isReady() {
                        return false;
                    }
     
                    @Override
                    public void setReadListener(ReadListener readListener) {
     
                    }
     
                    @Override
                    public int read() throws IOException {
                        return byteArrayInputStream.read();
                    }
                };
            }
        }
     
        public static String readAsChars(HttpServletRequest request)
        {
     
            BufferedReader br = null;
            StringBuilder sb = new StringBuilder("");
            try
            {
                br = request.getReader();
                String str;
                while ((str = br.readLine()) != null)
                {
                    sb.append(str);
                }
                br.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally
            {
                if (null != br)
                {
                    try
                    {
                        br.close();
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
            return sb.toString();
        }
    }
    

  • 相关阅读:
    【转】最大子序列和(动态规划学习)
    [转]修改Oracle XDB的8080端口
    【转】 C++常见编译/链接错误及其解决办法
    Pentaho Dashboard Editor使用向导
    [转]什么是Unicode是什么是UTF8是什么
    【转】 typedef的四个用途和两个陷阱
    【转】 C++中类型转换的解释
    从一道笔试题谈算法优化
    [转]谈谈Unicode编码,简要解释UCS、UTF、BMP、BOM等名词
    [转]对Oracle数据库的字符集问题的资料收集,受益匪浅
  • 原文地址:https://www.cnblogs.com/mengd/p/14152444.html
Copyright © 2011-2022 走看看