zoukankan      html  css  js  c++  java
  • 让 Zipkin 能通过 RabbitMQ 接收消息

    上一篇

    Zipkin+Sleuth 链路追踪整合

    增加基于 MQ 的 Zipkin 埋点功能

    1.rabbitmq

    docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672  -e RABBITMQ_DEFAULT_USER=spring -e RABBITMQ_DEFAULT_PASS=spring rabbitmq:management

    2.启动 Zipkin绑定 rabbitmq

    docker run --name rabbit-zipkin -d -p 9411:9411 --link rabbitmq -e RABBIT_ADDRESSES=rabbitmq:5672 -e RABBIT_USER=spring -e RABBIT_PASSWORD=spring openzipkin/zipkin

    3.添加依赖

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-zipkin</artifactId>
    </dependency>

    4.示例

    (1)provider

    配置

    server.port=8010
    management.endpoints.web.exposure.include=*
    management.endpoint.health.show-details=always
    spring.application.name=service-provider
    spring.cloud.consul.host=192.168.99.100
    spring.cloud.consul.port=8500
    spring.cloud.consul.discovery.health-check-path=/actuator/health
    spring.cloud.consul.discovery.service-name=${spring.application.name}
    spring.cloud.consul.discovery.heartbeat.enabled=true
    spring.cloud.consul.discovery.prefer-ip-address=true

    spring.rabbitmq.host=192.168.99.100
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=spring
    spring.rabbitmq.password=spring

    spring.cloud.stream.bindings.finishedOrders.group=service-provider

    spring.sleuth.sampler.probability=1.0
    spring.zipkin.sender.type=rabbit

    启动类

    package com.xyz.provider;
    
    import com.xyz.provider.integration.Barista;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    /**
     * Boot entry point of the provider service.
     *
     * <p>Enables the discovery client and binds the message channels declared
     * on {@link Barista}.
     */
    @SpringBootApplication
    @EnableDiscoveryClient
    @EnableBinding(Barista.class)
    public class ProviderApplication {

        /**
         * Starts the Spring application context for the provider.
         *
         * @param args command-line arguments, handed to Spring Boot unchanged
         */
        public static void main(String[] args) {
            new SpringApplication(ProviderApplication.class).run(args);
        }
    }
    View Code

    Barista.java

    package com.xyz.provider.integration;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * Message channels used by the provider service.
     *
     * <p>The binding names are spelled out through the constants below instead of
     * relying on the annotation default (the method name), so the names used in
     * {@code @StreamListener} and in the {@code spring.cloud.stream.bindings.*}
     * properties cannot silently drift away from the method names.
     */
    public interface Barista {
        /** Binding name of the outbound channel carrying new orders. */
        String NEW_ORDERS = "newOrders";
        /** Binding name of the inbound channel carrying finished orders. */
        String FINISHED_ORDERS = "finishedOrders";

        /**
         * Inbound channel on which finished orders are received.
         *
         * @return the subscribable channel bound as {@link #FINISHED_ORDERS}
         */
        @Input(FINISHED_ORDERS)
        SubscribableChannel finishedOrders();

        /**
         * Outbound channel on which new orders are published.
         *
         * @return the message channel bound as {@link #NEW_ORDERS}
         */
        @Output(NEW_ORDERS)
        MessageChannel newOrders();
    }
    View Code

    OrderListener.java

    package com.xyz.provider.integration;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumes messages from the {@code finishedOrders} binding and logs each
     * finished order number.
     */
    @Slf4j
    @Component
    public class OrderListener {

        /**
         * Logs the number of an order that has been finished.
         *
         * @param num payload of the message received on
         *            {@link Barista#FINISHED_ORDERS}
         */
        @StreamListener(Barista.FINISHED_ORDERS)
        public void listenFinishedOrders(Integer num) {
            log.info("We've finished an order [{}].", num);
        }
    }
    View Code

    OrderService.java

    package com.xyz.provider.service;
    
    import com.xyz.provider.integration.Barista;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    /**
     * Publishes new orders to the {@code newOrders} channel of {@link Barista}.
     */
    @Service
    @Transactional
    @Slf4j
    public class OrderService{

        @Autowired
        private Barista barista;

        /**
         * Increments the given number and sends the result as a new-order message.
         *
         * @param num the current number; must not be {@code null}
         * @return always {@code true} once the message has been handed to the channel
         * @throws IllegalArgumentException if {@code num} is {@code null}
         */
        public boolean updateNum(Integer num) {
            // Fail with a clear message instead of an anonymous NPE from unboxing.
            if (num == null) {
                throw new IllegalArgumentException("num must not be null");
            }
            Integer next = num + 1;
            // Go through the logger rather than System.out so the line is emitted
            // by the same logging setup as the rest of the application.
            log.info("Publishing new order [{}].", next);
            barista.newOrders().send(MessageBuilder.withPayload(next).build());
            return true;
        }
    }
    View Code

    控制器

    package com.xyz.provider.controller;
    
    import com.xyz.provider.service.OrderService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * HTTP entry point of the demo: forwards a number to {@link OrderService}.
     */
    @RestController
    @Slf4j
    public class demoController {

        @Autowired
        private OrderService orderService;

        /**
         * Handles {@code /rabbitmq} by passing {@code num} to
         * {@link OrderService#updateNum(Integer)}.
         *
         * @param num the number to forward; presumably bound from the {@code num}
         *            request parameter and {@code null} when it is absent, in which
         *            case it is passed on as-is
         * @return the literal string {@code "ok"}
         */
        @RequestMapping("/rabbitmq")
        public String rabbitmq(Integer num) {
            // The "{}" placeholder is required; without it SLF4J drops the argument.
            log.info("msq num: {}", num);
            orderService.updateNum(num);
            return "ok";
        }
    }

    (2)consumer

    配置

    server.port=8015
    spring.application.name=service-comsumer
    management.endpoints.web.exposure.include=*
    management.endpoint.health.show-details=always
    
    spring.cloud.consul.host=192.168.99.100
    spring.cloud.consul.port=8500
    spring.cloud.consul.discovery.health-check-path=/actuator/health
    spring.cloud.consul.discovery.service-name=${spring.application.name}
    spring.cloud.consul.discovery.heartbeat.enabled=true
    
    spring.rabbitmq.host=192.168.99.100
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=spring
    spring.rabbitmq.password=spring
    
    spring.cloud.stream.bindings.newOrders.group=service-comsumer
    spring.sleuth.sampler.probability=1.0
    spring.zipkin.sender.type=rabbit

    启动类

    package com.xyz.comsumer;
    
    import com.xyz.comsumer.integration.Waiter;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.loadbalancer.LoadBalanced;
    import org.springframework.cloud.openfeign.EnableFeignClients;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.client.RestTemplate;
    
    /**
     * Boot entry point of the consumer service.
     *
     * <p>Enables Feign clients and binds the message channels declared on
     * {@link Waiter}.
     */
    @SpringBootApplication
    @EnableFeignClients
    @EnableBinding(Waiter.class)
    public class ComsumerApplication {

        /**
         * Exposes a {@link RestTemplate} bean marked {@code @LoadBalanced}.
         *
         * @return a freshly constructed {@link RestTemplate}
         */
        @LoadBalanced
        @Bean
        public RestTemplate restTemplate() {
            RestTemplate template = new RestTemplate();
            return template;
        }

        /**
         * Starts the Spring application context for the consumer.
         *
         * @param args command-line arguments, handed to Spring Boot unchanged
         */
        public static void main(String[] args) {
            new SpringApplication(ComsumerApplication.class).run(args);
        }
    }
    View Code

    Waiter.java

    package com.xyz.comsumer.integration;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * Message channels used by the consumer service: new orders come in,
     * finished orders go out.
     */
    public interface Waiter {
        /** Binding name of the inbound channel carrying new orders. */
        String NEW_ORDERS = "newOrders";
        /** Binding name of the outbound channel carrying finished orders. */
        String FINISHED_ORDERS = "finishedOrders";

        /**
         * Outbound channel on which finished orders are published.
         *
         * @return the message channel bound as {@link #FINISHED_ORDERS}
         */
        @Output(FINISHED_ORDERS)
        MessageChannel finishedOrders();

        /**
         * Inbound channel on which new orders are received.
         *
         * @return the subscribable channel bound as {@link #NEW_ORDERS}
         */
        @Input(NEW_ORDERS)
        SubscribableChannel newOrders();
    }
    View Code

    OrderListener.java

    package com.xyz.comsumer.integration;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    /**
     * Consumes new orders, increments the order number and publishes the result
     * on the finished-orders channel.
     */
    @Component
    @Slf4j
    @Transactional
    public class OrderListener {

        /** Outbound channel registered under {@link Waiter#FINISHED_ORDERS}. */
        @Autowired
        @Qualifier(Waiter.FINISHED_ORDERS)
        private MessageChannel finishedOrdersMessageChannel;

        /**
         * Handles a message received on {@link Waiter#NEW_ORDERS}.
         *
         * @param num payload of the incoming message; it is incremented and the
         *            incremented value is sent on as the finished order
         */
        @StreamListener(Waiter.NEW_ORDERS)
        public void processNewOrder(Integer num) {
            num++;
            // The "{}" placeholder is required; without it SLF4J drops the argument.
            // The log line now carries the value, replacing the extra stdout print.
            log.info("Receive a new order [{}].", num);
            finishedOrdersMessageChannel.send(MessageBuilder.withPayload(num).build());
        }
    }
    View Code

    启动consul

    启动provider

    启动consumer

    GET   

      http://localhost:8010/rabbitmq?num=1

    返回

      ok

    浏览器打开

      http://192.168.99.100:9411

    进入详情页

     

     

    打开 RabbitMQ 的管理界面,可以看到 zipkin 这个 Queue 有消息在被处理

  • 相关阅读:
    (转)Dynamic Web project转成Maven项目
    (转)nodejs搭建本地http服务器
    jquery mobile validation
    Quartz任务调度快速入门(转)
    珠宝首饰
    免费素材:25套免费的 Web UI 设计的界面元素(转)
    WebUI框架
    超越大典汽车维修系统
    如何申请开通微信多客服功能
    微信开发者文档连接
  • 原文地址:https://www.cnblogs.com/baby123/p/12807164.html
Copyright © 2011-2022 走看看