zoukankan      html  css  js  c++  java
  • RabbitMQ官方教程一Hello World(GOLANG语言实现)

    介绍

    RabbitMQ是消息中间件:它接受并转发消息。
    您可以将其视为邮局系统:将要发送的邮件放在邮箱中时,
    可以确保邮递员最终将邮件传递给收件人。
    以此类推,RabbitMQ是一个邮箱,一个邮局和一个邮递员。

    RabbitMQ与邮局之间的主要区别在于,
    它不处理纸张,而是接收,存储和转发数据消息的二进制数据。

    以下是RabbitMQ和消息发送的术语

    • Producer:生产者。负责生产消息。

    生产者

    • Queue:队列。负责存储消息。队列在RabbitMQ中充当邮箱的角色,消息传递到RabbitMQ中,只能存储在队列中。队列受主机内存和磁盘大小的约束。本质是一个很大的消息缓冲区。
      许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。

    队列

    • Consumer:消费者。负责处理消息。

    ** 笔者补充

    • 参考连接

    • Connect:连接。生产者和RabbitMQ服务之间建立的TCP连接。

    • Channel:信道,一条连接可包含多条信道,不同信道之间通信互不干扰。考虑下多线程应用场景,每个线程对应一条信道,而不是对应一条连接,这样可以提高性能。

    • body:消息主体,要传递的数据。

    • exchange:交换器,负责把消息转发到对应的队列。交换器本身没有缓存消息的功能,消息是在队列中缓存的,如果队列不存在,则交换器会直接丢弃消息。常用的有四种类型的交换器:direct、fanout、topic、headers。不同类型的交换器有不同的交换规则,交换器会根据交换规则把消息转发到对应的队列。

    • exchangeName:交换器名称,每个交换器对应一个名称,发送消息时会附带交换器名称,根据交换器名称选择对应的交换器。

    • BandingKey:绑定键,一个队列可以有一个到多个绑定键,通过绑定操作可以绑定交换器和队列,交换器会根据绑定键的名称找到对应的队列。

    • RotingKey:路由键,发送消息时,需要附带一条路由键,交换器会对路由键和绑定键进行匹配,如果匹配成功,则消息会转发到绑定键对应的队列中。

    **简而言之就是:

    1. 生产者指定路由Key和交换器的名字发送给RabbitMQ服务

    2. 指定名字的交换器根据路由key去找到绑定的队列

    3. 将消息放入队列当中

    4. 消费者从队列中取出消息进行处理

    生产者

    运行图

    **linux安装RabbitMQ服务

    **Docker安装RabbitMQ服务

    实战 "Hello World"

    golang语言实现

    在本教程的这一部分中,我们将用Go编写两个小程序。 发送单个消息的生产者和接收消息并打印出来的消费者。 我们将介绍Go RabbitMQ API中的一些细节,仅着眼于此非常简单的事情。 这是消息传递的“ Hello World”。

    在下图中,“ P”是我们的生产者,“ C”是我们的消费者。 中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。

    运行图

    Go RabbitMQ客户端库
    RabbitMQ使用多种协议。 本教程使用AMQP 0-9-1,这是一种开放的通用消息传递协议。 RabbitMQ有许多不同语言的客户。 在本教程中,我们将使用Go amqp客户端。

    go get github.com/streadway/amqp

    生产者发送数据到队列

    运行图

    #send.go 生产者,发送消息到消息队列中
    
    package main
    
    import (
    	"github.com/streadway/amqp"
    	"log"
    )
    
    func main(){
    	// 连接RabbitMQ服务器
    	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    	// 创建一个channel
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    	
            // 声明一个队列
    	q, err  := ch.QueueDeclare(
    		"hello",			// 队列名称
    		false,			// 是否持久化
    		false,		// 是否自动删除
    		false,			// 是否独立
    		false,nil,
    		)
    	failOnError(err, "Failed to declare a queue")
    	// 发送消息到队列中
    	body := "Hello World!"
    	err = ch.Publish(
    		"",     // exchange
    		q.Name, // routing key
    		false,  // mandatory
    		false,  // immediate
    		amqp.Publishing {
    			ContentType: "text/plain",
    			Body:        []byte(body),
    		})
    	failOnError(err, "Failed to publish a message")
    	fmt.Println("send message success
    "
    }
    
    // 帮助函数检测每一个amqp调用
    func failOnError(err error, msg string)  {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    
    

    消费者从队列接收数据

    运行图

    
    package main
    
    import (
    	"github.com/streadway/amqp"
    	"log"
    )
    
    func main(){
    	// 连接RabbitMQ服务器
    	conn, err := amqp.Dial("amqp://admin:admin@47.97.215.189:5672/admin")
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    	// 创建一个channel
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    	// 监听队列
    	q, err  := ch.QueueDeclare(
    		"hello",			// 队列名称
    		false,			// 是否持久化
    		false,		// 是否自动删除
    		false,			// 是否独立
    		false,nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	// 消费队列
    	msgs, err := ch.Consume(
    		q.Name, // queue
    		"",     // consumer
    		true,   // auto-ack
    		false,  // exclusive
    		false,  // no-local
    		false,  // no-wait
    		nil,    // args
    	)
    	failOnError(err, "Failed to register a consumer")
        // 申明一个goroutine,一遍程序始终监听
    	forever := make(chan bool)
    
    	go func() {
    		for d := range msgs {
    			log.Printf("Received a message: %s", d.Body)
    		}
    	}()
    
    	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    	<-forever
    }
    
    // 帮助函数检测每一个amqp调用
    func failOnError(err error, msg string)  {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    
    
  • 相关阅读:
    DLUTOJ 1209 字典序和r-子集
    C++ Standard-Library Random Numbers
    后缀数组模板
    UVA 1398 Meteor
    STL Iterators
    hihocoder1187 Divisors
    51nod 子序列的个数(动态规划)
    51nod 最长单增子序列(动态规划)
    51nod 更难的矩阵取数问题(动态规划)
    51nod 多重背包问题(动态规划)
  • 原文地址:https://www.cnblogs.com/zhouqi666/p/12044253.html
Copyright © 2011-2022 走看看