RabbitMQ能做啥
场景一:支付的通知
生产者:微信支付完成之后,在其回调方法中调用一个服务,由该服务把支付结果作为消息发送到队列,这个服务作为生产者。
消费者:消费者服务是一个不断从队列中获取支付结果的应用,然后在app或者页面展示。
场景二:注册的短信或者邮件通知
生产者:在注册成功之后的回调中,把注册成功的信息发送到队列,该回调所在的服务即为生产者。
消费者:应用程序不断地获取队列中的消息,获取到之后就发送短信或邮件。
1、安装
# Refresh the package index, then install the RabbitMQ server package.
sudo apt-get update
sudo apt-get install rabbitmq-server
2、启动服务、停止服务、查看服务状态
# Start the RabbitMQ service
sudo service rabbitmq-server start
# Stop the RabbitMQ service
sudo service rabbitmq-server stop
# Show the current status of the RabbitMQ service
sudo service rabbitmq-server status
3、修改打开文件句柄限制
打开 /etc/default/rabbitmq-server,修改其中的 ulimit -n 设置(例如改为 ulimit -n 65536),保存后重启服务使其生效。
4、支持的操作系统
- Solaris
- BSD
- Linux
- MacOSX
- TRU64
- Windows NT/2000/XP/Vista/Windows 7/Windows 8
- Windows Server 2003/2008/2012
- Windows 95, 98
- VxWorks
5、支持的编程语言
- C# (using .net/c# client)
- clojure (using Langohr)
- erlang (using erlang client)
- java (using java client)
- javascript/node.js (using amqp.node)
- perl (using Net::RabbitFoot)
- python (using pika)
- python-puka (using puka)
- ruby (using Bunny)
- ruby (using amqp gem)
6、java示例-生产者
private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");//因为两个进程在同一个机器上 Connection connection = null; Channel channel = null; connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [Producer] Sent '" + message + "'"); channel.close(); connection.close(); }
java示例-消费者
/** Name of the queue messages are read from. */
private final static String QUEUE_NAME = "hello";

/**
 * Connects to the broker on localhost, declares {@link #QUEUE_NAME} and
 * registers a consumer that prints every delivered message body as UTF-8 text.
 *
 * @param args unused
 * @throws Exception if connecting, declaring the queue or subscribing fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connFactory = new ConnectionFactory();
    connFactory.setHost("localhost");

    final Channel amqpChannel = connFactory.newConnection().createChannel();
    // Arguments after the name: durable=false, exclusive=false, autoDelete=false, no extra arguments.
    amqpChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    // The second argument (true) turns on automatic acknowledgement.
    amqpChannel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(amqpChannel) {
        @Override
        public void handleDelivery(String tag,
                                   Envelope env,
                                   AMQP.BasicProperties props,
                                   byte[] payload) throws IOException {
            String text = new String(payload, "UTF-8");
            System.out.println(" [Consumer] Received '" + text + "'");
        }
    });
}
7、C#示例生产者
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
    /// <summary>
    /// Publishes one log-style message to the "direct-exchange-example" direct exchange.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point: declares the exchange, publishes a single message with an
        /// empty routing key, then closes the channel and the connection.
        /// </summary>
        /// <param name="args">Unused.</param>
        static void Main(string[] args)
        {
            // NOTE(review): presumably gives the consumer time to start first — confirm.
            Thread.Sleep(1000);

            var connectionFactory = new ConnectionFactory();

            // "using" guarantees both objects are disposed even if publishing throws.
            using (IConnection connection = connectionFactory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);

                string value = DoSomethingInteresting();
                string logMessage = string.Format("{0}:{1}", TraceEventType.Information, value);
                byte[] message = Encoding.UTF8.GetBytes(logMessage);

                channel.BasicPublish("direct-exchange-example", "", null, message);

                channel.Close();
                connection.Close();
            }
        }

        /// <summary>
        /// Produces the message payload.
        /// </summary>
        /// <returns>A freshly generated GUID formatted as a string.</returns>
        static string DoSomethingInteresting()
        {
            return Guid.NewGuid().ToString();
        }
    }
}
C#示例消费者
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    /// <summary>
    /// Receives one message from the "logs" queue bound to the
    /// "direct-exchange-example" direct exchange and prints it.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point: declares the exchange and queue, binds them with an empty
        /// routing key, blocks until one message is dequeued, prints it, closes the
        /// channel and connection, then waits for a line on standard input.
        /// </summary>
        /// <param name="args">Unused.</param>
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory();

            // "using" guarantees both objects are disposed even if consuming throws.
            using (IConnection connection = connectionFactory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);
                // Arguments after the name: durable=false, exclusive=false, autoDelete=true, no extra arguments.
                channel.QueueDeclare("logs", false, false, true, null);
                channel.QueueBind("logs", "direct-exchange-example", "");

                var consumer = new QueueingBasicConsumer(channel);
                // The second argument (true) turns on automatic acknowledgement.
                channel.BasicConsume("logs", true, consumer);

                // Blocks until a delivery is available.
                var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                string message = Encoding.UTF8.GetString(eventArgs.Body);
                Console.WriteLine(message);

                channel.Close();
                connection.Close();
            }

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }
}
8、Go语言生产者
package main

import (
	"log"

	"github.com/streadway/amqp"
)

// main publishes a single "hello,world" message and exits.
func main() {
	// Broker address plus the exchange and queue names.
	addr := "amqp://test:test@192.168.1.104:5672/"
	exname := "amqp.ex"
	quname := "amqp.qu"

	if err := publish(addr, exname, quname); err != nil {
		log.Fatalf("publish failed: %v", err)
	}
}

// publish connects to addr, declares the fanout exchange exname and the queue
// quname, binds them, and sends one text message. The first error encountered
// is returned; the channel and the connection are always released.
func publish(addr, exname, quname string) error {
	// Open the connection.
	conn, err := amqp.Dial(addr)
	if err != nil {
		return err
	}
	defer conn.Close()

	// Open a channel; one connection can carry several channels.
	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	// Deferred calls run in reverse order, so the channel is closed before
	// the connection that owns it.
	defer ch.Close()

	// Declare the exchange. "fanout" makes it forward every message to all
	// queues bound to it. Flags: durable=true, autoDelete=false,
	// internal=false, noWait=true.
	if err := ch.ExchangeDeclare(exname, "fanout", true, false, false, true, nil); err != nil {
		return err
	}

	// Declare the queue that receives the messages. Consumers need this
	// queue name to read them. Flags: durable=true, autoDelete=false,
	// exclusive=false, noWait=false.
	if _, err := ch.QueueDeclare(quname, true, false, false, false, nil); err != nil {
		return err
	}

	// Bind the queue to the exchange.
	if err := ch.QueueBind(quname, "", exname, false, nil); err != nil {
		return err
	}

	// Send the message.
	return ch.Publish(exname, quname, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte("hello,world"),
	})
}
Go语言消费者
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

// main consumes messages from the queue and prints each body until the
// delivery stream ends.
func main() {
	addr := "amqp://test:test@192.168.1.104:5672/"
	quname := "amqp.qu"

	if err := consume(addr, quname); err != nil {
		log.Fatalf("consume failed: %v", err)
	}
}

// consume connects to addr, subscribes to quname with automatic
// acknowledgement and prints every delivered body. It blocks for as long as
// deliveries keep arriving and returns an error once the delivery stream is
// closed or any setup step fails.
func consume(addr, quname string) error {
	conn, err := amqp.Dial(addr)
	if err != nil {
		return err
	}
	// Keep the connection and channel open for the whole consume loop; they
	// are released only when this function returns.
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	// Receive messages. Flags: autoAck=true, exclusive=false, noLocal=false,
	// noWait=false.
	deliveries, err := ch.Consume(quname, "", true, false, false, false, nil)
	if err != nil {
		return err
	}

	for d := range deliveries {
		s := bytesToString(&(d.Body))
		fmt.Println(*s)
	}

	// The range loop only ends when the delivery channel has been closed.
	return fmt.Errorf("delivery channel for queue %q was closed", quname)
}

// bytesToString returns a pointer to a new string holding the bytes of *b.
func bytesToString(b *[]byte) *string {
	s := bytes.NewBuffer(*b)
	r := s.String()
	return &r
}
9、PHP生产者
<?php
// Broker connection settings
$brokerConfig = [
    'host'     => '127.0.0.1',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/',
];
$exchangeName = 'e_linvo'; // exchange name
// $queueName = 'q_linvo'; // the producer does not need a queue name
$routingKey = 'key_1';     // routing key

// Open the connection and a channel on it
$connection = new AMQPConnection($brokerConfig);
if (!$connection->connect()) {
    die("Cannot connect to the broker! ");
}
$amqpChannel = new AMQPChannel($connection);

// Exchange object used for publishing
$exchange = new AMQPExchange($amqpChannel);
$exchange->setName($exchangeName);
date_default_timezone_set("Asia/Shanghai");

// Send five messages, one per second
// $amqpChannel->startTransaction(); // begin transaction
$sent = 0;
while ($sent < 5) {
    sleep(1); // pause for one second

    // Message content: fixed prefix plus the current time
    $payload = "TEST MESSAGE!" . date("h:i:sa");
    $published = $exchange->publish($payload, $routingKey);
    echo "Send Message:" . $published . " ";

    ++$sent;
}
// $amqpChannel->commitTransaction(); // commit transaction

$connection->disconnect();
?>
PHP消费者
<?php
/**
 * Consumer callback: prints the message body and acknowledges the message.
 *
 * @param AMQPEnvelope $envelope the delivered message
 * @param AMQPQueue    $queue    the queue the message was consumed from
 */
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg." "; // handle the message
    $queue->ack($envelope->getDeliveryTag()); // send the ACK manually
}

// Connection settings
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo';  // exchange name
$q_name = 'q_linvo';  // queue name
$k_route = 'key_1';   // routing key

// Open the connection and a channel on it
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker! ");
}

try {
    $channel = new AMQPChannel($conn);

    // Declare the exchange
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
    $ex->setFlags(AMQP_DURABLE);       // durable
    // declareExchange() replaces the deprecated declare() alias.
    echo "Exchange Status:".$ex->declareExchange()." ";

    // Declare the queue
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); // durable
    // declareQueue() replaces the deprecated declare() alias.
    echo "Message Total:".$q->declareQueue()." ";

    // Bind the queue to the exchange with the routing key
    echo 'Queue Bind: '.$q->bind($e_name, $k_route)." ";

    // Receive messages in blocking mode
    echo "Message: ";
    while (true) {
        $q->consume('processMessage');
        //$q->consume('processMessage', AMQP_AUTOACK); // automatic ACK
    }
} finally {
    // The loop above never exits normally, so disconnect here to make sure
    // the connection is released when an exception ends the consumer.
    $conn->disconnect();
}
?>
参考:
https://blog.csdn.net/calm_85/article/details/80848664
https://www.shiyanlou.com/courses/630/learning/
https://blog.csdn.net/pony_maggie/article/details/69781478
https://blog.csdn.net/luoye4321/article/details/83722437