  • Python script: calling an API to clean up surplus image tags

        Background

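                The image registry on a certain platform offers no way to set a tag retention policy for uploaded images, such as a maximum number of tags to keep. As a result, the tags that developers' pipeline builds push to the registry keep accumulating, and the registry's storage space is consumed quickly.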

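               Investigation showed that although the registry's front end lacks the relevant settings, the corresponding REST API endpoints do exist, including a back-end endpoint for deleting image tags.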

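               The registry is designed as three layers from top to bottom: the top layer is the registryname, which corresponds to a Docker registry; the middle layer is the project; the bottom layer is the repo. A registry contains multiple projects, a project contains multiple repos, and a repo has multiple tags, as shown in the figure below.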

     The relevant REST endpoints are as follows:
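The endpoint paths that the script further down actually calls can be summarized with a small sketch. The path segments are copied from the script; the helper function names, and the project and repo names in the usage note, are illustrative assumptions and not part of the platform's API.

```python
# Path prefix shared by every request in the script.
BASE = "/v1/registries/alauda/"


def projects_url(endpoint, registry):
    """GET: list the projects of a registry."""
    return endpoint + BASE + registry + "/projects"


def repositories_url(endpoint, registry, project):
    """GET: list the repos of a project."""
    return projects_url(endpoint, registry) + "/" + project + "/repositories"


def tags_url(endpoint, registry, project, repo):
    """GET: list the tags of a repo."""
    return repositories_url(endpoint, registry, project) + "/" + repo + "/tags"


def tag_url(endpoint, registry, project, repo, tag):
    """DELETE: remove a single tag."""
    return tags_url(endpoint, registry, project, repo) + "/" + tag
```

For example, `tags_url("http://1.1.1.1:32001", "k8s-devops-registry", "demo", "app")` builds the tag-listing URL for a hypothetical repo `app` in a hypothetical project `demo`. The script additionally appends `?view_type=detail&page_size=100` to that URL.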

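           To implement a tag retention policy, I wrote a Python script (Python 2 syntax) that calls these endpoints through the requests module. It takes the maximum number of tags to keep; for any repo whose tag count exceeds that maximum, the tags are sorted by creation time, only the newest ones are kept, and the rest are cleaned up. Specific projects or repos can also be excluded from the tag-count check. The script is as follows: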

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import requests
    import json
    import argparse
    #from collections import defaultdict
    from dateutil.parser import parse
    from time import mktime,sleep
    from sys import exit
    
    
    class repoTagClean:
      def  __init__(self,apiendpoint,token,registryName,maxtags,skipproject,skiprepo):
          self.apiendpoint=apiendpoint
          self.token=token
          self.registryName=registryName
          self.maxtags=maxtags
          self.skipproject=skipproject
          self.skiprepo=skiprepo
          self.headers={"Authorization": "token"+" "+self.token}
    
      def checkInput(self):
          url=self.apiendpoint+"/v1/registries/alauda/"
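          try:
            r=requests.get(url=url,headers=self.headers)
          except requests.exceptions.RequestException as e:
            # catch only request errors; a bare "except:" would also swallow
            # the SystemExit raised by exit(6) below and turn it into exit(7)
            print "request to %s failed: %s,quit" %(url,e)
            exit(7)
          if r.status_code != 200:
            print "the request returned code %s, make sure the jakiro apiendpoint and the token are correct" %(r.status_code)
            exit(6)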
    
      def getProjecList(self,skipproject=None):
          url=self.apiendpoint+"/v1/registries/alauda/"+self.registryName+"/projects"
          r=requests.get(url=url,headers=self.headers)
          r.encoding="utf-8"
          ProjecList=[]
          for p in json.loads(r.text):
              ProjecList.append(p["project_name"].encode("utf-8"))
    
          if skipproject!=None:
            for i  in skipproject.split(","):
              # ignore names that are not in the list; list.remove() would raise ValueError
              if i in ProjecList:
                ProjecList.remove(i)
          return ProjecList 
             
      def getRepoList(self,projectName,skiprepo=None):
          url=self.apiendpoint+"/v1/registries/alauda/"+self.registryName+"/projects/"+projectName+"/repositories"
          r=requests.get(url=url,headers=self.headers)
          r.encoding="utf-8"
          #repoList=defaultdict(list)
          repoList=[]
          if len(json.loads(r.text))>0:
            for i in json.loads(r.text):
              repoList.append(i["name"].encode("utf-8")) 
    
          if skiprepo!=None:
            for i  in skiprepo.split(","):
               if i in repoList:
                 repoList.remove(i)
          return repoList
    
      def getAllTagDict(self,projectName,repoName):
          url=self.apiendpoint+"/v1/registries/alauda/"+self.registryName+"/projects/"+projectName+"/repositories/"+repoName+"/tags"+"?view_type=detail&page_size=100"
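          r=requests.get(url=url,headers=self.headers)
          r.encoding="utf-8"
          # parse the response body once instead of once per access
          data=json.loads(r.text)
          tagDict={}
          # NOTE: only the first page (page_size=100) is read, so a repo with
          # more than 100 tags is only partially covered in a single run
          if data["count"]>self.maxtags:
            for i in data["results"]:
              # map tag name -> creation time as a Unix timestamp
              tagDict[i["tag_name"].encode("utf-8")]=mktime(parse(i["created_at"].encode("utf-8")).timetuple())
          return tagDict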
    
      def getDeleteTagList(self,tagDict):
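          pendDeleteTag=[]
          # sort by creation time ascending, so the oldest tags come first
          allTags=sorted(tagDict.items(),key=lambda x:x[1])
          # everything except the newest maxtags entries is marked for deletion
          for i in allTags[:len(allTags)-self.maxtags]:
            pendDeleteTag.append(i[0])
          return pendDeleteTag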
         
      def getTagList(self):
         #tagDict=defaultdict(defaultdict)
         pendingDelete=[]
         projectList=self.getProjecList(self.skipproject)
         for p in projectList:
           repoList=self.getRepoList(p,self.skiprepo)  
           if repoList:
             for r in repoList:
               print "begin to check project: [%s], the repo is: [%s]" %(p,r)
               tagDict=self.getAllTagDict(p,r)
               if tagDict:
                 t1=self.getDeleteTagList(tagDict)
                 pendingDelete.append((p,r,t1))
         return pendingDelete         
    
      def deleteTag(self,pendingDelete):
        for i in pendingDelete:
          for j in i[2]:
            url=self.apiendpoint+"/v1/registries/alauda/"+self.registryName+"/projects/"+i[0]+"/repositories/"+i[1]+"/tags/"+j
            print "begin to clean tag:%s/%s/%s" %(i[0],i[1],j)
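            d=requests.delete(url=url,headers=self.headers)
            # report failures instead of silently ignoring the response
            if d.status_code>=400:
              print "failed to clean tag:%s/%s/%s, return code %s" %(i[0],i[1],j,d.status_code)
            sleep(1)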
             
    def main():
      parser = argparse.ArgumentParser(description="Clean up surplus image tags")
      parser.add_argument("-i","--apiendpoint",dest="apiendpoint",required=True,help="the jakiro apiendpoint, e.g. http://1.1.1.1:32001")
      parser.add_argument("-t","--token",dest="token",required=True,help="the auth token for the jakiro apiendpoint")
      parser.add_argument("-r","--registryname",dest="registryname",choices=["k8s-tools-registry","k8s-devops-registry"],required=True,help="the registry name")
      parser.add_argument("-m","--maxtags",dest="maxtags",required=False,type=int,help="the maximum number of tags to keep per repo",default=10)
      parser.add_argument("-S","--skipproject",dest="skipproject",type=str,help="comma-separated projects to exclude from the check")
      parser.add_argument("-s","--skiprepo",dest="skiprepo",type=str,help="comma-separated repos to exclude from the check")
      args = parser.parse_args() 
    
      print "maxtags has been set to: %d" %args.maxtags
      print "begin to scan the registry: %s" %args.registryname
      cleaner=repoTagClean(args.apiendpoint,args.token,args.registryname,args.maxtags,args.skipproject,args.skiprepo)
      cleaner.checkInput()
      c=cleaner.getTagList()
      if c:
        print "scan finished, the following tags are pending cleanup"
        print c
        r=raw_input("are you sure you want to clean these tags? to confirm, input one of y|Y|yes:")
        if r=="y" or r=="Y" or r=="yes":
          cleaner.deleteTag(c)
        else:
          print "not confirmed,quit"
      else:
        print "check finished, no extra tags need to be cleaned"
        exit(0)
      
    if __name__=="__main__":
      main()
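The retention rule at the heart of the script (sort by creation time, keep the newest entries, delete the rest) can be tried in isolation, without any network access. The following is a minimal sketch, not the script's own code: the function name `tags_to_delete` and the sample data are made up for illustration. Unlike `getDeleteTagList`, it clamps the surplus at zero, so an input with fewer than `max_tags` entries yields an empty list instead of a negative slice.

```python
def tags_to_delete(tag_times, max_tags):
    """Return the names of the tags to remove, oldest first.

    tag_times maps tag name -> creation time (any sortable value,
    for example a Unix timestamp).
    """
    if max_tags < 0:
        raise ValueError("max_tags must be >= 0")
    # oldest first, same ordering as the script's sorted(..., key=x[1])
    ordered = sorted(tag_times.items(), key=lambda item: item[1])
    # never let the slice bound go negative
    surplus = max(0, len(ordered) - max_tags)
    return [name for name, _ in ordered[:surplus]]


# Illustrative data: four tags with increasing creation timestamps.
example = {"v3": 300.0, "v1": 100.0, "v4": 400.0, "v2": 200.0}
```

With this data, `tags_to_delete(example, 2)` selects the two oldest tags, `v1` and `v2`, and `tags_to_delete(example, 10)` selects nothing.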

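The script requests the tag list with `page_size=100` and reads a single response, so a repo with more than 100 tags is only partly visible in one run. If the endpoint supports paging, a loop along the following lines could collect every entry first. This is a sketch under assumptions: the source does not confirm a `page` query parameter, so the HTTP call is abstracted behind a hypothetical `fetch_page(page, page_size)` callable, and `make_fake_fetcher` is only a stand-in used to exercise the loop. The `count` and `results` keys are the ones the script reads from the response.

```python
def collect_all_tags(fetch_page, page_size=100):
    """Walk pages until every entry announced by 'count' has been seen.

    fetch_page(page, page_size) must return a dict with the keys
    'count' (total number of tags) and 'results' (tags on this page).
    """
    collected = []
    page = 1  # assumption: pages are numbered from 1
    while True:
        data = fetch_page(page, page_size)
        results = data.get("results", [])
        collected.extend(results)
        # stop when the page is empty or all announced entries are in hand
        if not results or len(collected) >= data.get("count", 0):
            break
        page += 1
    return collected


def make_fake_fetcher(total):
    """Stand-in for the HTTP request, serving 'total' made-up tags."""
    tags = [{"tag_name": "t%d" % n} for n in range(total)]

    def fetch(page, page_size):
        start = (page - 1) * page_size
        return {"count": total, "results": tags[start:start + page_size]}

    return fetch
```

In real use, `fetch_page` would wrap `requests.get` on the tags URL with whatever paging parameter the platform actually accepts; that parameter would need to be checked against the platform's API documentation.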
     The result of running it is as follows:

  • Original post: https://www.cnblogs.com/360linux/p/13062059.html