zoukankan      html  css  js  c++  java
  • php使用Pheanstalk实现消息队列

    1.安装(linux安装beanstalkd, windows暂不支持)

    # wget https://github.com/kr/beanstalkd/archive/v1.10.tar.gz

     # tar xzvf v1.10

     # cd beanstalkd-1.10/

    # make && make install

    # beanstalkd -v beanstalkd 1.10 2    

    启动:
         进入解压的文件夹里面使用beanstalkd -l 127.0.0.1 -p 11300 &
     
    2,php安装pheanstalk依赖

    composer require pda/pheanstalk

    (官网:https://github.com/pda/pheanstalk)

    下面是自己写的封装的一个类里面的方法(类的话自己创建就可以了):

    // 添加任务
        public static function addJob()
        {
            //创建队列消息
    
            $pheanstalk = self::const();
            print_r($pheanstalk->stats());//查看状态
            exit;
            $tubeName = 'email_list';
    
            $jobData = [
                'email' => '123456@163.com',
                'message' => 'Hello World !!',
                'dtime' => date('Y-m-d H:i:s'),
            ];
    
            $pda = $pheanstalk->useTube($tubeName)->put(json_encode($jobData));
            return $pda;
        }
    
        // 引用公共信息(自己写的方法连接Pheanstalk,首先在服务器开启beanstalkd)
        public static function const()
        {
            $pheanstalk = Pheanstalk::create('127.0.0.1', 11300);
            return $pheanstalk;
        }
    消费 job
    
    
    // 消费job
    public static function Consumption()
    {
    $pheanstalk = self::const();
    $tubeName = 'email_list';
    $job = $pheanstalk->watch($tubeName)->ignore('default')->reserve();
    if ($job !== false) {
    $res = $pheanstalk->statsJob($job);
    if ($res['reserves'] > 5) { //判断一个任务请求失败5次后直接删除
    $pheanstalk->delete($job);
    } else {
    $job_data = $job->getData();
    print_r($job_data);
    exit;
    self::subscribe($job_data);
    $pheanstalk->delete($job);
    }
    /* 继续 Watch 下一个 job */
    self::Consumption();
    } else {
    $pheanstalk->log->error('reserve false', 'reserve false');
    }
    }
    // 添加到数据库
        public static function subscribe($job_data)
        {
            print_r($job_data); 
    // 数据格式
    {"time":"1212312412","email":"123456@163.com","message":"Hello World !!","dtime":"2020-09-28 15:12:14"}
        }

     

    default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
      关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。

    4.Pheanstalk使用方法

    维护方法        

        stats() 查看状态方法        

        listTubes() 目前存在的管道

        listTubesWatched() 目前监听的管道        

        statsTube() 管道的状态        

        useTube() 指定使用的管道        

        statsJob() 查看任务的详细信息        

        peek() 通过任务ID获取任务    

    生产者方法        

        putInTube() 往管道中写入数据        

        put() 配合useTube()使用    

    消费者方法        

        watch() 监听管道,可以同时监听多个管道        

        ignore() 不监听管道        

        reserve() 以阻塞方式监听管道,获取任务        

        reserveFromTube()        

        release() 把任务重新放回管道        

        bury() 把任务预留        

        peekBuried() 把预留任务读取出来        

        kickJob() 把buried状态的任务设置成ready        

        kick() 批量把buried状态的任务设置成ready        

        peekReady() 把准备好的任务读取出来        

        peekDelayed() 把延迟的任务读取出来        

        pauseTube() 给管道设置延迟        

        resumeTube() 取消管道延迟        

        touch() 让任务重新计算ttr时间,给任务续命

  • 相关阅读:
    动画效果(二)
    动画效果(一)
    高级事件(二)
    高级事件(一)
    事件对象(二)
    事件对象(一)
    使用jquery ajax代替iframe
    SQL语句汇总(终篇)—— 表联接与联接查询
    SQL语句汇总(三)——聚合函数、分组、子查询及组合查询
    SQL语句汇总(二)——数据修改、数据查询
  • 原文地址:https://www.cnblogs.com/smilevv/p/13744598.html
Copyright © 2011-2022 走看看