zoukankan      html  css  js  c++  java
  • php如何使用rabbitmq实现发布消息和消费消息(tp框架)(第一篇)

    1,默认已经安装好了rabbitmq: 参考 http://www.cnblogs.com/spicy/p/7017603.html

    2,安装rabbitmq客户端: 方法1: pecl 扩展安装  方法2:composer安装

      我是用第二种: composer require php-amqplib/php-amqplib 

    3,新建一个发送的路由 和 接受的路由(tp5)

      Route::rule('test','index/index/test1');

      Route::rule('getmsg','index/receiver/receive');

    4,发布信息的方法:
      
    <?php
    namespace appindexcontroller;
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    use validatorValidator;
    
    class Index
    {
        //rbmp example
        public function test1()
        {
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
            $channel->queue_declare('hello', false, true);
    
            $sendMsg = [
                'name'=>'kevin'.rand(1,100),
                'phone'=>'171921743'.rand(1,100),
            ];
    
            $msg = new AMQPMessage(json_encode($sendMsg));
            $channel->basic_publish($msg, '', 'hello');
            echo " [x] Sent 'Hello Kevin!'
    ";
            $channel->close();
            $connection->close();
        }
    View Code

     5,消费信息的方法:

      

    <?php
    namespace appindexcontroller;
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    class Receiver
    {
        public function receive()
        {
            set_time_limit(0);
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
            $channel->queue_declare('hello', false, true);
    
            $receiver = new self();
            $channel->basic_consume('hello', '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    
        public function callFunc($msg) {
            $content = json_decode($msg->body,true);
            
            //把用户信息插入数据库
            db('user_info')->insert([
                'ui_username'=>$content['name'],
                'ui_phone'=>$content['phone'],
            ]);
            
    
        }
    
    }
    View Code

    注意:  我们公司是把消费消息做成了常驻  



      

     

  • 相关阅读:
    Flume下读取kafka数据后再打把数据输出到kafka,利用拦截器解决topic覆盖问题
    idea 党用快捷键
    Idea 调试快捷键
    log4j实时将数据写入到kafka,Demo和相关的配置详解
    windows环境下,kafka常用命令
    ElasticSearch 基本概念
    elasticsearch REST API方式批量插入数据
    自提柜-资产管理柜
    10.智能快递柜(源码下载)
    9.智能快递柜SDK(串口型锁板)
  • 原文地址:https://www.cnblogs.com/spicy/p/7886820.html
Copyright © 2011-2022 走看看