zoukankan      html  css  js  c++  java
  • php使用Pheanstalk实现消息队列

    1.安装(linux安装beanstalkd, windows暂不支持)

    # wget https://github.com/kr/beanstalkd/archive/v1.10.tar.gz

     # tar xzvf v1.10

     # cd beanstalkd-1.10/

    # make && make install

    # beanstalkd -v beanstalkd 1.10 2    

    启动:
         进入解压的文件夹里面使用beanstalkd -l 127.0.0.1 -p 11300 &
     
    2,php安装pheanstalk依赖

    composer require pda/pheanstalk

    (官网:https://github.com/pda/pheanstalk)

    下面是自己写的封装的一个类里面的方法(类的话自己创建就可以了):

    // 添加任务
        public static function addJob()
        {
            //创建队列消息
    
            $pheanstalk = self::const();
            print_r($pheanstalk->stats());//查看状态
            exit;
            $tubeName = 'email_list';
    
            $jobData = [
                'email' => '123456@163.com',
                'message' => 'Hello World !!',
                'dtime' => date('Y-m-d H:i:s'),
            ];
    
            $pda = $pheanstalk->useTube($tubeName)->put(json_encode($jobData));
            return $pda;
        }
    
        // 引用公共信息(自己写的方法连接Pheanstalk,首先在服务器开启beanstalkd)
        public static function const()
        {
            $pheanstalk = Pheanstalk::create('127.0.0.1', 11300);
            return $pheanstalk;
        }
    消费 job
    
    
    // 消费job
    public static function Consumption()
    {
    $pheanstalk = self::const();
    $tubeName = 'email_list';
    $job = $pheanstalk->watch($tubeName)->ignore('default')->reserve();
    if ($job !== false) {
    $res = $pheanstalk->statsJob($job);
    if ($res['reserves'] > 5) { //判断一个任务请求失败5次后直接删除
    $pheanstalk->delete($job);
    } else {
    $job_data = $job->getData();
    print_r($job_data);
    exit;
    self::subscribe($job_data);
    $pheanstalk->delete($job);
    }
    /* 继续 Watch 下一个 job */
    self::Consumption();
    } else {
    $pheanstalk->log->error('reserve false', 'reserve false');
    }
    }
    // 添加到数据库
        public static function subscribe($job_data)
        {
            print_r($job_data); 
    // 数据格式
    {"time":"1212312412","email":"123456@163.com","message":"Hello World !!","dtime":"2020-09-28 15:12:14"}
        }

     

    default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
      关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。

    4.Pheanstalk使用方法

    维护方法        

        stats() 查看状态方法        

        listTubes() 目前存在的管道

        listTubesWatched() 目前监听的管道        

        statsTube() 管道的状态        

        useTube() 指定使用的管道        

        statsJob() 查看任务的详细信息        

        peek() 通过任务ID获取任务    

    生产者方法        

        putInTube() 往管道中写入数据        

        put() 配合useTube()使用    

    消费者方法        

        watch() 监听管道,可以同时监听多个管道        

        ignore() 不监听管道        

        reserve() 以阻塞方式监听管道,获取任务        

        reserveFromTube()        

        release() 把任务重新放回管道        

        bury() 把任务预留        

        peekBuried() 把预留任务读取出来        

        kickJob() 把buried状态的任务设置成ready        

        kick() 批量把buried状态的任务设置成ready        

        peekReady() 把准备好的任务读取出来        

        peekDelayed() 把延迟的任务读取出来        

        pauseTube() 给管道设置延迟        

        resumeTube() 取消管道延迟        

        touch() 让任务重新计算ttr时间,给任务续命

  • 相关阅读:
    Windows平台下的读写锁
    进程的阻塞和挂起的区别
    事件函数SetEvent、PulseEvent与WaitForSingleObject详解
    多线程的那点儿事(之多线程调试)
    多线程同步内功心法——PV操作上(未完待续。。。)
    读者写者问题(有bug 后续更改)
    解决VS2010控制台程序运行结束不显示请按任意键继续
    Method has too many Body parameters openfeign
    Eclipse中Cannot nest src folder解决方法
    restTemplate重定向问题 &cookie问题
  • 原文地址:https://www.cnblogs.com/smilevv/p/13744598.html
Copyright © 2011-2022 走看看