zoukankan      html  css  js  c++  java
  • Beanstalkd消息队列 -- php类Pheanstalk使用

    业务场景

      商城订单生成30分钟后 如果未支付关闭订单

    解决办法

    可以使用延迟消息队列   这里我们用的是beanstalkd

    Beanstalkd介绍

    Beanstalk,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。

    Beanstalkd特性

    1、支持优先级(支持任务插队)
    2、延迟(实现定时任务)
    3、持久化(定时把内存中的数据刷到binlog日志)
    4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
    5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)

    任务job状态

    delayed 延迟状态
    ready 准备好状态
    reserved 消费者把任务读出来,处理时
    buried 预留状态
    delete 删除状态



    Beanstalkd安装

    yum -y install beanstalkd
    

      

    Beanstalkd守护进程启动

    nohup beanstalkd -l 0.0.0.0 -p 11300 -b /home/software/binstalkd/binlogs &
    

    1. -b表示开启binlog,断电后重启自动恢复任务

    2.  /home/software/binstalkd/binlogs是我们已经创建好的一个文件目录 这里面主要放一些持久化的日志 

    Pheanstalk安装

          composer require pda/pheanstalk

    PheanstalkGit仓库地址

        https://github.com/pda/pheanstalk

    连接Beanstalkd

     
    <?php
    require __DIR__ . '/vendor/autoload.php';
    
    use PheanstalkPheanstalk;
    
    /**
     * 实例化beanstalk
     * 参数依次为:ip地址 端口号默认11300 连接超时时间 是否长连接
     */
    $pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
    ?>

    producer.php 生产者

    <?php
    
    // Hopefully you're using Composer autoloading.
    require_once __DIR__.'/vendor/autoload.php';
    use PheanstalkPheanstalk;
    
    /**
     * 实例化beanstalk
     * 参数依次为:ip地址 端口号默认11300 连接超时时间 是否长连接
     */
    $pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
    
    //向tubeName管道中添加任务,返回任务ID
    //put()方法有四个参数
    //第一个任务的数据  
    //第二个任务的优先级,值越小,越先处理 (默认为0  no delay)
    //第三个任务的延迟 单位:秒 (most urgent: 0, least urgent: 4294967295)
    //第四个任务的ttr超时时间 单位:秒
    $arr['name'] = 'mark';
    $jobData = json_encode($arr);
    
    $pheanstalk ->useTube('tubeName') ->put($jobData,1024,5);
    

      

    Beanstalkd生产者方法

    指定需要使用的管道

    $tube = $pheanstalk->useTube('default');

    向管道插入数据

    $tube = $pheanstalk->useTube('default');
    $put = $tube->put(
        'hello, beanstalk', // 任务内容
        1024, // 任务的优先级
        10,  // 不等待直接放到ready队列中
        60 // 处理任务的时间
    );

    或者:

    $pheanstalk->putInTube('default', 'test1', 1024, 10, 60);

    ---------------------------------------------------------------------------------------------------------------------+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

    consumer.php 消费者

    <?php

    // Hopefully you're using Composer autoloading.
    require_once __DIR__.'/vendor/autoload.php';
    use PheanstalkPheanstalk;
    
    /**
     * 实例化beanstalk
     * 参数依次为:ip地址 端口号默认11300 连接超时时间 是否长连接
     */
    $pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
    
    //监听tubeName管道,忽略default管道
    $job = $pheanstalk ->watch('tubeName') ->ignore('default') ->reserve();
    
    echo $job->getData();
    
    //删除当前任务 $pheanstalk->delete($job);

      

    Beanstalkd消费者方法

    监听管道

    $tube = $pheanstalk->watch('user');

    去除不需要监听的管道

    $tube = $pheanstalk->watch('user')->ignore('default');

    以堵塞的方式监听管道

    $job = $pheanstalk->watch('user')->reserve(4); //堵塞时间为4秒

    列出所有已经监听的管道

    $pheanstalk->listTubesWatched();

    watch + reserve 方法

    $pheanstalk->reserveFromTube('default')

    删除当前任务

    $job =  $pheanstalk->watch('default')->reserve();
    $pheanstalk->delete($job);

    将当前任务重新放入管道

    $job =  $pheanstalk->watch('default')->reserve();
    $pheanstalk->release($job);

    为任务续命(当处理任务的时间小于当前任务执行时间时)

    $job =  $pheanstalk->watch('default')->reserve();
    $pheanstalk->touch($job);
    //TODO

    将任务预留

    $job =  $pheanstalk->watch('default')->reserve();
    $pheanstalk->bury($job);

    将预留任务释放(变为reday状态)

    $job = $pheanstalk->peekBuried('default');
    $pheanstalk->kickJob($job);

    批量将预留任务释放

    $pheanstalk->userTube('default')->kick(999); //将id小于999的预留任务全部释放

    读取当前准备就绪的任务(ready)

    $job = $pheanstalk->peekReady('default');

    读取当前处于延迟状态的任务(delayed)

    $job = $pheanstalk->peekDelayed('default');

    对管道设置延迟

    $pheanstalk->pauseTube('default', 100); //设置100秒延迟

    取消对管道的延迟

    $pheanstalk->resumeTube('default');
    
    
    
     
  • 相关阅读:
    数据库备份 DBS(Database Backup),知识点
    关系型数据库 RDS(Relational Database Service),知识点
    对象存储服务 OSS(Object Storage Service),知识点(待补充上仓库代码)
    Java 为什么需要包装类,如何使用包装类?
    for each 语句
    缓存中,2个注解:@cacheable 与 @cacheput 的区别
    微信小程序,相关代码
    微信小程序中的事件
    通俗易懂:索引、单列索引、复合索引、主键、唯一索引、聚簇索引、非聚簇索引、唯一聚簇索引 的区别与联系
    MySQL 的各种 join
  • 原文地址:https://www.cnblogs.com/foreversun/p/10554018.html
Copyright © 2011-2022 走看看