    Django from Beginner to Mastery 12: Django 2 + Celery 4 automated tasks for checking website SSL certificates


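    In day-to-day operations work, when we manage many websites, SSL certificates often expire before anyone renews them. We need to monitor the certificates of our domains so that we know in time when a certificate has to be renewed and replaced.

    This project is only a prototype. After studying Django for a while, I wanted a small exercise that connects the front end, the back end, Redis, and MySQL at a basic level.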

    Features:
    a. Add, delete, and modify websites
    b. A background Celery task updates the certificate information

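    Features that could be added later:
    1. Send an alert email when a certificate has fewer than 10 days left before it expires
    2. Automatically read domain information from domain management platforms such as GoDaddy and Aliyun and write it into the system; subdomains could also be checked in bulk so that no certificate renewal is missed
    3. Check whether each website is available
    4. Scan the ports on each website's server and summarize the results
    ...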
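As a possible starting point for item 1, the selection step can be sketched in plain Python. This is only a sketch: the function name and the sample domains are hypothetical, the (name, remaining days) pairs are assumed to come from the stored domain records, and sending the email itself is left out.

```python
from typing import Iterable, List, Tuple

ALERT_THRESHOLD_DAYS = 10  # threshold taken from item 1 of the list above


def domains_needing_alert(
    domains: Iterable[Tuple[str, int]],
    threshold: int = ALERT_THRESHOLD_DAYS,
) -> List[str]:
    """Return the names whose certificate has fewer than `threshold` days left."""
    return [name for name, remaining in domains if remaining < threshold]


# Hypothetical sample data: (domain name, remaining days)
sample = [("a.example.com", 3), ("b.example.com", 10), ("c.example.com", 45)]
to_alert = domains_needing_alert(sample)
```

Here only `a.example.com` is selected, because the comparison is strictly "fewer than 10 days".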
    How Celery works


    Directory structure of the video project

    # tree video

    video
    # The directory that holds settings.py (named video by default) was renamed to config
    ├── config
    │   ├── asgi.py
    │   ├── celery.py
    │   ├── __init__.py
    │   ├── __pycache__
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    ├── __pycache__
    ├── ssl_check
    │   ├── admin.py
    │   ├── apps.py
    │   ├── check_ssl.py
    │   ├── __init__.py
    │   ├── migrations
    │   │   ├── 0001_initial.py
    │   │   ├── 0002_auto_20210222_2142.py
    │   │   ├── __init__.py
    │   │   └── __pycache__
    │   ├── models.py
    │   ├── __pycache__
    │   ├── tasks.py
    │   ├── tests.py
    │   ├── urls.py
    │   └── views.py
    └── templates
        ├── add_domain.html
        ├── index.html
        └── list_domains.html

    1. Set up the basic development environment

    pip install virtualenv
    pip install virtualenvwrapper-win

    # Create the virtual environment python379_django2
    mkvirtualenv python379_django2

    # Install dependencies

    workon python379_django2

    pip install celery==4.4.2
    pip install eventlet==0.25.2
    pip install Django==2.0.4

    pip install pymysql
    pip install requests
    pip install redis==3.4.1

    2. Configure the database, apps, and other basic settings

    video/config/settings.py

    import os
    from pathlib import Path
    
    BASE_DIR = Path(__file__).resolve().parent.parent
    
    
    
    SECRET_KEY = '7qh#b#@'
    
    DEBUG = True
    ALLOWED_HOSTS = ['*']
    
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'app',
        'ssl_check',
    ]
    
    MIDDLEWARE = [
        'django.middleware.security.SecurityMiddleware',
        'django.contrib.sessions.middleware.SessionMiddleware',
        'django.middleware.common.CommonMiddleware',
        'django.middleware.csrf.CsrfViewMiddleware',
        'django.contrib.auth.middleware.AuthenticationMiddleware',
        'django.contrib.messages.middleware.MessageMiddleware',
        'django.middleware.clickjacking.XFrameOptionsMiddleware',
    ]
    
    ROOT_URLCONF = 'config.urls'
    TEMPLATES = [
        {
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [os.path.join(BASE_DIR, 'templates')],
            'APP_DIRS': True,
            'OPTIONS': {
                'context_processors': [
                    'django.template.context_processors.debug',
                    'django.template.context_processors.request',
                    'django.contrib.auth.context_processors.auth',
                    'django.contrib.messages.context_processors.messages',
                ],
            },
        },
    ]
    
    WSGI_APPLICATION = 'config.wsgi.application'
    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.mysql',
            'NAME': 'muke_video',
            'USER': 'root',
            'PASSWORD': 'root',
            'HOST': '10.9.4.199',
            'PORT': 3306
        }
    }
    
    AUTH_PASSWORD_VALIDATORS = [
        {
            'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
        },
        {
            'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
        },
        {
            'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
        },
        {
            'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
        },
    ]
    
    
    LANGUAGE_CODE = 'zh-hans'
    
    TIME_ZONE = 'Asia/Shanghai'
    
    USE_I18N = True
    USE_L10N = True
    USE_TZ = True
    STATIC_URL = '/static/'
    STATICFILES_DIRS = (os.path.join(BASE_DIR, 'static'), )
    CELERY_BROKER_URL = 'redis://10.9.4.199:6379/1'
    CELERY_RESULT_BACKEND = 'redis://10.9.4.199:6379/2'
    
    CELERY_RESULT_SERIALIZER = 'json'
    from celery.schedules import crontab
    
    CELERY_BEAT_SCHEDULE = {
        # Periodic tasks
        # 'task-one': {
        #     'task': 'ssl_check.tasks.print_test',
        #     'schedule': 5.0, # run every 5 seconds
        #     # 'args': ()
        # },
        'task-modify-ssl-info': {
            'task': 'ssl_check.tasks.modify_domain_ssl_info_task',
            # 'schedule': crontab(month_of_year='9', day_of_month='9', minute='*/1'),  # on September 9, run every minute
            'schedule': crontab(minute='*/10'),  # run every 10 minutes
            # 'args': ()
        }
    }

    3. Bring in Celery for asynchronous tasks

    Create celery.py in the same directory as settings.py

    # coding:utf-8
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
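    # Set the default Django settings module
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
    
    # Create the Celery app
    app = Celery('config')
    # Load configuration from Django settings; Celery options use the CELERY_ prefix
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Auto-discover the tasks.py file in each installed app
    app.autodiscover_tasks()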

    Note that config is the name of the directory that contains settings.py in your Django project
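The `namespace='CELERY'` argument means Celery reads only the Django settings that start with `CELERY_`, so `CELERY_BROKER_URL` supplies Celery's `broker_url` option. The sketch below illustrates that naming convention in plain Python; `strip_namespace` is a hypothetical helper written for this example and is not Celery's actual implementation.

```python
def strip_namespace(settings: dict, namespace: str = "CELERY") -> dict:
    """Keep keys that start with '<namespace>_' and map them to lowercase option names."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


# A few entries from settings.py; DEBUG is ignored because it lacks the prefix
django_settings = {
    "DEBUG": True,
    "CELERY_BROKER_URL": "redis://10.9.4.199:6379/1",
    "CELERY_RESULT_BACKEND": "redis://10.9.4.199:6379/2",
}
celery_options = strip_namespace(django_settings)
```

The result contains `broker_url` and `result_backend`, which are the option names Celery itself uses.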

    Edit the __init__.py file in the same directory as settings.py

    # coding:utf-8
    from __future__ import absolute_import, unicode_literals
    
    from .celery import app as celery_app
    
    # Import pymysql
    import pymysql
    
    # Let pymysql stand in for MySQLdb
    pymysql.install_as_MySQLdb()
    
    __all__ = ['celery_app']

    # Database models

    video/ssl_check/models.py

    from django.db import models
    
    # Create your models here.
    
    
    class Domain(models.Model):
    
        name = models.CharField(max_length=200, blank=False, unique=True)
        status = models.IntegerField(default=1)
        idc_id = models.IntegerField(default=0)
        company_id = models.IntegerField(default=0)
        ssl_start_date = models.CharField(max_length=200, blank=True, default="0000-00-00")
        ssl_expire_date = models.CharField(max_length=200, blank=True, default="0000-00-00")
        remaining_time = models.IntegerField(blank=True, default=0)
    
        def __str__(self):
            return "domain:{}".format(self.name)

    Create tasks.py in the app

    video/ssl_check/tasks.py

    # _*_ coding:utf-8 _*_
    # __author__ == 'jack'
    # __date__ == '2021-02-22 9:24 PM'
    
    import re
    import time
    import subprocess
    from datetime import datetime
    from io import StringIO
    
    from celery.task import task
    from .models import Domain
    
    
    # A custom task; this one is only for testing
    @task
    def print_test():
        print("nict try")
        return "hello"
    
    
    def modify_domain_ssl_info(domain):
        print("domain_name",domain.name)
        print(type(domain))
        f = StringIO()
        comm = f"curl -Ivs https://{domain.name} --connect-timeout 10"
        print("comm:", comm)
        result = subprocess.getstatusoutput(comm)
        f.write(result[1])
    
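        # Capture the validity dates, common name, and issuer from curl's verbose output
        m = re.search(r'start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n',
                      f.getvalue(), re.S)
        if m is None:
            # curl failed or printed the certificate in a different format
            print("no certificate info found for", domain.name)
            f.close()
            return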
        start_date = m.group(1)
        expire_date = m.group(2)
        common_name = m.group(3)
        issuer = m.group(4)
    
        # Parse the start date with the time module
        start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
        start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
        # Parse the expiry date with datetime so it can be used in arithmetic
        expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
        expire_date_st = datetime.strftime(expire_date, "%Y-%m-%d %H:%M:%S")
    
        # Days remaining; certificate dates are in GMT, so compare against UTC
        remaining = (expire_date - datetime.utcnow()).days
    
        print('Domain:', domain.name)
        print('Common name:', common_name)
        print('Start date:', start_date_st)
        print('Expiry date:', expire_date_st)
        print(f'Time remaining: {remaining} days')
        print('Issuer:', issuer)
        print('*' * 30)
        domain.remaining_time = remaining
        domain.ssl_expire_date = expire_date_st
        domain.ssl_start_date = start_date_st
        # domain.update(remaining_time=remaining, ssl_expire_date=expire_date_st, ssl_start_date=start_date_st)
        domain.save()
        f.close()
        time.sleep(0.5)
    
    
    @task
    def modify_domain_ssl_info_task():
        domains = Domain.objects.all()
    
        print("start check domain ssl celery task")
        for domain in domains:
            modify_domain_ssl_info(domain)
        print("end check domain ssl celery task")
        return "modify ssl"
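The parsing step of the task can be tried without curl, Celery, or a database. The sketch below uses the same regular expression and date format as the task above and returns `None` when the output does not match. `parse_cert_info` and the `SAMPLE` text are hypothetical and made up for illustration; note that the `common name:` line only appears in some curl builds, so real output may look different.

```python
import re
from datetime import datetime
from typing import Optional

# Same pattern as in tasks.py, written as a raw string
CERT_PATTERN = re.compile(
    r"start date: (.*?)\n.*?expire date: (.*?)\n"
    r".*?common name: (.*?)\n.*?issuer: CN=(.*?)\n",
    re.S,
)
DATE_FORMAT = "%b %d %H:%M:%S %Y GMT"


def parse_cert_info(curl_output: str, now: datetime) -> Optional[dict]:
    """Extract certificate fields from curl -v output, or None if nothing matches."""
    m = CERT_PATTERN.search(curl_output)
    if m is None:
        return None
    start = datetime.strptime(m.group(1), DATE_FORMAT)
    expire = datetime.strptime(m.group(2), DATE_FORMAT)
    return {
        "start": start.strftime("%Y-%m-%d %H:%M:%S"),
        "expire": expire.strftime("%Y-%m-%d %H:%M:%S"),
        "common_name": m.group(3),
        "issuer": m.group(4),
        "remaining_days": (expire - now).days,
    }


# Hypothetical curl output, invented for this example
SAMPLE = (
    "* \tstart date: Jan 01 00:00:00 2021 GMT\n"
    "* \texpire date: Jan 01 00:00:00 2022 GMT\n"
    "* \tcommon name: example.com\n"
    "* \tissuer: CN=Example CA\n"
)
info = parse_cert_info(SAMPLE, now=datetime(2021, 12, 22))
```

Passing `now` as a parameter keeps the function deterministic, which makes the remaining-days arithmetic easy to check.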

    The task can be configured as a scheduled (periodic) task in settings.py

    from celery.schedules import crontab
    
    CELERY_BEAT_SCHEDULE = {
        # Periodic tasks
        # 'task-one': {
        #     'task': 'ssl_check.tasks.print_test',
        #     'schedule': 5.0, # run every 5 seconds
        #     # 'args': ()
        # },
        'task-modify-ssl-info': {
            'task': 'ssl_check.tasks.modify_domain_ssl_info_task',
            # 'schedule': crontab(month_of_year='9', day_of_month='9', minute='*/1'),  # on September 9, run every minute
            'schedule': crontab(minute='*/10'),  # run every 10 minutes
            # 'args': ()
        }
    }
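The `minute='*/10'` field uses standard cron step syntax: the task fires at every minute of the hour that is divisible by 10. The plain-Python sketch below only illustrates which minutes such a field selects; `matching_minutes` is a hypothetical helper and not part of Celery.

```python
def matching_minutes(step: int) -> list:
    """Minutes of the hour selected by a crontab minute field of '*/<step>'."""
    return [minute for minute in range(60) if minute % step == 0]


every_ten = matching_minutes(10)   # corresponds to crontab(minute='*/10')
every_minute = matching_minutes(1)  # corresponds to crontab(minute='*/1')
```

So `*/10` yields six runs per hour, while `*/1` yields sixty.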

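    Asynchronous tasks can also be called online through Django views

    # This is a test task; it can be triggered by visiting it through Django in a browser

    class CetestView(View):
    
        def get(self, request, *args, **kwargs):
            res = print_test.delay()
            # Task logic
            return JsonResponse({'status': 'successful', 'task_id': res.task_id})
    
    
    # Runs the certificate check directly from a web page (synchronously, inside the request)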
    class CheckSslView(View):
    
        def get(self, request):
            domains = Domain.objects.all()
    
            for domain in domains:
                self.check_ssl(domain)
    
            return redirect(reverse('list_domain'))
    
        def check_ssl(self, domain):
            print("domain_name",domain.name)
            print(type(domain))
            f = StringIO()
            comm = f"curl -Ivs https://{domain.name} --connect-timeout 10"
            print("comm:", comm)
            result = subprocess.getstatusoutput(comm)
            f.write(result[1])
    
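            # Capture the validity dates, common name, and issuer from curl's verbose output
            m = re.search(r'start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n',
                          f.getvalue(), re.S)
            if m is None:
                # curl failed or printed the certificate in a different format
                print("no certificate info found for", domain.name)
                f.close()
                return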
            start_date = m.group(1)
            expire_date = m.group(2)
            common_name = m.group(3)
            issuer = m.group(4)
    
            # Parse the start date with the time module
            start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
            start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
            # Parse the expiry date with datetime so it can be used in arithmetic
            expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
            expire_date_st = datetime.strftime(expire_date, "%Y-%m-%d %H:%M:%S")
    
            # Days remaining; certificate dates are in GMT, so compare against UTC
            remaining = (expire_date - datetime.utcnow()).days
    
            print('Domain:', domain.name)
            print('Common name:', common_name)
            print('Start date:', start_date_st)
            print('Expiry date:', expire_date_st)
            print(f'Time remaining: {remaining} days')
            print('Issuer:', issuer)
            print('*' * 30)
    
            # Update the domain record
            domain.remaining_time = remaining
            domain.ssl_expire_date = expire_date_st
            domain.ssl_start_date = start_date_st
            domain.save()
            f.close()
            time.sleep(0.5)

    View code: video/ssl_check/views.py

    from django.shortcuts import render,reverse,redirect
    
    import re
    import time
    import subprocess
    from datetime import datetime
    from io import StringIO
    
    
    from django.views import View
    from .models import Domain
    from .tasks import modify_domain_ssl_info_task, print_test
    from django.http import JsonResponse
    
    
    '''
    Views for adding, deleting, and listing domains,
    plus CheckSslView, which refreshes the SSL certificate information on demand
    
    '''
    class DomainRegisterView(View):
    
        def get(self, request):
            return render(request, "add_domain.html")
    
    
    class AddDomainView(View):
    
        def post(self, request):
            domain_name = request.POST.get("domain_name", "")
            Domain.objects.create(
                name=domain_name
            )
    
            return redirect(reverse('list_domain'))
    
    
    class ListDomainView(View):
    
        def get(self, request):
            domains = Domain.objects.all()
    
            return render(request, 'list_domains.html', {"domain_list":domains})
    
    
    class DeleteDomainView(View):
    
        def get(self, request):
            domain_id = request.GET.get("domain_id", "")
            print("domain_id={}".format(domain_id))
            Domain.objects.filter(id=domain_id).delete()
    
            return redirect(reverse('list_domain'))
    
    
    class DoChecksslTaskView(View):
    
        def get(self, request, *args, **kwargs):
            # Run the asynchronous task
            print('start to check domain ssl')
            # modify_domain_ssl_info_task.delay()
            # modify_domain_ssl_info_task.apply_async(args=('check',), queue='work_queue')
            res = modify_domain_ssl_info_task.delay()
            print('end do check domain ssl')
            # Task logic
            return JsonResponse({'status': 'successful', 'task_id': res.task_id})
    
    
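    # This is a test task; it can be triggered by visiting it through Django in a browser
    class CetestView(View):
    
        def get(self, request, *args, **kwargs):
            res = print_test.delay()
            # Task logic
            return JsonResponse({'status': 'successful', 'task_id': res.task_id})
    
    
    # Runs the certificate check directly from a web page (synchronously, inside the request)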
    class CheckSslView(View):
    
        def get(self, request):
            domains = Domain.objects.all()
    
            for domain in domains:
                self.check_ssl(domain)
    
            return redirect(reverse('list_domain'))
    
        def check_ssl(self, domain):
            print("domain_name",domain.name)
            print(type(domain))
            f = StringIO()
            comm = f"curl -Ivs https://{domain.name} --connect-timeout 10"
            print("comm:", comm)
            result = subprocess.getstatusoutput(comm)
            f.write(result[1])
    
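            # Capture the validity dates, common name, and issuer from curl's verbose output
            m = re.search(r'start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n',
                          f.getvalue(), re.S)
            if m is None:
                # curl failed or printed the certificate in a different format
                print("no certificate info found for", domain.name)
                f.close()
                return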
            start_date = m.group(1)
            expire_date = m.group(2)
            common_name = m.group(3)
            issuer = m.group(4)
    
            # Parse the start date with the time module
            start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
            start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
            # Parse the expiry date with datetime so it can be used in arithmetic
            expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
            expire_date_st = datetime.strftime(expire_date, "%Y-%m-%d %H:%M:%S")
    
            # Days remaining; certificate dates are in GMT, so compare against UTC
            remaining = (expire_date - datetime.utcnow()).days
    
            print('Domain:', domain.name)
            print('Common name:', common_name)
            print('Start date:', start_date_st)
            print('Expiry date:', expire_date_st)
            print(f'Time remaining: {remaining} days')
            print('Issuer:', issuer)
            print('*' * 30)
    
            # Update the domain record
            domain.remaining_time = remaining
            domain.ssl_expire_date = expire_date_st
            domain.ssl_start_date = start_date_st
            domain.save()
            f.close()
            time.sleep(0.5)
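As an alternative to shelling out to curl and parsing its text output, the certificate dates can also be read with Python's standard `ssl` module. This is a sketch of a different technique, not the approach used in this article: `fetch_peer_cert` and `days_remaining` are hypothetical names, and because the default context verifies the certificate, an already expired or otherwise invalid certificate raises an error during the handshake instead of returning dates.

```python
import socket
import ssl
from datetime import datetime, timezone


def fetch_peer_cert(hostname: str, port: int = 443, timeout: float = 10.0) -> dict:
    """Open a TLS connection and return the server certificate as a dict."""
    context = ssl.create_default_context()
    with socket.create_connection((hostname, port), timeout=timeout) as sock:
        with context.wrap_socket(sock, server_hostname=hostname) as tls:
            return tls.getpeercert()


def days_remaining(not_after: str, now: datetime) -> int:
    """Whole days between a timezone-aware `now` and a certificate's notAfter string."""
    expires = datetime.fromtimestamp(ssl.cert_time_to_seconds(not_after), tz=timezone.utc)
    return (expires - now).days


# Offline check with a fixed date string in the format used by certificates
example_days = days_remaining(
    "Jan  5 09:34:43 2018 GMT",
    now=datetime(2018, 1, 1, tzinfo=timezone.utc),
)
```

For a live check, one would call `cert = fetch_peer_cert("example.com")` and then `days_remaining(cert["notAfter"], datetime.now(timezone.utc))`; the start date is available under `cert["notBefore"]`.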

    4. Template files

    # Add page
    templates/add_domain.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>add</title>
    </head>
    <body>
    
        <form action="/domain/add/" method="post">
            {% csrf_token %}
            Domain: <input type="text" name="domain_name" /> <br/>
            <input type="submit" value="Submit">
    
        </form>
    
    </body>
    </html>

    # Domain list page
    templates/list_domains.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Domain list</title>
    </head>
    <body>
    
        <table border="1" style="solid-color: black" cellpadding="0" cellspacing="0" width="30%">
            <tr>
                <th>Domain</th>
                <th>Status</th>
                <th>Certificate start date</th>
                <th>Certificate expiry date</th>
                <th>Days remaining</th>
                <th>Action</th>
            </tr>
            {% for domain in domain_list %}
            <tr>
                <td>{{ domain.name }}</td>
                <td>{{ domain.status }}</td>
                <td>{{ domain.ssl_start_date }}</td>
                <td>{{ domain.ssl_expire_date }}</td>
                <td>{{ domain.remaining_time }}</td>
                <td> <a href="/domain/delete/?domain_id={{ domain.id }}">Delete</a></td>
            </tr>
            {% endfor %}
        </table>
    
    </body>
    </html>


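    Here the delay method submits the task asynchronously, instead of running it synchronously inside the Django request as a normal function call would

    Start the Celery worker from the directory that contains manage.py

    On Windows: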

    (python37_django2) D:pythondjango_imooc_xiaobaimuke_vedio_testvideo>celery worker -A config -l info -P eventlet
    
     -------------- celery@SZ18052967C01 v4.4.2 (cliffs)
    --- ***** -----
    -- ******* ---- Windows-10-10.0.19041-SP0 2021-02-23 17:03:40
    - *** --- * ---
    - ** ---------- [config]
    - ** ---------- .> app:         config:0x1ca2a3bf128
    - ** ---------- .> transport:   redis://localhost:6379/1
    - ** ---------- .> results:     redis://localhost:6379/2
    - *** --- * --- .> concurrency: 8 (eventlet)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    
    
    [tasks]
      . ssl_check.tasks.print_test
    
    [2021-02-23 17:03:40,585: INFO/MainProcess] Connected to redis://localhost:6379/1
    [2021-02-23 17:03:40,604: INFO/MainProcess] mingle: searching for neighbors
    [2021-02-23 17:03:41,757: INFO/MainProcess] mingle: all alone
    [2021-02-23 17:03:41,777: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/1.
    [2021-02-23 17:03:41,782: WARNING/MainProcess] c:userswsenvspython37_django2libsite-packagesceleryfixupsdjango.py:203: UserWarning: Using settings.DEBUG leads to a memory
                leak, never use this setting in production environments!
      leak, never use this setting in production environments!''')
    [2021-02-23 17:03:41,782: INFO/MainProcess] celery@SZ18052967C01 ready.
    [2021-02-23 17:03:41,784: INFO/MainProcess] Received task: ssl_check.tasks.print_test[383d9f23-1224-481f-8a3e-364b07dc2dc8]
    [2021-02-23 17:03:41,785: WARNING/MainProcess] nict try
    [2021-02-23 17:03:41,788: INFO/MainProcess] Received task: ssl_check.tasks.print_test[0959e985-d4ee-4439-8dd7-0a0e2377e610]
    [2021-02-23 17:03:41,790: WARNING/MainProcess] nict try
    [2021-02-23 17:03:41,792: INFO/MainProcess] Task ssl_check.tasks.print_test[383d9f23-1224-481f-8a3e-364b07dc2dc8] succeeded in 0.0s: 'hello'
    [2021-02-23 17:03:41,793: INFO/MainProcess] Task ssl_check.tasks.print_test[0959e985-d4ee-4439-8dd7-0a0e2377e610] succeeded in 0.0s: 'hello'
    [2021-02-23 17:03:41,793: INFO/MainProcess] Received task: ssl_check.tasks.print_test[f418314a-a01e-48df-834f-79220526eca2]
    [2021-02-23 17:03:41,794: WARNING/MainProcess] nict try
    [2021-02-23 17:03:41,795: INFO/MainProcess] Task ssl_check.tasks.print_test[f418314a-a01e-48df-834f-79220526eca2] succeeded in 0.0s: 'hello'
    [2021-02-23 17:03:41,795: INFO/MainProcess] Received task: ssl_check.tasks.print_test[a6b95ed9-7a37-49ff-8445-2db250fd402b]
    [2021-02-23 17:03:41,796: WARNING/MainProcess] nict try
    [2021-02-23 17:03:41,796: INFO/MainProcess] Task ssl_check.tasks.print_test[a6b95ed9-7a37-49ff-8445-2db250fd402b] succeeded in 0.0s: 'hello'
    [2021-02-23 17:03:41,797: INFO/MainProcess] Received task: ssl_check.tasks.print_test[0d18e1f3-7af3-4776-903b-97517c4d7d81]
    [2021-02-23 17:03:41,798: WARNING/MainProcess] nict try
    [2021-02-23 17:03:41,799: INFO/MainProcess] Task ssl_check.tasks.print_test[0d18e1f3-7af3-4776-903b-97517c4d7d81] succeeded in 0.0s: 'hello'
    [2021-02-23 17:03:41,799: INFO/MainProcess] Received task: ssl_check.tasks.print_test[1b415bd2-e7da-49cc-810c-57c99f1b80c1]
    [2021-02-23 17:03:41,800: WARNING/MainProcess] nict try
    [2021-02-23 17:03:41,800: INFO/MainProcess] Task ssl_check.tasks.print_test[1b415bd2-e7da-49cc-810c-57c99f1b80c1] succeeded in 0.0s: 'hello'

    On Linux:

    celery -A config worker -l info

    Call the asynchronous task endpoint from a browser

    127.0.0.1:8000/domain/ssl/

    The task result can also be looked up in the result backend

    Note that the key in Redis is not the bare task_id; it carries the prefix celery-task-meta-
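The key lookup can be sketched as below. `result_key` and `decode_result` are hypothetical helpers; the task id is taken from the worker log above, while the stored payload shown here is a made-up example whose exact fields are an assumption. With the JSON result serializer configured in settings.py, the stored value is expected to be a JSON document.

```python
import json

KEY_PREFIX = "celery-task-meta-"  # prefix described in the note above


def result_key(task_id: str) -> str:
    """Build the Redis key under which a task result is stored."""
    return KEY_PREFIX + task_id


def decode_result(raw: bytes) -> dict:
    """Decode a JSON-serialized result payload fetched from Redis."""
    return json.loads(raw.decode("utf-8"))


task_id = "383d9f23-1224-481f-8a3e-364b07dc2dc8"
key = result_key(task_id)

# Hypothetical payload, invented for illustration
raw_payload = b'{"status": "SUCCESS", "result": "hello", "task_id": "383d9f23-1224-481f-8a3e-364b07dc2dc8"}'
decoded = decode_result(raw_payload)
```

Against a real backend, the raw bytes would come from something like `redis.Redis(host="10.9.4.199", port=6379, db=2).get(key)`, using database 2 as configured in `CELERY_RESULT_BACKEND`.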

    Finally, to run scheduled tasks, start the beat service separately from the folder that contains manage.py

    On Windows:

    (python37_django2) D:pythondjango_imooc_xiaobaimuke_vedio_testvideo>celery -A config beat -l info
    celery beat v4.4.2 (cliffs) is starting.
    __    -    ... __   -        _
    LocalTime -> 2021-02-23 17:15:23
    Configuration ->
        . broker -> redis://localhost:6379/1
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> celery.beat.PersistentScheduler
        . db -> celerybeat-schedule
        . logfile -> [stderr]@%INFO
        . maxinterval -> 5.00 minutes (300s)
    [2021-02-23 17:15:23,454: INFO/MainProcess] beat: Starting...
    [2021-02-23 17:15:28,555: INFO/MainProcess] Scheduler: Sending due task task-one (ssl_check.tasks.print_test)

    On Linux:

    celery -A config beat -l info
  • Original article: https://www.cnblogs.com/reblue520/p/14444083.html