zoukankan      html  css  js  c++  java
  • rabbitmq 和 kafka 简单的性能测试

    测试环境:ubuntu 15.10 64位

    cpu:inter core i7-4790 3.60GHZ * 8

    内存:16GB

    硬盘:ssd 120GB

    软件环境:rabbmitmq 3.6.0   kafka0.8.1  (均为单机本机运行)

     PS: 测试结果均为单操作测试,即生产的时候没有消费操作

    测试结果:

    kafka :消费速度: 37,586 /s  生产速度: 448,753 /s

    rabbitmq: 消费速度: 20,807 /s  生产速度  16.413 /s

    出现问题:

    rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。

    rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“tcp链接重置”

    其他并无任何问题

    结论:

      很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。

    代码:

    kafka:

    package main
    import (
        "github.com/Shopify/sarama"
        "os"
        "os/signal"
        "sync"
        "log"
        "time"
    )
    
    
    func main() {
        go producer()
    //    go consumer()
        time.Sleep(10*time.Minute)
    }
    
    func producer()  {
        config :=sarama.NewConfig()
        config.Producer.Return.Successes = true
        proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
        if err != nil {
            panic(err)
        }
    
        signals :=make(chan  os.Signal,1)
        signal.Notify(signals,os.Interrupt)
    
        var (
            wg                          sync.WaitGroup
            enqueued, successes, errors int
        )
    
        wg.Add(1)
        go func() {
            defer  wg.Done()
            for _=range proder.Successes(){
                successes++
            }
        }()
        wg.Add(1)
        go func() {
            defer wg.Done()
            for err := range proder.Errors(){
                log.Println(err)
                errors++
            }
        }()
    
        go func() {
            t1 := time.NewTicker(time.Second)
            for{
                <- t1.C
                log.Println(enqueued)
            }
        }()
    
        ProducerLoop:
    
        for{
            message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
            select {
            case proder.Input() <- message:
                enqueued++
    
            case <- signals:
                proder.AsyncClose()
                break ProducerLoop
            }
    
        }
    
        wg.Wait()
        log.Println("Successfully produced:%d;errors:%d
    ",successes,errors)
    
    }
    
    func consumer()  {
        coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
        if err != nil {
            panic(err)
        }
    
        defer func() {
            if err :=coner.Close(); err !=nil{
                log.Fatalln(err)
            }
        }()
    
        partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
    
        defer func() {
            if err := partitionConsumer.Close();err!=nil{
                log.Fatalln(err)
            }
        }()
    
    
    
    
        signals := make(chan os.Signal,1)
        signal.Notify(signals,os.Interrupt)
        consumed:=0
    
        go func() {
            t1 := time.NewTicker(time.Second)
            for{
                <- t1.C
                log.Println(consumed)
            }
        }()
    
        ConsumerLoop:
        for{
            select {
            case _ = <-partitionConsumer.Messages():
    
                consumed++
    //            log.Println( string(msg.Value),"  =>  ",consumed)
            case <-signals:
                break ConsumerLoop
            }
        }
    
        log.Printf("Consumed: %d
    ", consumed)
    }

    rabbitmq:

    package main
    
    import (
        "github.com/streadway/amqp"
        "time"
        "fmt"
        "log"
    )
    
    const (
        queueName = "push.msg.q"
        exchange  = "t.msg.ex"
        mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push"
    
    )
    
    var conn *amqp.Connection
    var channel *amqp.Channel
    
    func main() {
        fmt.Println(1)
    //    push()
        receive()
    //    fmt.Println("end")
    //    close()
    }
    
    func failOnErr(err error, msg string) {
        if err != nil {
            log.Fatalf("%s:%s", msg, err)
            panic(fmt.Sprintf("%s:%s", msg, err))
        }
    }
    
    func mqConnect() {
        var err error
        conn, err = amqp.Dial(mqurl)
        if err != nil {
            log.Println(1)
            log.Fatalln(err)
        }
        fmt.Println(5)
        channel, err = conn.Channel()
        if err != nil {
            fmt.Println(2)
            log.Fatalln(err)
        }else {
            fmt.Println("a")
        }
    }
    
    func push() {
        count := 0
        if channel == nil {
            fmt.Println(2)
            mqConnect()
        }else {
            fmt.Println(3)
        }
        msgContent := "hello world!"
        t1 := time.NewTicker(time.Second)
    
        go func() {
            for{
                <- t1.C
                log.Println(count)
            }
        }()
    
        for{
            err := channel.Publish(exchange, "test", false, false, amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msgContent),
            })
            if err != nil {
    
            }else {
                count ++
            }
    
        }
    
    }
    
    func receive() {
        if channel == nil {
            mqConnect()
        }
        count :=0
        msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)
        failOnErr(err, "")
    
        forever := make(chan bool)
    
        t1 := time.NewTicker(time.Second)
        go func() {
            for{
                <- t1.C
                log.Println(count)
            }
        }()
        go func() {
            //fmt.Println(*msgs)
            for _= range msgs {
                count ++
    //            s := BytesToString(&(d.Body))
    //            count++
    //            fmt.Printf("receve msg is :%s -- %d
    ", *s, count)
            }
        }()
    
        fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C
    ")
        <-forever
    }
  • 相关阅读:
    浅析全球电信运营商排名
    《时空骇客》中的远距传物理论和虫洞理论
    优秀的商业计划书一定会“动”
    手机搜索的商业模式
    手机网游排行榜
    手机按键对应表
    "Avatar模式"透析
    百度数据暗示无线互联网将以个人为中心
    一种精神致加西亚的信
    手机定位技术将成社交网络催化剂
  • 原文地址:https://www.cnblogs.com/shi-meng/p/5190980.html
Copyright © 2011-2022 走看看