背景
- 需要收集git仓库信息到数据库供前端展示
- 包括:仓库信息、仓库所有者、成员列表、提交信息、活跃情况等
- 需要定时启动、灵活触发
实现简介
- 使用gitlab v4 restful 接口
- 使用python-gitlab 依赖库(pyapi-gitlab 不支持gitlab 10.0+ 版本,故弃用)
- 使用flask 提供接口,远程灵活触发
- 使用requests 完成数据汇报与接口访问(因频率不高,性能优化相关暂不考虑)
- 使用datetime/dateutil 完成gitlab 的时间信息向mysql datatime转化
- 外层使用docker进行封装与部署
api使用
class GitProjectEntry(CCInterfaceHandler):
def __init__(self, _project):
super(GitProjectEntry, self).__init__()
self.logger = logger
self.project = _project
self.attribute = self.project.attributes
self.logger.debug("parse project name : {PROJECT_NAME} with id : {PROJECT_ID}.".format(
PROJECT_NAME = self._self_attribute_get("name"),
PROJECT_ID = self.project.get_id()
))
# id, name, description, owner, web_url, last_activity_at, created_at, num_commits, member_list
self.KEY_ID = "id"
self.KEY_NAME = "name"
self.KEY_DESCRIPTION = "description"
self.KEY_OWNER = "owner"
self.KEY_OWNER_WEB_URL = "owner_web_url"
self.KEY_LAST_ACTIVITY_AT = "last_activity_at"
self.KEY_CREATED_AT = "created_at"
self.KEY_ACTIVE_STATUS = "status"
self.KEY_NAMESPACE_PATH = "namespace_path"
self.KEY_NAMESPACE_WEB_URL = "namespace_web_url"
self.KEY_MEMBER_USERNAME_LIST = "member_username_list"
self.KEY_WEB_URL = "web_url"
self.KEY_HTTP_URL_TO_REPO = "http_url_to_repo"
self.logger.debug(str(self._self_init()))
def _date_formatter(self, _git_date_stamp_str):
try:
return date_parser.parse(_git_date_stamp_str).strftime(database_datefmt)
except Exception,e:
self.logger.warn("date stamp convert failed. with {INNER_GIT_STAMP}".format(INNER_GIT_STAMP = _git_date_stamp_str))
def _git_status_active_judge(self, _git_last_active_at_dt):
# status 1 活跃,0不活跃
KEY_STATUS_ACTIVE = 1
KEY_STATUS_INACTIVE = 0
today=datetime.datetime.today()
month_time_delta=datetime.timedelta(days = 30, hours=1)
activity_threshold = today - month_time_delta
activity_status = KEY_STATUS_ACTIVE if(
date_parser.parse(_git_last_active_at_dt) > activity_threshold) else KEY_STATUS_INACTIVE
return activity_status
def _self_init(self):
self.member_list = []
self.guess_owner = None
self.last_activity_at = self._date_formatter(self._self_attribute_get("last_activity_at"))
self.data = {
self.KEY_ID : self.project.get_id(), # self.project.id
self.KEY_NAME : self._self_attribute_get("name"), # self.project.name
self.KEY_DESCRIPTION : self._self_attribute_get("description"),
self.KEY_OWNER : self._self_attribute_get("owner", {}).get("username"),
self.KEY_OWNER_WEB_URL : self._self_attribute_get("owner", {}).get("web_url"),
self.KEY_LAST_ACTIVITY_AT : self.last_activity_at,
self.KEY_CREATED_AT : self._date_formatter(self._self_attribute_get("created_at")),
self.KEY_ACTIVE_STATUS : self._git_status_active_judge(self.last_activity_at),
self.KEY_NAMESPACE_PATH : self._self_attribute_get("namespace", {}).get("path"),
self.KEY_NAMESPACE_WEB_URL : self._self_attribute_get("namespace", {}).get("web_url"),
self.KEY_MEMBER_USERNAME_LIST : self._self_member_username_list(),
self.KEY_WEB_URL : self._self_attribute_get("web_url"),
self.KEY_HTTP_URL_TO_REPO : self._self_attribute_get("description")
}
if(self.data.get(self.KEY_OWNER) == None):
self.data[self.KEY_OWNER] = self._self_member_top_access_user_name()
if(self.data.get(self.KEY_OWNER_WEB_URL) == None):
self.data[self.KEY_OWNER_WEB_URL] = self._self_member_top_access_user_name()
self.git_project_member_report(self.project.get_id(),self.member_list)
if('200' != str(self.git_project_report(self.data))):
self.logger.warn("reported git_project failed! with project_id : {PR_ID} and project_name : {PR_NAME} .
"
"while data is {PR_DATA}".format(PR_ID = str(self.data.get(self.KEY_ID, "NULL_ID")),
PR_NAME = str(self.data.get(self.KEY_NAME, "NULL_NAME")),
PR_DATA = str(self.data)
)
)
else:
self.logger.info("reported git_project succeed! with project_id : {PR_ID} and project_name : {PR_NAME} .
"
"while data is {PR_DATA}".format(PR_ID = str(self.data.get(self.KEY_ID, "NULL_ID")),
PR_NAME = str(self.data.get(self.KEY_NAME, "NULL_NAME")),
PR_DATA = str(self.data)
)
)
return self.data
def _self_member_list(self):
if(self.member_list != None and len(self.member_list) > 0):
return self.member_list
self.member_list = self.project.members.list(all=True)
if(len(self.member_list)<1):
self.member_list = self.project.members.all(all=True)
return self.member_list
def _self_member_top_access_user(self):
if(not self.guess_owner):
self.member_list = self._self_member_list()
self.sorted_member_list = sorted(self.member_list,key=lambda x : x.attributes.get("access_level",0),reverse=True)
self.guess_owner = self.sorted_member_list[0] if(len(self.sorted_member_list) > 0) else None
return self.guess_owner
def _self_member_top_access_user_weburl(self):
self.guess_owner = self._self_member_top_access_user()
self.top_access_username = self.guess_owner.attributes.get("web_url", None) if(self.guess_owner) else None
return self.top_access_username
def _self_member_top_access_user_name(self):
self.guess_owner = self._self_member_top_access_user()
self.top_access_username = self.guess_owner.attributes.get("username", None) if(self.guess_owner) else None
return self.top_access_username
def _self_member_username_list(self):
'''
gitlab document:
Examples
List the project members:
members = project.members.list()
List the project members recursively (including inherited members through ancestor groups):
members = project.members.all(all=True)
Search project members matching a query string:
members = project.members.list(query='bar')
Get a single project member:
member = project.members.get(user_id)
Add a project member:
member = project.members.create({'user_id': user.id, 'access_level':
gitlab.DEVELOPER_ACCESS})
Modify a project member (change the access level):
member.access_level = gitlab.MAINTAINER_ACCESS
member.save()
Remove a member from the project team:
project.members.delete(user.id)
# or
member.delete()
Share/unshare the project with a group:
project.share(group.id, gitlab.DEVELOPER_ACCESS)
project.unshare(group.id)
:return:
'''
self.member_username_list = [
member.attributes.get("username") for member in self._self_member_list()
]
return self.member_username_list
def _self_attribute_get(self, _key, _default_value = ""):
return self.attribute.get(_key, _default_value)
def to_json(self):
self.json_string = json.dumps(self.data)
return self.json_string
def _self_member_username_list(self):
'''
gitlab document:
Examples
List the project members:
members = project.members.list()
List the project members recursively (including inherited members through ancestor groups):
members = project.members.all(all=True)
Search project members matching a query string:
members = project.members.list(query='bar')
Get a single project member:
member = project.members.get(user_id)
Add a project member:
member = project.members.create({'user_id': user.id, 'access_level':
gitlab.DEVELOPER_ACCESS})
Modify a project member (change the access level):
member.access_level = gitlab.MAINTAINER_ACCESS
member.save()
Remove a member from the project team:
project.members.delete(user.id)
# or
member.delete()
Share/unshare the project with a group:
project.share(group.id, gitlab.DEVELOPER_ACCESS)
project.unshare(group.id)
:return:
'''
self.member_username_list = [
member.attributes.get("username") for member in self._self_member_list()
]
return self.member_username_list
In [260]: user.attributes
Out[260]:
{u'access_level': 50,
u'avatar_url': None,
u'expires_at': None,
u'id': 224,
u'name': u'mosaic',
'project_id': 18915,
u'state': u'active',
u'username': u'mosaic',
u'web_url': u'http://git.intra.mosaic.com/mosaic'}
完整代码
# !/usr/bin/python
# -*- coding: UTF-8 -*-
__author__ = 'enzhao'
# Created by enzhao on 2020/5/25.
# -*- coding: utf-8 -*-
import gitlab
from gitlab.v4 import objects as git_const
import json
import logging
from logging.handlers import RotatingFileHandler
# fh = RotatingFileHandler(filename, maxBytes=1024, backupCount=5)
import datetime
from dateutil import parser as date_parser
import time
from functools import wraps
from flask import Flask
import requests
from gitlab.v4.objects import Project
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
app = Flask(__name__)
RESPONSE_SUCCESS_CODE = 200
RESPONSE_FAILED_CODE = 500
RESPONSE_SUCCESS_MESSAGE = "success"
RESPONSE_RESULT_KEY = "result"
RESPONSE_CODE_KEY = "code"
RESPONSE_MESSAGE_KEY = "message"
RESPONSE_PARAMS_KEY = "params"
RESPONSE_DICT_TEMPLATE = {
RESPONSE_RESULT_KEY : {},
RESPONSE_CODE_KEY : RESPONSE_SUCCESS_CODE,
RESPONSE_MESSAGE_KEY : RESPONSE_SUCCESS_MESSAGE,
RESPONSE_PARAMS_KEY : {}
}
logger_file_suffix = time.strftime('%Y-%m-%d', time.localtime( float( time.time() )))
logger_file_name = "./waic-gitlab-info-retrive.log"
logger_file_name = "".join(["./develop-python-gitlab/host-py-source/waic-gitlab-info-retrive-", logger_file_suffix, ".log"])
# logger_file_name = "./waic-gitlab-info-retrive.log"
logger_time_fmt = '%Y/%m/%d %H:%M:%S %A'
datefmt='%Y-%m-%d %H:%M:%S@%A'
database_datefmt = "%Y-%m-%d %H:%M:%S"
git_datefmt = "%Y-%m-%d %H:%M:%S"
CC_HOST = "http://10.73.13.163"
# CC_HOST = "http://controlcenter.ds.sina.com.cn"
def get_local_ip():
local_ip = ''
try:
import socket
socket_objs = [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]
ip_from_ip_port = [(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close()) for s in socket_objs][0][1]
ip_from_host_name = [ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith('127.')][:1]
local_ip = [l for l in (ip_from_ip_port, ip_from_host_name) if l][0]
except (Exception),e:
print('get_local_ip found exception : %s' % e)
return local_ip if('' != local_ip and None != local_ip) else socket.gethostbyname(socket.gethostname())
def result_decorator(_func_obj):
@wraps(_func_obj)
def wrapTheFunction(*args, **kwargs):
receive_time = time.time()
# RESPONSE_DICT = RESPONSE_DICT_TEMPLATE.copy()
RESPONSE_DICT = {
RESPONSE_RESULT_KEY : {},
RESPONSE_CODE_KEY : RESPONSE_SUCCESS_CODE,
RESPONSE_MESSAGE_KEY : RESPONSE_SUCCESS_MESSAGE,
RESPONSE_PARAMS_KEY : {}
}
RESPONSE_DICT[RESPONSE_RESULT_KEY]["server_ip"] = get_local_ip()
RESPONSE_DICT[RESPONSE_RESULT_KEY]["time"] = time.strftime(datefmt,time.localtime())
result_dict = {}
try:
result_dict = _func_obj(*args, **kwargs)
except Exception,e:
finish_time = time.time()
RESPONSE_DICT[RESPONSE_RESULT_KEY]["duration"] = str(finish_time - receive_time) + "ms"
RESPONSE_DICT[RESPONSE_PARAMS_KEY].update(**kwargs)
RESPONSE_DICT[RESPONSE_CODE_KEY] = RESPONSE_FAILED_CODE
RESPONSE_DICT[RESPONSE_MESSAGE_KEY] = e.message
return json.dumps(RESPONSE_DICT)
RESPONSE_DICT[RESPONSE_RESULT_KEY].update(result_dict)
RESPONSE_DICT[RESPONSE_PARAMS_KEY].update(**kwargs)
finish_time = time.time()
RESPONSE_DICT[RESPONSE_RESULT_KEY]["duration"] = str(finish_time - receive_time) + "ms"
RESPONSE_DICT[RESPONSE_CODE_KEY] = RESPONSE_SUCCESS_CODE
RESPONSE_DICT[RESPONSE_MESSAGE_KEY] = RESPONSE_SUCCESS_MESSAGE
return json.dumps(RESPONSE_DICT)
return wrapTheFunction
def wei_logger(_cls_name = "logger_helper",
_log_level = logging.INFO):# !/usr/bin/python
_format_str = '[%(levelname)s] : %(asctime)s %(filename)s[line:%(lineno)d] %(message)s'
logging.basicConfig(level=logging.DEBUG,
format=_format_str,
filename=logger_file_name,
datefmt=logger_time_fmt)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console_formatter = logging.Formatter(_format_str)
console.setFormatter(console_formatter)
rotating_file_handler = RotatingFileHandler(logger_file_name, maxBytes=1024 * 1024 * 50, backupCount=5)
rotating_file_handler.setLevel(logging.DEBUG)
rotating_file_handler.setFormatter(console_formatter)
logger = logging.getLogger(_cls_name)
logger.addHandler(console)
logger.addHandler(rotating_file_handler)
logger.debug('%s logger init success!!' % _cls_name)
return logger
logger = wei_logger("GitManageEntry")
def get_http_session(pool_connections=1, pool_maxsize=10, max_retries=3):
'''
http连接池
pool_connections 要缓存的 urllib3 连接池的数量。
pool_maxsize 要保存在池中的最大连接数。
max_retries 每个连接的最大重试次数
'''
session =requests.session()
# 创建适配器
adapter = requests.adapters.HTTPAdapter(pool_connections=pool_connections,
pool_maxsize=pool_maxsize, max_retries=max_retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
session = get_http_session()
class CCInterfaceHandler(object):
def __init__(self):
self.cc_host = CC_HOST
self.url_sep = "/"
self.git_project_query_url = "waic/v1/data/git_project"
self.git_project_add_url = "waic/v1/data/git_project/add"
self.git_project_update_url = "waic/v1/data/git_project/update"
self.git_project_member_query_url = "waic/v1/data/git_project_member"
self.git_project_member_add_url = "waic/v1/data/git_project_member/add"
self.git_project_member_update_url = "waic/v1/data/git_project_member/update"
self.interface_git_project_query = self.url_sep.join([self.cc_host, self.git_project_query_url ])
self.interface_git_project_add = self.url_sep.join([self.cc_host, self.git_project_add_url ])
self.interface_git_project_update = self.url_sep.join([self.cc_host, self.git_project_update_url ])
self.interface_git_project_member_query = self.url_sep.join([self.cc_host, self.git_project_member_query_url])
self.interface_git_project_member_add = self.url_sep.join([self.cc_host, self.git_project_member_add_url ])
self.interface_git_project_member_update = self.url_sep.join([self.cc_host, self.git_project_member_update_url ])
self.response_format_param = {"responseFormat" : "json"}
self.role_mapping = {
git_const.ACCESS_GUEST : "guest",
git_const.ACCESS_REPORTER : "reporter",
git_const.ACCESS_DEVELOPER : "developer",
git_const.ACCESS_MASTER : "master",
git_const.ACCESS_OWNER : "owner",
}
def get_value(self, _key_path = "cluster.json.weibox_path", _dict = {}):
if(_dict == None):
return ""
tmp_obj = _dict.get(_key_path)
if(tmp_obj == None):
point_idx = _key_path.find(".")
if(point_idx < 0):
return ""
tail_dict = _dict.get(_key_path[0:point_idx])
if(isinstance(tail_dict,list)):
tail_dict = tail_dict[0] if len(tail_dict)>0 else None
tmp_obj = self.get_value(_key_path[point_idx+1:], tail_dict)
return tmp_obj
def _git_project_report_interface(self, _id):
self.cc_project_count = 0
try:
self.id_param = self.response_format_param.copy()
self.id_param.update({"id" : _id})
r = session.get(self.interface_git_project_query, params = self.id_param)
self.cc_project_count = self.get_value("result.totalCount",r.json())
except Exception,e:
logger.warn(str(e) + e.message)
return self.interface_git_project_update if(self.cc_project_count > 0) else self.interface_git_project_add
def git_project_report(self, _project_data = {}):
self.id = _project_data.get("id")
try:
self.interface_git_project_report = self._git_project_report_interface(_id=self.id)
self.report_param = self.response_format_param.copy()
self.report_param.update(_project_data)
r = session.post(self.interface_git_project_report, self.report_param)
return self.get_value("code",r.json())
except Exception,e:
logger.warn(str(e) + e.message)
def _git_project_member_query(self, _id):
try:
self.id_param = self.response_format_param.copy()
self.id_param.update({"id" : _id})
r = session.get(self.interface_git_project_member_query, params = self.id_param)
self.cc_project_member = self.get_value("result.data",r.json())
self.cc_project_member_username_set = set([
user_dict.get("user_id", "") for user_dict in self.cc_project_member
])
return self.cc_project_member_username_set
except Exception,e:
logger.warn(str(e) + e.message)
def _git_project_member_report_interface(self, _project_id, _user_id):
self.cc_project_member_count = 0
try:
self.id_param = self.response_format_param.copy()
self.id_param.update({"project_id" : _project_id, "user_id" : _user_id})
r = session.get(self.interface_git_project_member_query, params = self.id_param)
self.cc_project_count = self.get_value("result.totalCount",r.json())
except Exception,e:
logger.warn(str(e) + e.message)
return self.interface_git_project_member_update if(self.cc_project_count > 0) else self.interface_git_project_member_add
def git_project_member_report(self, _project_id, _project_member_data = []):
self.id = _project_id
try:
for project_user in _project_member_data:
cc_project_user_role = self.role_mapping.get(project_user.attributes.get("access_level",git_const.ACCESS_DEVELOPER))
cc_project_user_id = project_user.attributes.get("username")
cc_project_member_id = "".join([str(_project_id), "0", str(project_user.attributes.get("id"))])
self.interface_git_project_member_report = self._git_project_member_report_interface(
_project_id=_project_id, _user_id=cc_project_user_id)
self.id_param = self.response_format_param.copy()
self.id_param.update({
"id" : cc_project_member_id,
"project_id" : _project_id,
"user_id" : cc_project_user_id,
"role" : cc_project_user_role
})
r = session.post(self.interface_git_project_member_report, self.id_param)
except Exception,e:
logger.warn(str(e) + e.message)
class GitProjectEntry(CCInterfaceHandler):
def __init__(self, _project):
super(GitProjectEntry, self).__init__()
self.logger = logger
self.project = _project
self.attribute = self.project.attributes
self.logger.debug("parse project name : {PROJECT_NAME} with id : {PROJECT_ID}.".format(
PROJECT_NAME = self._self_attribute_get("name"),
PROJECT_ID = self.project.get_id()
))
# id, name, description, owner, web_url, last_activity_at, created_at, num_commits, member_list
self.KEY_ID = "id"
self.KEY_NAME = "name"
self.KEY_DESCRIPTION = "description"
self.KEY_OWNER = "owner"
self.KEY_OWNER_WEB_URL = "owner_web_url"
self.KEY_LAST_ACTIVITY_AT = "last_activity_at"
self.KEY_CREATED_AT = "created_at"
self.KEY_ACTIVE_STATUS = "status"
self.KEY_NAMESPACE_PATH = "namespace_path"
self.KEY_NAMESPACE_WEB_URL = "namespace_web_url"
self.KEY_MEMBER_USERNAME_LIST = "member_username_list"
self.KEY_WEB_URL = "web_url"
self.KEY_HTTP_URL_TO_REPO = "http_url_to_repo"
self.logger.debug(str(self._self_init()))
def _date_formatter(self, _git_date_stamp_str):
try:
return date_parser.parse(_git_date_stamp_str).strftime(database_datefmt)
except Exception,e:
self.logger.warn("date stamp convert failed. with {INNER_GIT_STAMP}".format(INNER_GIT_STAMP = _git_date_stamp_str))
def _git_status_active_judge(self, _git_last_active_at_dt):
# status 1 活跃,0不活跃
KEY_STATUS_ACTIVE = 1
KEY_STATUS_INACTIVE = 0
today=datetime.datetime.today()
month_time_delta=datetime.timedelta(days = 30, hours=1)
activity_threshold = today - month_time_delta
activity_status = KEY_STATUS_ACTIVE if(
date_parser.parse(_git_last_active_at_dt) > activity_threshold) else KEY_STATUS_INACTIVE
return activity_status
def _self_init(self):
self.member_list = []
self.guess_owner = None
self.last_activity_at = self._date_formatter(self._self_attribute_get("last_activity_at"))
self.data = {
self.KEY_ID : self.project.get_id(), # self.project.id
self.KEY_NAME : self._self_attribute_get("name"), # self.project.name
self.KEY_DESCRIPTION : self._self_attribute_get("description"),
self.KEY_OWNER : self._self_attribute_get("owner", {}).get("username"),
self.KEY_OWNER_WEB_URL : self._self_attribute_get("owner", {}).get("web_url"),
self.KEY_LAST_ACTIVITY_AT : self.last_activity_at,
self.KEY_CREATED_AT : self._date_formatter(self._self_attribute_get("created_at")),
self.KEY_ACTIVE_STATUS : self._git_status_active_judge(self.last_activity_at),
self.KEY_NAMESPACE_PATH : self._self_attribute_get("namespace", {}).get("path"),
self.KEY_NAMESPACE_WEB_URL : self._self_attribute_get("namespace", {}).get("web_url"),
self.KEY_MEMBER_USERNAME_LIST : self._self_member_username_list(),
self.KEY_WEB_URL : self._self_attribute_get("web_url"),
self.KEY_HTTP_URL_TO_REPO : self._self_attribute_get("description")
}
if(self.data.get(self.KEY_OWNER) == None):
self.data[self.KEY_OWNER] = self._self_member_top_access_user_name()
if(self.data.get(self.KEY_OWNER_WEB_URL) == None):
self.data[self.KEY_OWNER_WEB_URL] = self._self_member_top_access_user_name()
self.git_project_member_report(self.project.get_id(),self.member_list)
if('200' != str(self.git_project_report(self.data))):
self.logger.warn("reported git_project failed! with project_id : {PR_ID} and project_name : {PR_NAME} .
"
"while data is {PR_DATA}".format(PR_ID = str(self.data.get(self.KEY_ID, "NULL_ID")),
PR_NAME = str(self.data.get(self.KEY_NAME, "NULL_NAME")),
PR_DATA = str(self.data)
)
)
else:
self.logger.info("reported git_project succeed! with project_id : {PR_ID} and project_name : {PR_NAME} .
"
"while data is {PR_DATA}".format(PR_ID = str(self.data.get(self.KEY_ID, "NULL_ID")),
PR_NAME = str(self.data.get(self.KEY_NAME, "NULL_NAME")),
PR_DATA = str(self.data)
)
)
return self.data
def _self_member_list(self):
if(self.member_list != None and len(self.member_list) > 0):
return self.member_list
self.member_list = self.project.members.list(all=True)
if(len(self.member_list)<1):
self.member_list = self.project.members.all(all=True)
return self.member_list
def _self_member_top_access_user(self):
if(not self.guess_owner):
self.member_list = self._self_member_list()
self.sorted_member_list = sorted(self.member_list,key=lambda x : x.attributes.get("access_level",0),reverse=True)
self.guess_owner = self.sorted_member_list[0] if(len(self.sorted_member_list) > 0) else None
return self.guess_owner
def _self_member_top_access_user_weburl(self):
self.guess_owner = self._self_member_top_access_user()
self.top_access_username = self.guess_owner.attributes.get("web_url", None) if(self.guess_owner) else None
return self.top_access_username
def _self_member_top_access_user_name(self):
self.guess_owner = self._self_member_top_access_user()
self.top_access_username = self.guess_owner.attributes.get("username", None) if(self.guess_owner) else None
return self.top_access_username
def _self_member_username_list(self):
'''
gitlab document:
Examples
List the project members:
members = project.members.list()
List the project members recursively (including inherited members through ancestor groups):
members = project.members.all(all=True)
Search project members matching a query string:
members = project.members.list(query='bar')
Get a single project member:
member = project.members.get(user_id)
Add a project member:
member = project.members.create({'user_id': user.id, 'access_level':
gitlab.DEVELOPER_ACCESS})
Modify a project member (change the access level):
member.access_level = gitlab.MAINTAINER_ACCESS
member.save()
Remove a member from the project team:
project.members.delete(user.id)
# or
member.delete()
Share/unshare the project with a group:
project.share(group.id, gitlab.DEVELOPER_ACCESS)
project.unshare(group.id)
:return:
'''
self.member_username_list = [
member.attributes.get("username") for member in self._self_member_list()
]
return self.member_username_list
def _self_attribute_get(self, _key, _default_value = ""):
return self.attribute.get(_key, _default_value)
def to_json(self):
self.json_string = json.dumps(self.data)
return self.json_string
class GitManageEntry(object):
def __init__(self, _private_token, _fetch_all = False, _exclude_project = True, _visibility = git_const.VISIBILITY_PRIVATE):
'''
gitlab document:
The API provides several filtering parameters for the listing methods:
archived: if True only archived projects will be returned
visibility: returns only projects with the specified visibility (can be public, internal or private)
search: returns project matching the given pattern
Results can also be sorted using the following parameters:
order_by: sort using the given argument. Valid values are id, name, path, created_at, updated_at and last_activity_at. The default is to sort by created_at
sort: sort order (asc or desc)
# List all projects (default 20)
projects = gl.projects.list(all=True)
# Archived projects
projects = gl.projects.list(archived=1)
# Limit to projects with a defined visibility
projects = gl.projects.list(visibility='public')
# List owned projects
projects = gl.projects.list(owned=True)
# List starred projects
projects = gl.projects.list(starred=True)
# Search projects
projects = gl.projects.list(search='keyword')
:param _private_token:
:param _fetch_all:
:param _exclude_project:
:param _visibility:
'''
self.url = "http://git.intra.weibo.com/"
self.private_token = _private_token
self.fetch_all = _fetch_all
self.exclude_project = _exclude_project
self.logger = logger
self.visibility = _visibility
self.git = gitlab.Gitlab(self.url, self.private_token)
self.git.auth()
self.username = self.git.user.username
self.user_id = self.git.user.id
self.logger_info = "parse private token for username : {USER_NAME} with id : {USER_ID}.".format(
USER_NAME = self.username,
USER_ID = self.user_id
)
print (self.logger_info)
self.logger.info(self.logger_info)
self.user_last_sign_in_at = self.git.user.last_sign_in_at
self.user_last_activity_on = self.git.user.last_activity_on
self.logger.info("start retrieve git projects")
self.projects = self.git.projects.list(all=True, visibility = self.visibility)
if(self.fetch_all) else
self.git.projects.list(visibility = self.visibility)
self.logger.info("retrieve git projects done!")
self.gpe = [GitProjectEntry(_project=project) for project in self.projects]
self.gpe_data = [_gpe.data for _gpe in self.gpe]
self.data = {
"url" : self.url ,
"private_token" : self.private_token ,
"username" : self.username ,
"user_id" : self.user_id ,
"user_last_sign_in_at" : self.user_last_sign_in_at ,
"user_last_activity_on" : self.user_last_activity_on,
"count_of_projects" : len(self.gpe)
}
if(not self.exclude_project):
self.data.update({"projects" : self.gpe_data})
self.logger.debug(str(self.data))
def to_json(self):
self.json_string = json.dumps(self.data)
return self.json_string
@app.route('/waic/git-retrieve/user-auth/<private_token>',methods = ['POST','GET'])
@result_decorator
def git_user_auth(private_token):
gme = GitManageEntry(_private_token=private_token)
return gme.data
# return gme.to_json()
@app.route('/waic/git-retrieve/preview/<private_token>',methods = ['POST','GET'])
@result_decorator
def git_retrieve_preview(private_token):
gme = GitManageEntry(_private_token=private_token, _exclude_project=False)
return gme.data
# return gme.to_json()
@app.route('/waic/git-retrieve/all/<private_token>',methods = ['POST','GET'])
@result_decorator
def git_retrieve_all_projects(private_token):
gme = GitManageEntry(_private_token=private_token, _fetch_all=True, _exclude_project=False)
return gme.data
# return gme.to_json()
@app.route('/waic/git-retrieve/all-projects/<private_token>',methods = ['POST','GET'])
@result_decorator
def git_retrieve_all_internal_projects(private_token):
gme = GitManageEntry(_private_token=private_token, _visibility=git_const.VISIBILITY_INTERNAL, _fetch_all=True, _exclude_project=False)
return gme.data
# return gme.to_json()
if __name__ == '__main__':
app.run(host=get_local_ip())
# job_state("12345678")
token = "anNN8srBMewKzvokyxaL"
# token = "yk-1v6J_txopP7x7xtif"
# gme = GitManageEntry(_private_token=token, _fetch_all=True, _exclude_project=False)
gme = GitManageEntry(_private_token=token, _exclude_project=False)
code_stage = '''
# -*- coding: utf-8 -*-
import gitlab
from gitlab.v4.objects import Project
url = "http://git.intra.weibo.com/"
token = "fv29Z9seyCs7xRhYSUt8"
token = "anNN8srBMewKzvokyxaL"
gl = gitlab.Gitlab(url, token, per_page=50)
projects = gl.projects.list(all=True)
class GitManageEntry(Project):
def toJson(self):
"url" : self.url ,
"private_token" : self.private_token ,
"git" : self.git ,
"username" : self.username ,
"user_id" : self.user_id ,
"user_last_sign_in_at" : self.user_last_sign_in_at ,
"user_last_activity_on" : self.user_last_activity_on ,
# 登录
gl = gitlab.Gitlab(url, token)
# ---------------------------------------------------------------- #
# 获取第一页project
projects = gl.projects.list()
# 获取所有的project
projects = gl.projects.list(all=True)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取所有project的name,id
for p in gl.projects.list(all=True, as_list=False):
print(p.name, p.id)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取第一页project的name,id
for p in gl.projects.list(page=1):
print(p.name, p.id)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 通过指定id 获取 project 对象
project = gl.projects.get(501)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 查找项目
projects = gl.projects.list(search='keyword')
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 创建一个项目
project = gl.projects.create({'name':'project1'})
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取公开的项目
projects = gl.projects.list(visibility='public') # public, internal or private
# ---------------------------------------------------------------- #
# 获取 project 对象是以下操作的基础
# ---------------------------------------------------------------- #
# 通过指定project对象获取该项目的所有分支
branches = project.branches.list()
print(branches)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取指定分支的属性
branch = project.branches.get('master')
print(branch)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 创建分支
branch = project.branches.create({'branch_name': 'feature1',
'ref': 'master'})
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 删除分支
project.branches.delete('feature1')
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 分支保护/取消保护
branch.protect()
branch.unprotect()
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取指定项目的所有tags
tags = project.tags.list()
# 获取某个指定tag 的信息
tags = project.tags.list('1.0')
# 创建一个tag
tag = project.tags.create({'tag_name':'1.0', 'ref':'master'})
# 设置tags 说明:
tag.set_release_description('awesome v1.0 release')
# 删除tags
project.tags.delete('1.0')
# or
tag.delete()
# ---------------------------------------------------------------- #
# 获取所有commit info
commits = project.commits.list()
for c in commits:
print(c.author_name, c.message, c.title)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取指定commit的info
commit = project.commits.get('e3d5a71b')
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取指定项目的所有merge request
mrs = project.mergerequests.list()
print(mrs)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 获取 指定mr info
mr = project.mergesession.get(mr_id)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 创建一个merge request
mr = project.mergerequests.create({'source_branch':'cool_feature',
'target_branch':'master',
'title':'merge cool feature', })
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 更新一个merge request 的描述
mr.description = 'New description'
mr.save()
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 开关一个merge request (close or reopen):
mr.state_event = 'close' # or 'reopen'
mr.save()
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# Delete a MR:
project.mergerequests.delete(mr_id)
# or
mr.delete()
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# Accept a MR:
mr.merge()
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 指定条件过滤 所有的merge request
# state: state of the MR. It can be one of all, merged, opened or closed
# order_by: sort by created_at or updated_at
# sort: sort order (asc or desc)
mrs = project.mergerequests.list(state='merged', sort='asc') # all, merged, opened or closed
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# 创建一个commit
data = {
'branch_name': 'master', # v3
'commit_message': 'blah blah blah',
'actions': [
{
'action': 'create',
'file_path': 'blah',
'content': 'blah'
}
]
}
commit = project.commits.create(data)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# Compare two branches, tags or commits:
result = project.repository_compare('develop', 'feature-20180104')
print(result)
# get the commits
for commit in result['commits']:
print(commit)
#
# get the diffs
for file_diff in result['diffs']:
print(file_diff)
# ---------------------------------------------------------------- #
# ---------------------------------------------------------------- #
# get the commits
for commit in result['commits']:
print(commit)
#
# get the diffs
for file_diff in result['diffs']:
print(file_diff)
# ---------------------------------------------------------------- #
'''