zoukankan      html  css  js  c++  java
  • golang连接activemq,发送接收数据

    介绍

    使用golang连接activemq发送数据的话,需要使用一个叫做stomp的包,直接go get github.com/go-stomp/stomp即可
    

    代码

    生产者

    package main
    
    import (
    	"fmt"
    	"github.com/go-stomp/stomp"
    	"time"
    )
    
    func main(){
    	// 调用Dial方法,第一个参数是"tcp",第二个参数则是ip:port
    	// 返回conn(连接)和err(错误)
    	conn,err:=stomp.Dial("tcp", "47.adsasaads89:61613")
    	// 错误判断
    	if err!=nil{
    		fmt.Println("err =", err)
    		return
    	}
    	//发送十条数据
    	for i:=0;i<10;i++ {
    		// 调用conn下的send方法,接收三个参数
    		//参数一:队列的名字
    		//参数二:数据类型,一般是文本类型,直接写text/plain即可
    		//参数三:内容,记住要转化成byte数组的格式
    		//返回一个error
    		err := conn.Send("testQ", "text/plain",[]byte(fmt.Sprintf("message:%d", i)))
    		if err!=nil{
    			fmt.Println("err =", err)
    		}
    	}
    	/*
    	这里为什么要sleep一下,那就是conn.Send这个过程是不阻塞的
    	相当于Send把数据放到了一个channel里面
    	另一个goroutine从channel里面去取数据再放到消息队列里面
    	但是还没等到另一个goroutine放入数据,此时循环已经结束了
    	因此最好要sleep一下,根据测试,如果不sleep,那么发送1000条数据,
    	最终进入队列的大概是980条数据,这说明了什么
    	说明了当程序把1000条数据放到channel里面的时候,另一个goroutine只往队列里面放了980条
    	剩余的20条还没有来得及放入,程序就结束了
    	 */
    	time.Sleep(time.Second * 1)
    }
    

    消费者

    package main
    
    import (
    	"fmt"
    	"github.com/go-stomp/stomp"
    	"time"
    )
    
    func recv_data(ch chan *stomp.Message) {
    	//不断地循环,从channel里面获取数据
    	for {
    		v := <-ch
    		//这里是打印当然还可以做其他的操作,比如写入hdfs平台
    		//v是*stomp.Message类型,属性都在这里面
    		
    		/*
    		type Message struct {
    			// Indicates whether an error was received on the subscription.
    			// The error will contain details of the error. If the server
    			// sent an ERROR frame, then the Body, ContentType and Header fields
    			// will be populated according to the contents of the ERROR frame.
    			Err error
    		
    			// Destination the message has been sent to.
    			Destination string
    		
    			// MIME content type.
    			ContentType string // MIME content
    		
    			// Connection that the message was received on.
    			Conn *Conn
    		
    			// Subscription associated with the message.
    			Subscription *Subscription
    		
    			// Optional header entries. When received from the server,
    			// these are the header entries received with the message.
    			Header *frame.Header
    		
    			// The ContentType indicates the format of this body.
    			Body []byte // Content of message
    		}
    		 */
    		fmt.Println(string(v.Body))
    	}
    }
    
    func main() {
    	//创建一个channel,存放的是*stomp.Message类型
    	ch := make(chan *stomp.Message)
    	//将管道传入函数中
    	go recv_data(ch)
    	//和生产者一样,调用Dial方法,返回conn和err
    	conn, err := stomp.Dial("tcp", "47.dsdsadsa9:61613")
    	if err != nil {
    		fmt.Println("err =", err)
    	}
    	//消费者订阅这个队列
    	//参数一:队列名
    	//参数二:确认信息,直接填默认地即可
    	sub, err := conn.Subscribe("testQ", stomp.AckMode(stomp.AckAuto))
    	for { //无限循环
    		select {
    		//sub.C是一个channel,如果订阅的队列有数据就读取
    		case v := <-sub.C:
    			//读取的数据是一个*stomp.Message类型
    			ch <- v
    			//如果30秒还没有人发数据的话,就结束
    		case <-time.After(time.Second * 30):
    			return
    		}
    	}
    }
    
    message:0
    message:1
    message:2
    message:3
    message:4
    message:5
    message:6
    message:7
    message:8
    message:9
    
  • 相关阅读:
    黑盒测试分类
    Java变量的初始化顺序
    javahttp请求四种方式
    mybatis sql查询慢
    jacoco搭配springMVC maven tomcat项目,单元测试&接口测试踩坑
    maven标签pluginManagement和plugins区别
    Java反射学习之Field类访问和修饰变量
    Classpath重新认识
    properties文件加载的六种方法
    VMware ESXI6.0安装和配置IP地址
  • 原文地址:https://www.cnblogs.com/traditional/p/11193524.html
Copyright © 2011-2022 走看看