zoukankan      html  css  js  c++  java
  • BeanStalkd 做队列服务

    今天无意间看到这个仓库讲php关于 BeanStalkd 的扩展,然后就去了解了一下beanstalkd,才知道它可以用来做队列服务。

    话不多说,安装一下试试。

    首先 sudo apt search beanstalk 搜索一下发现

    Sorting... Done
    Full Text Search... Done
    awscli/focal-updates,focal-updates 1.18.69-1ubuntu0.20.04.1 all
      Universal Command Line Environment for AWS
    
    beanstalkd/focal,now 1.11-1 amd64 [installed]
      simple, in-memory, workqueue service
    
    python-celery-common/focal,focal 4.2.1-5ubuntu1 all
      async task/job queue based on message passing (common files)
    
    python-celery-doc/focal,focal 4.2.1-5ubuntu1 all
      async task/job queue based on message passing (Documentation)
    
    python3-celery/focal,focal 4.2.1-5ubuntu1 all
      async task/job queue based on message passing (Python3 version)
    
    ruby-beaneater/focal,focal 1.0.0-1 all
      simple beanstalkd client for Ruby

    好了,找到这个beanstalkd了

    然后安装

     sudo apt-get install beanstalkd 

    Reading package lists... Done
    Building dependency tree       
    Reading state information... Done
    Suggested packages:
      doc-base
    The following NEW packages will be installed:
      beanstalkd
    0 upgraded, 1 newly installed, 0 to remove and 90 not upgraded.
    Need to get 43.9 kB of archives.
    After this operation, 125 kB of additional disk space will be used.
    Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
    Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
    Fetched 31.3 kB in 4s (7,047 B/s)     
    Selecting previously unselected package beanstalkd.
    (Reading database ... 256731 files and directories currently installed.)
    Preparing to unpack .../beanstalkd_1.11-1_amd64.deb ...
    Unpacking beanstalkd (1.11-1) ...
    Setting up beanstalkd (1.11-1) ...
    Created symlink /etc/systemd/system/multi-user.target.wants/beanstalkd.service → /lib/systemd/system/beanstalkd.service.
    beanstalkd.socket is a disabled or a static unit, not starting it.
    Processing triggers for man-db (2.9.1-1) ...
    Processing triggers for systemd (245.4-4ubuntu3.11) ...

    安装成功,这个东西会监听 11300 端口

    然后使用这个工具来看看

    监控工具

    wget https://github.com/src-d/beanstool/releases/download/v0.2.0/beanstool_v0.2.0_linux_amd64.tar.gz

    获取文件后解压

    tar -xvzf beanstool_v0.2.0_linux_amd64.tar.gz

    然后拷贝到 /usr/local/bin/

    sudo cp beanstool_v0.2.0_linux_amd64/beanstool /usr/local/bin/

    这样就直接用 beanstool 了

    查看当前状态

    beanstool stats

    结果

    +---------+----------+----------+----------+----------+----------+----------+----------+
    | Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
    +---------+----------+----------+----------+----------+----------+----------+----------+
    | default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
    +---------+----------+----------+----------+----------+----------+----------+----------+

    然后使用composer的vendor包

    composer require pda/pheanstalk

    安装完毕后创建一个input.php文件做生产者

    <?php
    require __DIR__ . '/vendor/autoload.php';
    
    use PheanstalkPheanstalk;
    
    $pheanstalk = Pheanstalk::create('127.0.0.1');
    
    // Queue a Job
    $pheanstalk
      ->useTube('testtube')
      ->put("job payload goes here
    ");
    
    $pheanstalk
        ->useTube('testtube')
        ->put(
            json_encode(['test' => 'data']),  // encode data in payload
            Pheanstalk::DEFAULT_PRIORITY,     // default priority
            30, // delay by 30s
            60  // beanstalk will retry job after 60s
         );

    再创建一个output.php文件做消费者

    <?php
    require __DIR__ . '/vendor/autoload.php';
    use PheanstalkPheanstalk;
    
    $pheanstalk = Pheanstalk::create('127.0.0.1');
    
    // we want jobs from 'testtube' only.
    $pheanstalk->watch('testtube');
    
    // this hangs until a Job is produced.
    $job = $pheanstalk->reserve();
    
    try {
        $jobPayload = $job->getData();
        // do work.
    
        sleep(2);
        // If it's going to take a long time, periodically
        // tell beanstalk we're alive to stop it rescheduling the job.
        $pheanstalk->touch($job);
        sleep(2);
    
        // eventually we're done, delete job.
        $pheanstalk->delete($job);
    }
    catch(Exception $e) {
        // handle exception.
        // and let some other worker retry.
        $pheanstalk->release($job); 
    }

    然后执行一下  php input.php 

    查看 状态

    $ beanstool stats
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
    | testtube | 0        | 1        | 1        | 0        | 0        | 0        | 2        |
    +----------+----------+----------+----------+----------+----------+----------+----------+

    我们看到有一个在ready状态,一个在delayed状态,这是由于第二次的put采用了延时30s,然后过一段时间后再看

    $ beanstool stats
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
    | testtube | 0        | 0        | 2        | 0        | 0        | 0        | 2        |
    +----------+----------+----------+----------+----------+----------+----------+----------+

    已经有2个在ready状态了。

    此时我们用消费者执行一下 php output.php 

    与此同时迅速看状态

    $ beanstool stats
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
    | testtube | 0        | 0        | 1        | 1        | 0        | 0        | 2        |
    +----------+----------+----------+----------+----------+----------+----------+----------+
    再次执行
    $ beanstool stats
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
    | testtube | 0        | 0        | 1        | 0        | 0        | 0        | 2        |
    +----------+----------+----------+----------+----------+----------+----------+----------+

    因为我们有sleep(2),所以要尽量快点操作这个状态监控的命令,可以看到有一个拿出来放入了reserved,然后就消失了(实际上这是后面的代码delete导致的,因为已经消费完毕)

    再次执行 php output.php 

    $ beanstool stats
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
    +----------+----------+----------+----------+----------+----------+----------+----------+
    | default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
    | testtube | 0        | 0        | 0        | 1        | 0        | 0        | 2        |
    +----------+----------+----------+----------+----------+----------+----------+----------+
    $ beanstool stats
    +---------+----------+----------+----------+----------+----------+----------+----------+
    | Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
    +---------+----------+----------+----------+----------+----------+----------+----------+
    | default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
    +---------+----------+----------+----------+----------+----------+----------+----------+

    同样也是迅速观测这个状态,发现消费1个,然后删除1个,现在队列空了,这说明确实是符合我们的期望的。

    然后回到文章开头提到的扩展,这个扩展就是帮我们实现了composer装的那个pheanstalk包。

    这个扩展如何安装呢?

    步骤如下:

    克隆项目

    git clone https://gitee.com/qzfzz/php-beanstalk.git

    进行编译 

    phpize

    然后

    ./configure

    之后

    make

    再 make install

    sudo make install

    然后修改 php.ini

    sudo gedit /etc/php/7.4/cli/php.ini /etc/php/7.4/apache2/php.ini

    加上这句

    extension=beanstalk

    重启apache2

    sudo /etc/init.d/apache2 restart

    之后就可以使用这个扩展了。

    这个扩展它封装为函数了,可以看到他有个例子文件

    简单的找了几个例子

    <?php
    
    $config = [
      'host' => '127.0.0.1',
      'port' => 11300
    ];
    $beanstalk_obj = beanstalk_connect($config['host'], $config['port']);
    $last_job_id = beanstalk_put($beanstalk_obj, "message detail");
    beanstalk_delete($beanstalk_obj, $last_job_id);
    $last_job_id = beanstalk_putInTube($beanstalk_obj, 'tubea', "message detail");

    可以看到使用 connect 连接, put 塞入新的job消息, putInTube 来塞入指定管道的tubea,delete来删除等等,具体可以看看源代码学习一下,我对比了一下这两种方式实现效率。

    第一种采用composer包(我还特意去掉了加载文件所需要的时间)

    <?phprequire __DIR__ . '/vendor/autoload.php';
    
    use PheanstalkPheanstalk;
    
    $start = microtime( true );
    $pheanstalk = Pheanstalk::create('127.0.0.1');
    
    $pheanstalk
      ->useTube('testtube')
      ->put(date("Y-m-d H:i:s") . "job payload goes here
    ");
    
    $end = microtime(true);
    
    echo ($end - $start) * 1000 . " ms";

    执行需要2.59ms

    第二种直接用扩展函数

    <?php
    $start = microtime(true);
    
    $config = [
      'host' => '127.0.0.1',
      'port' => 11300
    ];
    
    $beanstalk_object = beanstalk_connect($config['host'], $config['port']);
    $last_job_id = beanstalk_putInTube($beanstalk_object, 'testtube', date("Y-m-d H:i:s") . "job payload goes here
    ");
    
    $end = microtime(true);
    
    echo ($end - $start) * 1000 . " ms";

    执行需要0.34ms

    不得不说,扩展就是扩展,就是快的多啊!

    我另外测试了一下投递极限

    循环投递10000次消息,大概在500ms左右

    $start = microtime( true );
    for ($i=0; $i < 10000; $i++) { 
      $pheanstalk
      ->useTube('testtube')
      ->put(date("Y-m-d H:i:s") . "job payload goes here
    ");
    }
    $end = microtime( true );
    echo ($end - $start) * 1000 . " ms";

    也就意味着1秒钟只能投递20000条消息,这比rabbitmq的投递慢多了(参见这篇文章)

    但是它执行1次投递消息的时候却比这个rabbitmq的代码执行的快,原因是我测试了一下这个rabbitmq的连接上就耗费了9ms

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

    更别提还有这两步了

    $channel = $connection->channel();
    $channel->queue_declare('hello', false, false, false, false);

    这样来看,想迅速投递一条短消息,或者建立小信息量的job用beanstalk是很不错的,如果有大量消息集中投递,那使用 rabbitmq 是很不错的。

    另外这个beanstalk投递可以延时,非常适合有些时候需要在当前时间一段时间后执行某个任务,往后弄个类似于一次性的定时器的功能,这个东西值得尝试。

    而且如果使用while死循环将 output.php 脚本一直挂着跑,它就能一直消费消息了,这样就等于有个后端进程一直能帮我们做消费者处理堆积的任务了,特殊场景可以考虑一下这个方案。

  • 相关阅读:
    win7下安装配置tomcat,java运行环境
    Ubuntu 12.10安装配置JDK7环境
    全面介绍Linux终端命令
    Ubuntu 配置 Tomcat
    Linux手动导入导出mysql数据库
    navicat数据库管理软件(支持mysql,oracle,sqlserver,sqlite,postgreSQL)
    信号量(semaphore)和互斥量(mutex)
    最新版fcitx 4.1.2源码编译安装(ubuntu 10.04)
    Groovy动态语言简介
    读书笔记:《java脚本编程:语言、框架与模式》(2)jvm内部的脚本语言
  • 原文地址:https://www.cnblogs.com/lizhaoyao/p/15395242.html
Copyright © 2011-2022 走看看