zoukankan      html  css  js  c++  java
  • laravel 5.6 使用RabbitMQ作为消息中间件

    1、Composer安装laravel-queue-rabbitmq
    composer require vladimir-yuldashev/laravel-queue-rabbitmq
    2、在config/app.php文件中,providers中添加:
    VladimirYuldashevLaravelQueueRabbitMQLaravelQueueRabbitMQServiceProvider::class,
    3、在app/config/queue.php配置文件中的connections数组中加入以下配置

    'rabbitmq' => [
    
                'driver' => 'rabbitmq',
    
                'dsn' => env('RABBITMQ_DSN', null),
    
                /*
                 * Could be one a class that implements InteropAmqpAmqpConnectionFactory for example:
                 *  - EnqueueAmqpExtAmqpConnectionFactory if you install enqueue/amqp-ext
                 *  - EnqueueAmqpLibAmqpConnectionFactory if you install enqueue/amqp-lib
                 *  - EnqueueAmqpBunnyAmqpConnectionFactory if you install enqueue/amqp-bunny
                 */
    
                'factory_class' => EnqueueAmqpLibAmqpConnectionFactory::class,
    
                'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                'port' => env('RABBITMQ_PORT', 5672),
    
                'vhost' => env('RABBITMQ_VHOST', '/'),
                'login' => env('RABBITMQ_LOGIN', 'guest'),
                'password' => env('RABBITMQ_PASSWORD', 'guest'),
    
                'queue' => env('RABBITMQ_QUEUE', 'default'),
    
                'options' => [
    
                    'exchange' => [
    
                        'name' => env('RABBITMQ_EXCHANGE_NAME'),
    
                        /*
                         * Determine if exchange should be created if it does not exist.
                         */
    
                        'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
    
                        /*
                         * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                         */
    
                        'type' => env('RABBITMQ_EXCHANGE_TYPE', InteropAmqpAmqpTopic::TYPE_DIRECT),
                        'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                        'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                        'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                        'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
                    ],
    
                    'queue' => [
    
                        /*
                         * Determine if queue should be created if it does not exist.
                         */
    
                        'declare' => env('RABBITMQ_QUEUE_DECLARE', true),
    
                        /*
                         * Determine if queue should be binded to the exchange created.
                         */
    
                        'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),
    
                        /*
                         * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                         */
    
                        'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                        'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                        'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                        'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                        'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
                    ],
                ],
    
                /*
                 * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
                 * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
                 */
    
                'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),
    
                /*
                 * Optional SSL params if an SSL connection is used
                 * Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html
                 */
    
                'ssl_params' => [
                    'ssl_on' => env('RABBITMQ_SSL', false),
                    'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                    'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                    'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                    'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                    'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
                ],
    
            ],

    4、修改 .env 文件

    
    QUEUE_CONNECTION=rabbitmq    #这个配置env一般会有先找到修改为这个
    
    #以下是新增配置
    
    RABBITMQ_HOST=rabbitmq  #mq的服务器地址,我这里用的是laradock,具体的就具体修改咯
    RABBITMQ_PORT=5672  #mq的端口
    RABBITMQ_VHOST=/
    RABBITMQ_LOGIN=guest    #mq的登录名
    RABBITMQ_PASSWORD=guest   #mq的密码
    RABBITMQ_QUEUE=queue_name   #mq的队列名称

    5、创建任务类
    php artisan make:job Queue
    执行之后会生成一个文件app/Jobs/Queue.php

    例子:

    <?php
    
    namespace AppJobs;
    
    use AppEntitiesPosts;
    use IlluminateBusQueueable;
    use IlluminateFoundationBusDispatchable;
    use IlluminateQueueSerializesModels;
    use IlluminateQueueInteractsWithQueue;
    use IlluminateContractsQueueShouldQueue;
    
    class Queue  implements ShouldQueue
    {
        use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    
        private $data;
    
        /**
         * Queue constructor.
         * @param $data
         */
        public function __construct($data)
        {
            $this->data = $data;
        }
    
        /**
         * Execute the job.
         *
         * @return void
         */
        public function handle()
        {
    
            try{
                $insert = [
                    'title'=>$this->data->title,
                    'author_id'=>$this->data->author_id,
                    'content'=>$this->data->content,
                    'description'=>$this->data->description,
                ];
                $result = Posts::create($insert);
                echo json_encode(['code' => 200, 'msg' => $result]);
            }catch (Exception $exception) {
                echo json_encode(['code'=>0,'msg'=>$exception->getMessage()]);
            }
    
        }
    }

    6、生产,把数据放进mq队列

    <?php
    
    namespace AppHttpControllers;
    
    use AppEntitiesCostaNews;
    use AppJobsQueue;
    
    class IndexController extends Controller
    {
    
        public function index()
        {
            $data = CostaNews::get();
            foreach ($data as $item) {
                $this->dispatch(new Queue($item));
            }
            return response()->json(['code'=>0, 'msg'=>"success"]);
        }
    
    }

    7、消费队列
    执行命令进行消费:
    php artisan queue:work rabbitmq
    效果如下:

    root@9e99cf9fba73:/var/www/blog# php artisan  queue:work rabbitmq
    [2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processing: AppJobsQueue
    {"code":200,"msg":{"title":1,"author_id":2,"content":"u5185u5bb9","description":"u63cfu8ff0","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":1}}[2018-12-24 07:34:32][5c208bf66e63b3.56379160] Processed:  AppJobsQueue
    [2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processing: AppJobsQueue
    {"code":200,"msg":{"title":2,"author_id":2,"content":"u5185u5bb92","description":"u63cfu8ff02","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":2}}[2018-12-24 07:34:32][5c208bf66ff7c3.20969590] Processed:  AppJobsQueue
    [2018-12-24 07:34:32][5c208bf6702695.93123122] Processing: AppJobsQueue
    {"code":200,"msg":{"title":3,"author_id":2,"content":"u5185u5bb93","description":"u63cfu8ff03","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":3}}[2018-12-24 07:34:32][5c208bf6702695.93123122] Processed:  AppJobsQueue
    [2018-12-24 07:34:32][5c208bf6706e24.78015170] Processing: AppJobsQueue
    {"code":200,"msg":{"title":4,"author_id":2,"content":"u5185u5bb94","description":"u63cfu8ff04","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":4}}[2018-12-24 07:34:32][5c208bf6706e24.78015170] Processed:  AppJobsQueue
    [2018-12-24 07:34:32][5c208bf6709be0.07998731] Processing: AppJobsQueue
    {"code":200,"msg":{"title":5,"author_id":2,"content":"u5185u5bb95","description":"u63cfu8ff05","updated_at":"2018-12-24 07:34:32","created_at":"2018-12-24 07:34:32","id":5}}[2018-12-24 07:34:32][5c208bf6709be0.07998731] Processed:  AppJobsQueue

    注意:使用这个laravel-queue-rabbitmq这个包需要开启sockets拓展,不然会报错

  • 相关阅读:
    《编写可维护的JavaScript》读书笔记
    第十四天:还是看代码
    第十三天:过了一遍rt_thread,看代码架构
    第十二天:rt_thread系统
    第十一天:要做stm32了
    第十天:没太专注工作
    第九天:rtc问题查找与测试
    第八天:android编译环境搭建
    第七天:终于看到板子了
    第六天和周末:感慨下这周
  • 原文地址:https://www.cnblogs.com/sweetsunnyflower/p/10186626.html
Copyright © 2011-2022 走看看