zoukankan      html  css  js  c++  java
  • celery-rabbitmq 安装部署

    一:Python安装

    1.下载python3源码
    wget https://www.python.org/ftp/python/3.5.0/Python-3.5.0.tgz
    2.解压
    tar xf Python-3.5.0.tgz
    3.进入下载目录

    cd Python-3.5.0
    4.编译
    ./configure --prefix=/usr/local/python3
    make&& make install
    5.创建软链接
    ln -s /usr/local/python3/bin/python3 /usr/bin/python3
    ln -s /usr/local/python3/bin/pip3 /usr/bin/pip3
    6.celery安装
    pip install celery
    ln -s /usr/local/python3/bin/celery /usr/bin/celery

    二:rabbitmp安装使用
    1.rabbitmq安装包下载
    wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.0/rabbitmq-server-3.5.0-1.noarch.rpm
    2.erlang依赖下载
    wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
    3.安装
    rpm -ivh xxx.rpm
    4.配置
    1)添加用户并设置密码:rabbitmqctl add_user admin 123456

    2)添加权限(使admin用户对虚拟主机“/” 具有所有权限):rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

    3)修改用户角色(加入administrator用户组):rabbitmqctl set_user_tags admin administrator

    4)在/etc/rabbitmq/rabbitmq.config 添加:
    [{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["admin"]}]}].

    5.启动服务

    service rabbitmq-server start
    service rabbitmq-server status
    service rabbitmq-server stop
    6.启动监控界面
    rabbitmq-plugins enable rabbitmq_management


    三、监控安装
    1. pip3 install flower
    2.启动flower
    假设在server2上启动flower,flower默认的端口是5555.

    celery flower --broker=amqp://admin:admin@172.16.1.8:5672//

    启动worker命令
    celery -A center worker --loglevel=debug


    配置mq
    RABBITMQ_HOSTS = "172.16.1.8"

    RABBITMQ_PORT = 5672

    RABBITMQ_VHOST = '/'

    RABBITMQ_USER = 'admin'

    RABBITMQ_PWD = 'admin'

    BROKER_URL = 'amqp://%s:%s@%s:%d/%s' % (RABBITMQ_USER, RABBITMQ_PWD, RABBITMQ_HOSTS, RABBITMQ_PORT, RABBITMQ_VHOST)


    添加用户并设置密码:
    rabbitmqctl add_user admin 123456
    添加权限(使admin用户对虚拟主机“/” 具有所有权限):
    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    修改用户角色(加入administrator用户组)
    rabbitmqctl set_user_tags admin administrator

    后台启动服务
    celery multi start w1 -A celery_app worker --loglevel=debug

    启动: rabbitmq-server –detached
    关闭:rabbitmqctl stop
    若单机有多个实例,则在rabbitmqctlh后加–n 指定名称


    新建用户:rabbitmqctl add_user xxxpwd
    删除用户: rabbitmqctl delete_user xxx
    查看用户:rabbitmqctl list_users
    改密码: rabbimqctlchange_password {username} {newpassword}
    设置用户角色:rabbitmqctlset_user_tags {username} {tag ...}
    Tag可以为 administrator,monitoring, management


    权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read}
    Vhostpath
    Vhost路径
    user
    用户名
    Conf
    一个正则表达式match哪些配置资源能够被该用户访问。
    Write
    一个正则表达式match哪些配置资源能够被该用户读。
    Read
    一个正则表达式match哪些配置资源能够被该用户访问。

    celery -A worker worker --loglevel=debug -n workerA.%h -Q for_task_1,for_task_2 gevent -c 1

    celery worker -A celery_app --loglevel=info -P gevent -c 1

    celery -A celery_app worker --loglevel=debug

    优先级配置
    from kombu import Exchange, Queue


    CELERY_QUEUES = (

    Queue('task1', Exchange('task1', type='direct'), routing_key='task1',

    consumer_arguments={'x-priority': 0}),

    Queue('task2', Exchange('task2', type='direct'), routing_key='task2',

    consumer_arguments={'x-priority': 10}),
    )

    from kombu import Queue
    from kombu import Exchange


    task_queues = (
    Queue('priority_low', exchange=Exchange('priority', type='direct'), routing_key='priority_low'),
    Queue('priority_high', exchange=Exchange('priority', type='direct'), routing_key='priority_high'),
    )

    task_routes = ([
    ('tasks.add', {'queue': 'priority_low'}),
    ('tasks.multiply', {'queue': 'priority_high'}),
    ],)

    你可以限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
    celeryconfig.py:

    CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
    }
    使用 rabbitmq 缺陷及注意事项:
    RabbitMQ Result Backend
    The RabbitMQ result backend (amqp) is special as it does not actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once; If you have two processes waiting for the same result, one of the processes will never receive the result!

    Even with that limitation, it is an excellent choice if you need to receive state changes in real-time. Using messaging means the client does not have to poll for new states.

    There are several other pitfalls you should be aware of when using the RabbitMQ result backend:

    Every new task creates a new queue on the server, with thousands of tasks the broker may be overloaded with queues and this will affect performance in negative ways. If you’re using RabbitMQ then each queue will be a separate Erlang process, so if you’re planning to keep many results simultaneously you may have to increase the Erlang process limit, and the maximum number of file descriptors your OS allows.
    Old results will be cleaned automatically, based on the CELERY_TASK_RESULT_EXPIRES setting. By default this is set to expire after 1 day: if you have a very busy cluster you should lower this value.
    For a list of options supported by the RabbitMQ result backend, please see AMQP backend settings.

    使用传统数据库达的缺陷及注意事项:
    Database Result Backend
    Keeping state in the database can be convenient for many, especially for web applications with a database already in place, but it also comes with limitations.

    Polling the database for new states is expensive, and so you should increase the polling intervals of operations such as result.get().

    Some databases use a default transaction isolation level that is not suitable for polling tables for changes.

    In MySQL the default transaction isolation level is REPEATABLE-READ, which means the transaction will not see changes by other transactions until the transaction is committed. It is recommended that you change to the READ-COMMITTED isolation level.


    队列优先级未测试完成


    1.资料博客
    https://blog.csdn.net/weixin_40475396/article/details/80439781


    https://github.com/celery/celery/issues/2635

    erlang 版本查询
    erl -V

    erlang 下载地址
    1.http://erlang.org/download/otp_src_R16B03.tar.gz
    2. ./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll --without-javac
    3. make && make install

    ** celery 队列只能实现队列内的优先级,不能实现队列之间的优先级

  • 相关阅读:
    Java 8 对 List<List<String>> 排序
    获取自然月的起始日期和终止日期
    Java 中如何计算两个字符串时间之间的时间差?(单位为分钟)
    Java StringJoiner
    回文数
    Java 中的值传递和引用传递问题
    substring
    集合的四种遍历方式
    Java 中的静态嵌套类和非静态嵌套类
    使用 Java 查找字符串中出现次数最多的字符以及出现的次数?
  • 原文地址:https://www.cnblogs.com/royfans/p/10204626.html
Copyright © 2011-2022 走看看