zoukankan      html  css  js  c++  java
  • RabbitMQ 使用

    RabbitMQ能做啥

    场景一:支付的通知

    生产者:微信支付完成之后在其回调方法中调用一个服务接收消息,这个服务作为生产者。

    消费者:消费者服务是一个不断从队列中获取支付结果的应用,然后在app或者页面展示。

    场景二:注册的短信或者邮件通知

    生产者:注册成功之后的回调中,发送注册成功信息到队列生产者。

    消费者:应用程序不断的获取队列中的消息,获取到就发送短信或邮件。

    1、安装

    sudo apt-get update
    sudo apt-get install rabbitmq-server

    2、启动服务、停止服务、查看服务状态

    sudo service rabbitmq-server start
    sudo service rabbitmq-server stop
    sudo service rabbitmq-server status

    3、修改打开文件句柄限制

    打开/etc/default/rabbitmq-server,修改ulimit

    4、支持的操作系统

    • Solaris
    • BSD
    • Linux
    • MacOSX
    • TRU64
    • Windows NT/2000/XP/Vista/Windows 7/Windows 8
    • Windows Server 2003/2008/2012
    • Windows 95, 98
    • VxWorks

    5、支持的编程语言

    • C# (using .net/c# client)
    • clojure (using Langohr)
    • erlang (using erlang client)
    • java (using java client)
    • javascript/node.js (using amqp.node)
    • perl (using Net::RabbitFoot)
    • python (using pika)
    • python-puka (using puka)
    • ruby (using Bunny)
    • ruby (using amqp gem)

    6、java示例-生产者

    private final static String QUEUE_NAME = "hello";
        
        public static void main(String[] args) throws Exception{
            // TODO Auto-generated method stub
            
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");//因为两个进程在同一个机器上
            Connection connection = null;
            Channel channel = null;
            
            connection = factory.newConnection();
            channel = connection.createChannel();
            
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [Producer] Sent '" + message + "'");
                
            channel.close();
            connection.close();
        }

    java示例-消费者

    private final static String QUEUE_NAME = "hello";
        
        public static void main(String[] args) throws Exception{
            
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                  throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [Consumer] Received '" + message + "'");
              }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
            
        }

     7、C#示例生产者

    using System;
    using System.Diagnostics;
    using System.Text;
    using System.Threading;
    using RabbitMQ.Client;
     
    namespace Producer
    {
        classProgram
        {
            staticvoidMain(string[] args)
            {
                Thread.Sleep(1000);
                var connectionFactory = new ConnectionFactory();
                IConnection connection = connectionFactory.CreateConnection();
                IModel channel = connection.CreateModel();
     
               channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct);
                stringvalue = DoSomethingInteresting();
                stringlogMessage = string.Format("{0}:{1}",TraceEventType.Information,value);
     
                byte[]message = Encoding.UTF8.GetBytes(logMessage);
               channel.BasicPublish("direct-exchange-example","",null,message);
     
               channel.Close();
               connection.Close();
            }
     
            staticstringDoSomethingInteresting()
            {
                returnGuid.NewGuid().ToString();
            }
        }
    }

    C#示例消费者

    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
     
    namespace Consumer
    {
        classProgram
        {
            staticvoidMain(string[] args)
            {
                var connectionFactory = new ConnectionFactory();
                IConnection connection = connectionFactory.CreateConnection();
                IModel channel = connection.CreateModel();
     
               channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct);
               channel.QueueDeclare("logs",false,false,true,null);
               channel.QueueBind("logs","direct-exchange-example","");
     
                varconsumer = new QueueingBasicConsumer(channel);
               channel.BasicConsume("logs",true,consumer);
     
                vareventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
     
                stringmessage = Encoding.UTF8.GetString(eventArgs.Body);
                Console.WriteLine(message);
     
               channel.Close();
               connection.Close();
                Console.ReadLine();
            }
        }
    }

    8、Go语言生产者

    package main
     
    import (
        "github.com/streadway/amqp"
    )
     
    var conn *amqp.Connection
    var ch *amqp.Channel
     
    func main() {
        //设置连接地址和交换器队列名称
        addr := "amqp://test:test@192.168.1.104:5672/"
        exname := "amqp.ex"
        quname := "amqp.qu"
     
        //创建连接
        conn, _ = amqp.Dial(addr)
        //创建通道,注意,一个连接可以有多个通道
        ch, _ = conn.Channel()
        //创建一个交换器,参数fanout说明该交换器会将消息转发到所有与之绑定的队列中
        ch.ExchangeDeclare(exname, "fanout", true, false, false, true, nil)
        //创建一个队列,用来接受和消费信息,注意这里的队列名称,消费者获取数据需要该名称
        ch.QueueDeclare(quname, true, false, false, false, nil)
        //绑定交换器和队列
        ch.QueueBind(quname, "", exname, false, nil)
        //发送消息
        ch.Publish(exname, quname, false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte("hello,world"),
        })
        //关闭连接
        conn.Close()
        ch.Close()
     
        select {}
    }

    Go语言消费者

    package main
     
    import (
        "bytes"
        "fmt"
        "github.com/streadway/amqp"
    )
     
    var conn *amqp.Connection
    var ch *amqp.Channel
     
    func main() {
        addr := "amqp://test:test@192.168.1.104:5672/"
        quname := "amqp.qu"
        conn, _ = amqp.Dial(addr)
        ch, _ = conn.Channel()
     
        //接受消息
        value, _ := ch.Consume(quname, "", true, false, false, false, nil)
        go func() {
            for v := range value {
                s := bytesToString(&(v.Body))
                fmt.Println(*s)
            }
        }()
     
        conn.Close()
        ch.Close()
     
        select {}
    }
     
    func bytesToString(b *[]byte) *string {
        s := bytes.NewBuffer(*b)
        r := s.String()
        return &r
     
    }

    9、PHP生产者

    <?php
    //配置信息 
    $conn_args = array( 
        'host' => '127.0.0.1',  
        'port' => '5672',  
        'login' => 'guest',  
        'password' => 'guest', 
        'vhost'=>'/' 
    );   
    $e_name = 'e_linvo'; //交换机名 
    //$q_name = 'q_linvo'; //无需队列名 
    $k_route = 'key_1'; //路由key 
     
    //创建连接和channel 
    $conn = new AMQPConnection($conn_args);   
    if (!$conn->connect()) {   
        die("Cannot connect to the broker!
    ");   
    }   
    $channel = new AMQPChannel($conn);   
     
    
     
    //创建交换机对象    
    $ex = new AMQPExchange($channel);   
    $ex->setName($e_name);   
    date_default_timezone_set("Asia/Shanghai");
    //发送消息 
    //$channel->startTransaction(); //开始事务  
    for($i=0; $i<5; ++$i){ 
        sleep(1);//休眠1秒
        //消息内容 
        $message = "TEST MESSAGE!".date("h:i:sa");   
        echo "Send Message:".$ex->publish($message, $k_route)."
    ";  
    } 
    //$channel->commitTransaction(); //提交事务 
     
    $conn->disconnect();
    ?>

    PHP消费者

    <?php 
    //配置信息 
    $conn_args = array( 
        'host' => '127.0.0.1',  
        'port' => '5672',  
        'login' => 'guest',  
        'password' => 'guest', 
        'vhost'=>'/' 
    );   
    $e_name = 'e_linvo'; //交换机名 
    $q_name = 'q_linvo'; //队列名 
    $k_route = 'key_1'; //路由key 
     
    //创建连接和channel 
    $conn = new AMQPConnection($conn_args);   
    if (!$conn->connect()) {   
        die("Cannot connect to the broker!
    ");   
    }   
    $channel = new AMQPChannel($conn);   
     
    //创建交换机    
    $ex = new AMQPExchange($channel);   
    $ex->setName($e_name); 
    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  
    $ex->setFlags(AMQP_DURABLE); //持久化 
    echo "Exchange Status:".$ex->declare()."
    ";   
       
    //创建队列    
    $q = new AMQPQueue($channel); 
    $q->setName($q_name);   
    $q->setFlags(AMQP_DURABLE); //持久化  
    echo "Message Total:".$q->declare()."
    ";   
     
    //绑定交换机与队列,并指定路由键 
    echo 'Queue Bind: '.$q->bind($e_name, $k_route)."
    "; 
     
    //阻塞模式接收消息 
    echo "Message:
    ";   
    while(True){ 
        $q->consume('processMessage');   
        //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  
    } 
    $conn->disconnect();   
     
    /**
     * 消费回调函数
     * 处理消息
     */ 
    function processMessage($envelope, $queue) { 
        $msg = $envelope->getBody(); 
        echo $msg."
    "; //处理消息 
        $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
    }
    ?>


    参考:
    https://blog.csdn.net/calm_85/article/details/80848664
    https://www.shiyanlou.com/courses/630/learning/
    https://blog.csdn.net/pony_maggie/article/details/69781478
    https://blog.csdn.net/luoye4321/article/details/83722437

  • 相关阅读:
    EventBus--介绍
    EventBus--出现的问题
    File存对象--android 的File存储到SD卡();
    SharePrecences--(json+sharePrecences)存list 或对象
    缓存AsimpleCache -- 解决Android中Sharedpreferences无法存储List数据/ASimpleCache
    ViewPager--左右可滑动的
    git之win安装git和环境配置及常用命令总结
    mySql事务_ _Java中怎样实现批量删除操作(Java对数据库进行事务处理)?
    eclispe---快捷键设置
    bug_ _org.json.JSONException: End of input at character 0 of
  • 原文地址:https://www.cnblogs.com/zhaogaojian/p/11450845.html
Copyright © 2011-2022 走看看