zoukankan      html  css  js  c++  java
  • RabbitMQ

    AMQP简介

    Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议

    AMQP工作过程

    队列

    RabbitMQ简介

    解决应用耦合

    使用MQ解决耦合

    RabbitMQ适用场景

    RabbitMQ原理

    Message

    Publisher

    Consumer

    Exchange

    Binding

    Queue

    Routing-key

    Connection

    Channel

    Virtual Host

    Borker

     交换器和队列的关系

    RabbitMQ为什么需要信道,为什么不需要TCP进行通信

    Erlang安装

    修改主机名

    vim /etc/sysconfig/network

    vim /etc/hosts

    安装依赖

    yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel

    下载erlang/opt

    git clone https://github.com/erlang/otp.git

    切换到自己想要的版本

    配置参数

    创建文件夹,配置参数,然后开始安装

    mkdir /usr/local/erlang
    cd otp .
    /configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

    编译安装

    make && make install

    修改环境变量

    打开profile文件

    vim /etc/profile

    配置,添加以下代码

    export PATH=$PATH:/usr/local/erlang/bin

    使能文件

    source /etc/profile

    查看配置是否成功

    erl -version

    安装RabbitMQ

    进入指定目录,下载rabbitmq-server

    cd /usr/local/tmp
    git clone https://github.com/rabbitmq/rabbitmq-server.git

    配置环境变量

    vim /etc/profile

    在文件中添加

    export PATH=$PATH:/usr/local/rabbitmq/sbin

    使能文件

    source /etc/profile

    开启web管理插件

    进入rabbitmq/sbin目录

    cd /usr/local/rabbitmq/sbin

    查看插件列表

    ./rabbitmq-plugins list

    生效管理插件

    ./rabbitmq-plugins enable rabbitmq_management

    后台运行

    启动rabbitmq

    ./rabbitmq-server -detached

    停止命令,如果无法停止,使用kill -9进程号进行关闭

    ./rabbitmqctl stop_app

    查看web管理界面

    RabbitMQ账户管理

    创建账户

    cd /usr/local/rabbitmq/sbin
    ./rabbitmqctl add_user test test

    给用户授予管理员角色

    ./rabbitmqctl set_user_tags test administrator

    给用户授权

    “/”表示虚拟机,test表示用户名,".", ".", ".*",表示完整权限

    ./rabbitmqctl set_permissions -p "/" test ".*" ".*" ".*"

    登录

    交换机

    direct交换器

    添加parent和test的依赖

    添加依赖,官方地址:https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.2.RELEASE</version>
    </dependency>

    编写配置文件

    application.yml

    spring:
        rabbitmq:
            host: 192.168.52.132
            username: test
            password: test

    PublisherApplication

    public class PublisherApplication{
        public static void main(String[] args){
            SpringApplication.run(PublisherApplication.class, args);
        }
    }

    RabbitmqConfig

    @Configuration
    public class RabbitmqConfig{
        @Bean
        protected Queue queue(){
            Queue queue = new Queue("myQueue");
            return queue;
        }
    }

    MyTest

    @SpringBootTest(classes = PublisherApplication.class)
    @RunWith(SpringJUnit4ClassRunner.class)
    public class MyTest{
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Test
        public void test(){
            amqpTemplate.convertAndSend("myQueue", "this is content");
            System.out.println("发送成功!");
        }       
    }

    Receive

    @Component
    public class Receive{
        @RabbitListener(queues = "myQueue")
        public void demo(String msg){
            System.out.println("获取到的消息: " + msg);
        }
    }

    ConsumerApplication

    @SpringBootApplication
    public class ConsumerApplication{
        public static void main(String[] args){
            SpringAppllication.run(ConsumerApplication.class, args);
        }
    }

    fanout交换器

    public class RabbitmqConfig{
        @Bean
        protected Queue queue(){
            Queue queue = new Queue("queue");
            return queue;
        }
        @Bean
        public Queue createQueue1(){
            return new Queue("myfanout1");
        }
        @Bean
        public Queue createQueue2(){
            return new Queue("myfanout2");
        }
        @Bean
        public FanoutExchange getFanoutExchange(){
            return new FanoutExchange("amq.fanout");
        }
        @Bean
        public Binding binding(Queue createQueue1, FanoutExchange getFanoutExchange){
            return BindingBuilder.bind(createQueue1).to(getFanoutExchange);
        }
    
        @Bean
        public Binding binding2(Queue createQueue2, FanoutExchange getFanoutExchange){
            return BindingBuilder.bind(createQueue2).to(getFanoutExchange);
        }
    }

    testMethod

    @Test
    public void testMethod(){
        amqpTemplate.convertAndSend("myQueue", "content");
        System.out.println("success");
    }
    @Test
    public void testMethod2(){
        amqpTemplate.convertAndSend("amq.fanout", "core", "fanout msg");
        System.out.println("success");
    }
    @RabbitListener(queues = "myfanout2")
    public void demo2(String msg){
        System.out.println("fanout3:" + msg );
    }
    @RabbitListener(queues = "myfanout3")
    public void demo3(String msg){
        System.out.println("fanout3:" + msg );
    }

    topic交换器

    @Bean
    public Queue topicQueue1(){
        return new Queue("topic1");
    }
    @Bean
    public Queue topicQueue2(){
        return new Queue("topic2");
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("amq.topic");
    }
    @Bean
    public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("com.test.*");
    }
    @Bean
    public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("com.test.a");
    }

    同步数据-项目搭建

    添加依赖parent和web启动器

    dubbo依赖,官方地址:https://mvnrepository.com/artifact/org.apache.dubbo/dubbo

    <!-- https://mvnrepository.com/artifact/org.apache.dubbo/dubbo -->
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo</artifactId>
        <version>2.7.3</version>
    </dependency>

    zookeeper框架curator-recipes依赖,官方地址:https://mvnrepository.com/artifact/org.apache.curator/curator-recipes

    <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>

    zookeeper框架curatorf-framework依赖,官方地址:https://mvnrepository.com/artifact/org.apache.curator/curator-framework

    <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.2.0</version>
    </dependency>

    mybatis依赖,官方地址:https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter

    <!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.1.1</version>
    </dependency>

    mysql依赖,官方地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.6</version>
    </dependency>

    common-io依赖,官方地址:https://mvnrepository.com/artifact/commons-io/commons-io

    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>

    data-solr依赖,官方地址:https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-solr

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-solr -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-solr</artifactId>
        <version>2.1.11.RELEASE</version>
    </dependency>

    amqp依赖,官方地址:https://mvnrepository.com/search?q=spring-boot-starter-amqp

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.1.11.RELEASE</version>
    </dependency>

    httpClient,官方地址:https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient

    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.10</version>
    </dependency>

    配置各种文件

    Provider模板编写

    provider中的application.yml

    dubbo:
        application:
            name: dubbo-provider
        registry:
            address: zookeeper://192.168.93.10:2181
    spring:
        profiles:
            active: mybatis

    ProviderApplication

    @SpringBootApplication
    public class ProviderApplication{
        public void main(String[] args){
            SpringApplication.run(ProviderApplication.class, args);
        }
    }

    ProductServiceImpl

    @Service
    public class ProductServiceImpl implements ProductDubboService{
        @Autowired
        private ProductMapper productMapper;
        @Override
        public int insertProduct(Product product){
            return insertProduct(product);
        }
    }

    ProductMapper

    public interface ProductMapper{
        public int insertProduct(Product product);
    }

    ProductMapper.xml

    <insert id="insertProduct" parameterType="com.test.bean.Product">
        insert into t_product (name, price) values (#{name}, #{price})
    </insert>

    商品新增功能

    ProductController

    @Controller
    public class ProductController{
        @Autowired
        private ProductService productService;
        @GetMapping("/")
        public String showAdd(){
            return "add";
        }
    
        @PostMapping("/add")
        public String add(Product product){
            int index = productService.insertProduct(product);
            return "add";
        }
    }

    ProductApplication

    @SpringBootApplication
    @EnableDubbo
    public class ProductApplication{
        public static void main(String[] args){
            SpringApplicationl.run(ProductApplication.class, args);
        }
    }

    ProductDubboServiceImpl

    @Service
    public class ProductDubboServiceImpl implements ProductDubboService{
        @Autowired
        private ProductMapper productMapper;
        @Override
        public int insertProduct(Product product){
            return productMapper.insertProduct(product);
        }
    }

    Search项目搭建

    data-solr依赖,官方地址:https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-solr

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-solr -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-solr</artifactId>
    </dependency>

    SearchService

    public interface SearchService{
        public boolean insert(SearchPojo searchPojo);
    }

    SearchPojo

    public class SearchPojo implements Serializable{
    public static final long serialVersionUID=1L;
    @Field
    private Integer id;
    @Field
    private String name;
    @Field
    private Double price; // getter setter }

    SearchServiceImpl

    @Service
    public class SearchServiceImpl implements SearchService{
        @Autowired
        privarte SolrTemplate solrTemplate;
    
        @Override
        public boolean insert(SearchPojo serachPojo){
            UpdateResponse response = solrTemplate.saveBean("testcore", searchPojo);
            solrTemplate.commit("testcore");
            if(response.getStatuc() == 0){
                return true;
            }
            return false;
        }
    }

    SearchController

    @Controller
    public class SearchController{
        @Autowired
        private searchService searchService;
        @RequestMapping("/insert")
        @ResponseBody
        public boolean search(SearchPojo searchPojo){
            return searchService.insert(searchPojo);
        }
    }

    SpringBootApplication

    @SpringBootApplication
    public class SearchApplication{
        SpringBootApplication.run(SearchApplication.class, args);
    }

    同步solr数据

    ProductServiceImpl

    @Service
    public class ProductServiceImpl implements ProductService{
    @Autowired
    private Sender sender; @Reference
    private ProductDubboService productDubboService; @Override public int insertProduct(Product product){ Random random = new Random(); product.setId(rando.nextInt(500000)); int index = productDubboService.insertProduct(product); if(index == 1){ // 数据同步到solr中, 调用insert Map<String, String> map = nwe HashMap<>(); map.put("id", product.getId() + ""); map.put("name", product.getName() + ""); map.put("price", product.getPrice() + ""); String result = HttpClientUtil.doPost("http://localhost:8988/insert", map); boolean resultBool = Boolean.parseBoolean(result); System.out.println("result: " + resultBool); if(!resultBool){ // 把数据库中的数据删除 }
    SearchPojo sp = new SearchPojo();
    BeanUtils.copyProperties(product, sp);
    sender.send(sp); }
    return index; } }

    ProductMapper.xml

    <mapper namespace="com.test.mapper.ProductMapper">
        <insert id="insertProduct" parameterType="com.test.bean.Product">
            insert into t_product (id, name, price) values (#{id}, #{name}, #{price})
        </insert>
    </mapper>

    ProductApplication

    @SpringBootApplication
    @EnableDubbo
    @Mapper("com.test.mapper")
    public class ProviderApplication{
        public static void main(String[] args){
            SpringApplication.run(ProviderApplication.class, args);
        }
    }

    同步数据,使用rabbitmq改写

    新建modelrabbitmq

    amqp依赖,官方地址:https://mvnrepository.com/search?q=spring-boot-starter-amqp

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    application-rabbitmq.yml

    spring:
        rabbitmq:
            host: 192.168.93.10
            username: test
            password: test

    SenderConfig

    @Configuration
    public class SenderConfig{
        @Bean
        protected Queue queue(){
            return new Queue("demoqueue");
        }
    }

    Sender

    @Component
    public class Sender{
        @Autowired
        private AmqpTemplate amqpTemplate;
        public void send(Object obj){
            amqpTemplate.convertAndSend("demoqueue", obj);
        }
    }

    product下的application.yml

    dubbo:
        application:
            name: dubbo-product
        registry:
            address: zookeeper://192.168.93.10:2181
    spring:
        profiles:
            active: rabbitmq
        

    传递对象类型参数

    Recieve

    application.yml

    spring:
        rabbitmq:
            host: 192.168.93.10
            username: test
            password: test

    ReceiveApplication

    @SpringBootApplication
    public class ReceiveApplication{
        public static void main(String[] args){
            SpringApplication.run(ReceiveApplication.class, args);
        }
    }

    Receive

    @Component
    public class Receive{
        @RabbitListener(queues = "demoqueue")
        public void solr(Object obj){
            Message msg = (Message)obj;
            try{
                // 从消息队列中获取要保存的对象
                ByteArrayInputStream bis = new ByteArrayInputStream(msg.getBody());
                ObjectInputStream ois = new ObjectInputStream(bis):
                Object spObj = ois.readObject();
                SearchPojo sp = (SearchPojo)spObj;
                // 调用HttpClient方法,执行http链接,进行insert
                Map<String, String> params = new HashMap<>();
                params.put("id", sp.getId() + "");
                params.put("name", sp.getName());
                params.put("price", sp.getPrice() + "");
                HttpClientUtil.doPost("http://localhost:8099/insert", params);
                ois.close();
                bis.close();            
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    论读书
    睁开眼,书在面前
    闭上眼,书在心里
  • 相关阅读:
    莫队专题
    AJAX XML 实例
    AJAX 简介
    AJAX 服务器响应
    AJAX 创建XMLHttpRequest 对象
    AJAX 教程
    AJAX 向服务器发送请求
    AJAX onreadystatechange 事件
    AJAX ASP/PHP 请求实例
    让卖场的死角“起死回生”
  • 原文地址:https://www.cnblogs.com/YC-L/p/14383917.html
Copyright © 2011-2022 走看看