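Overview
Since June 21, 2021, when I moved from cloud-native operations to cloud-native development, a little over five months have passed. Setting aside other work assignments along the way, I have spent a bit more than three months actually working on the multi-vendor container platform (in practice as the sole developer). I develop and plan the platform according to a planning diagram that I designed myself. (P.S. The department has no architect-level engineer who could provide a workable architecture diagram or any guidance, so I could rely only on four years of Kubernetes experience and on the documentation published by major websites and large vendors.)
This is, in effect, my own idea of how a multi-vendor container platform should be developed. The rest of this post centers on the diagram above.
Note: this post describes the "multi-vendor container management platform" as I envision it.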
Dependencies
Main
- golang 1.16.6
- goland 2021.2.3
- gin 1.7.4
- gorm 1.9.16
- kubernetes 1.16 -- 1.19
- client-go 0.19.0
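A brief walkthrough of the planning diagram
Development is modular and layered, with the adapter layer as the dividing line:
1. The upper layer is the adapter layer. It wraps each vendor's API and returns a unified set of fields, which requires a separate method implementation for each vendor's field layout.
2. The lower layer is the core layer. It calls the Kubernetes API directly and returns the same unified fields.
Each container vendor adapts its container platform to its own requirements, for example:
Cluster creation schemes [1. create the cloud vendor's node resources from parameters while the cluster is being created; 2. create a cluster by importing existing node resources; 3. and so on]
Node features [1. scheduling to the nearest region; 2. whether control-plane nodes are visible; 3. and so on]
Kubernetes add-on components [1. choosing or customizing the network plugin as needed; 2. the maximum number of pods per node; 3. and so on]
Schemes for importing third-party Kubernetes clusters [1. import a config; 2. import a secret; 3. import an agent; 4. and so on]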
......
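Given the considerations above:
1. Creating, updating, and deleting clusters and nodes is implemented through adapters.
2. Queries for clusters and nodes go to Kubernetes. First, queries are frequent, while vendor platforms limit the number of API calls. Second, cluster and node status should be based on native Kubernetes rather than on the vendor's container platform. Third, this avoids needless code duplication and eases maintenance.
3. Each vendor's repositories (Helm and image repositories) have their own characteristics, are not core Kubernetes resources, and do not expose the kind of low-level API documentation that Kubernetes does, so routing create, read, update, and delete through the vendor seems reasonable.
4. The database needs to maintain cluster information, node information, and access-control information for the repositories. The maintenance burden of this service should be kept as low as possible; whatever can be left to the vendor or to Kubernetes' etcd is best left there.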
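4.1 In my view, unless there is a requirement to view some cluster details and node information before the cluster has been created successfully, there is no real need to keep clusters and nodes in the database either.
5. Core Kubernetes resources, that is, the resources listed by kubectl api-resources, are handled directly through the Kubernetes API.
6. Because the service will be deployed on virtual machines for the foreseeable future, there is no plan yet to develop against the Kubernetes aggregated API, but the API should preferably be declarative so that this remains feasible.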
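7. Requests and responses use conventional JSON rather than the Kubernetes-manifest-style (YAML-derived) JSON, mainly to make things easier for the front end. The large vendors each use their own format anyway, and above all the API has not yet been aggregated into Kubernetes.
8. I believe development effort should focus on the mid-term plan; that is where the innovation lies and what sets this platform apart from the many vendors.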
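To make points 1 and 2 concrete, here is a minimal sketch of the write/read split, using only the standard library. All names (ClusterMutator, ClusterReader, Platform, fakeVendor, fakeCore) are hypothetical and are not taken from the project; the fakes merely stand in for a vendor adapter and for the Kubernetes-backed core layer.

```go
package main

import (
	"errors"
	"fmt"
)

// ClusterMutator is the adapter-layer contract: one implementation per vendor.
type ClusterMutator interface {
	CreateCluster(name string) error
	DeleteCluster(name string) error
}

// ClusterReader is the core-layer contract: a single shared implementation
// that would query Kubernetes directly in the real service.
type ClusterReader interface {
	ClusterStatus(name string) (string, error)
}

// fakeVendor stands in for a vendor adapter (hypothetical).
type fakeVendor struct{ clusters map[string]bool }

func (f *fakeVendor) CreateCluster(name string) error {
	if f.clusters[name] {
		return errors.New("cluster already exists: " + name)
	}
	f.clusters[name] = true
	return nil
}

func (f *fakeVendor) DeleteCluster(name string) error {
	if !f.clusters[name] {
		return errors.New("cluster not found: " + name)
	}
	delete(f.clusters, name)
	return nil
}

// fakeCore stands in for the Kubernetes-backed reader (hypothetical).
type fakeCore struct{ vendor *fakeVendor }

func (f *fakeCore) ClusterStatus(name string) (string, error) {
	if !f.vendor.clusters[name] {
		return "", errors.New("cluster not found: " + name)
	}
	return "Ready", nil
}

// Platform routes writes to the vendor adapter and reads to the core layer.
type Platform struct {
	Mutators map[string]ClusterMutator // keyed by vendor name
	Reader   ClusterReader
}

// Create dispatches to the adapter registered for the given vendor.
func (p *Platform) Create(vendor, name string) error {
	m, ok := p.Mutators[vendor]
	if !ok {
		return fmt.Errorf("unsupported vendor %q", vendor)
	}
	return m.CreateCluster(name)
}

func newDemoPlatform() *Platform {
	v := &fakeVendor{clusters: map[string]bool{}}
	return &Platform{
		Mutators: map[string]ClusterMutator{"tke": v},
		Reader:   &fakeCore{vendor: v},
	}
}

func main() {
	p := newDemoPlatform()
	fmt.Println(p.Create("tke", "demo"))
	status, err := p.Reader.ClusterStatus("demo")
	fmt.Println(status, err)
	fmt.Println(p.Create("unknown", "demo"))
}
```

Adding a vendor then means registering one more ClusterMutator; the read path stays untouched.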
Selected core code
Directory structure:
.
├── Makefile
├── README.md
├── cmd
│ └── container
│ └── container.go
├── config
│ └── container.yaml
├── deploy
│ ├── docker
│ │ └── Dockerfile
│ └── vmware
├── docs
├── go.mod
├── go.sum
├── internal
│ └── container
│ ├── bootstrap
│ ├── controller
│ ├── dao
│ ├── dto
│ ├── ecode
│ ├── initialize
│ ├── job
│ ├── middleware
│ ├── models
│ ├── pkg
│ │ ├── kubernetes
│ │ │ ├── dto
│ │ │ └── service
│ │ ├── registry
│ │ │ ├── registry.go
│ │ │ └── tke
│ │ │ └── acr
│ │ │ └── harbor
│ │ └── rancher
│ │ ├── dto
│ │ └── service
│ │ └── ack
│ │ ├── dto
│ │ └── service
│ │ └── tke
│ │ ├── dto
│ │ └── service
│ ├── routers
│ └── service
├── pkg
├── scripts
│ ├── db.sh
│ └── deploy.sh
└── test
Wrapping httpClient
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
var (
Client HTTPClient
)
func init() {
Client = &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
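TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // skips certificate verification; acceptable for testing only
DisableKeepAlives: true, // note: with keep-alives disabled, the idle-connection settings below have no effect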
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, // TCP connect timeout
KeepAlive: 60 * time.Second, // interval between TCP keep-alive probes
DualStack: true,
}).DialContext, // dial settings
MaxIdleConns: 50, // max idle connections across all hosts
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100, // max idle connections kept per host
ExpectContinueTimeout: 30 * time.Second, // wait for "100 Continue" after sending "Expect: 100-continue"
IdleConnTimeout: 60 * time.Second, // how long an idle connection stays open
},
}
}
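// CheckRespStatus reads the body and returns it for 2xx/3xx status codes; otherwise the body becomes the error.
func CheckRespStatus(resp *http.Response) ([]byte, error) {
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 200 && resp.StatusCode < 400 {
return bodyBytes, nil
}
return nil, errors.New(string(bodyBytes))
}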
// Request builds and sends an HTTP request and returns the status code and the response body.
func Request(url, token, body string, headerSet map[string]string, method string) (respStatusCode int, respBytes []byte, err error) {
request, err := http.NewRequest(method, url, strings.NewReader(body))
if err != nil {
return 401, nil, err
}
// add the bearer token
if token != "" {
request.Header.Set("Authorization", "Bearer "+token)
}
// add extra header fields
if headerSet != nil {
for k, v := range headerSet {
request.Header.Set(k, v)
}
}
resp, err := Client.Do(request)
if err != nil {
return 401, nil, err
}
defer resp.Body.Close()
// check the status and capture the status code
respBytes, err = CheckRespStatus(resp)
respStatusCode = resp.StatusCode
return
}
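One benefit of hiding the client behind the HTTPClient interface is that vendor calls can be exercised without a network. The sketch below assumes a hypothetical stubClient and a reduced helper named fetch that mirrors the shape of Request; neither exists in the project, and the URL is a placeholder.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// HTTPClient is the same one-method interface used above.
type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}

// stubClient is a hypothetical test double that returns a canned response
// and records the Authorization header it received.
type stubClient struct {
	status   int
	body     string
	lastAuth string
}

func (s *stubClient) Do(req *http.Request) (*http.Response, error) {
	s.lastAuth = req.Header.Get("Authorization")
	return &http.Response{
		StatusCode: s.status,
		Body:       io.NopCloser(strings.NewReader(s.body)),
		Header:     make(http.Header),
	}, nil
}

// fetch is a reduced stand-in for Request: it sets the bearer token,
// sends the request through the injected client, and reads the body.
func fetch(c HTTPClient, url, token string) (int, string, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return 0, "", err
	}
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	resp, err := c.Do(req)
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return resp.StatusCode, "", err
	}
	return resp.StatusCode, string(data), nil
}

func main() {
	stub := &stubClient{status: 200, body: `{"ok":true}`}
	code, body, err := fetch(stub, "https://vendor.example/api", "abc")
	fmt.Println(code, body, err, stub.lastAuth)
}
```

In the real package the same effect could be had by assigning a stub to the package-level Client variable before the test runs.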
Wrapping clusterManager
type ClusterManager struct {
ClientSet *kubernetes.Clientset
Metrics *metrics.Clientset
DynamicClient dynamic.Interface
}
const (
//DefaultQPS High enough QPS to fit all expected use cases.
DefaultQPS = 1e6
//DefaultBurst High enough Burst to fit all expected use cases.
DefaultBurst = 1e6
)
func buildConfig(clusterName string) (*rest.Config, error) {
var clientConfig *rest.Config
var configV1 *clientcmdapiv1.Config
var dbCluster models.Cluster
var host string
rows := bootstrap.DB.Where("cluster_name = ?", clusterName).Find(&dbCluster).RowsAffected
if rows == 0 {
return nil, errors.New("the database does not have this information")
}
if dbCluster.KubeConfigSecret != "" {
kubeConfigBytes, err := base64.StdEncoding.DecodeString(dbCluster.KubeConfigSecret)
if err != nil {
return nil, err
}
kubeConfigJson, err := yaml.YAMLToJSON(kubeConfigBytes)
if err != nil {
return nil, err
}
if err = json.Unmarshal(kubeConfigJson, &configV1); err != nil {
logrus.Error(err.Error())
return nil, err
}
// convert to the internal config version
configObject, err := clientcmdlatest.Scheme.ConvertToVersion(configV1, clientcmdapi.SchemeGroupVersion)
if err != nil {
return nil, err
}
configInternal := configObject.(*clientcmdapi.Config)
// build the REST client config
clientConfig, err = clientcmd.NewDefaultClientConfig(*configInternal, &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
return nil, err
}
clientConfig.QPS = DefaultQPS
clientConfig.Burst = DefaultBurst
} else if dbCluster.Token != "" {
var addresses []dto.Addresses
// check the decode error before using the addresses
if err := json.Unmarshal([]byte(dbCluster.APIServer), &addresses); err != nil {
return nil, errors.New("request connection cluster failed")
}
for _, address := range addresses {
if address.Type == "Real" {
host = fmt.Sprintf("https://%s:%d", address.Host, address.Port)
break
}
}
clientConfig = &rest.Config{
Host: host,
APIPath: "",
ContentConfig: rest.ContentConfig{},
Username: "",
Password: "",
BearerToken: dbCluster.Token,
BearerTokenFile: "",
Impersonate: rest.ImpersonationConfig{},
AuthProvider: nil,
AuthConfigPersister: nil,
ExecProvider: nil,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
UserAgent: "",
DisableCompression: false,
Transport: nil,
WrapTransport: nil,
QPS: DefaultQPS,
Burst: DefaultBurst,
RateLimiter: nil,
WarningHandler: nil,
Timeout: 0,
Dial: nil,
Proxy: nil,
}
} else {
return nil, errors.New("build client config error")
}
return clientConfig, nil
}
func BuildApiServerClient(clusterName string) (*ClusterManager, error) {
clientConfig, err := buildConfig(clusterName)
if err != nil {
return nil, err
}
if clientConfig == nil {
return nil, errors.New("err: error BuildApiServerClient")
}
clientSet, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
// Discovery().ServerVersion() must be called here to probe whether the kube-apiserver is reachable: kubernetes.NewForConfig(restConfig) does not check availability and returns no error when the server is down.
_, err = clientSet.Discovery().ServerVersion()
if err != nil {
return nil, err
}
m, err := metrics.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
d, err := dynamic.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
clusterManager := &ClusterManager{
clientSet,
m,
d,
}
return clusterManager, nil
}
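The token branch of buildConfig picks the address of type "Real" and turns it into a URL. A small standard-library sketch of that step is shown below. The Address struct and its JSON tags are assumptions about what dto.Addresses looks like, and realAPIServerURL is a hypothetical helper. Using net.JoinHostPort also keeps IPv6 hosts well-formed.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"strconv"
)

// Address is an assumed shape for one entry of the stored APIServer JSON.
type Address struct {
	Type string `json:"type"`
	Host string `json:"host"`
	Port int    `json:"port"`
}

// realAPIServerURL decodes the stored address list and returns the HTTPS
// URL of the first entry whose type is "Real".
func realAPIServerURL(raw string) (string, error) {
	var addrs []Address
	if err := json.Unmarshal([]byte(raw), &addrs); err != nil {
		return "", fmt.Errorf("decode addresses: %w", err)
	}
	for _, a := range addrs {
		if a.Type == "Real" {
			return "https://" + net.JoinHostPort(a.Host, strconv.Itoa(a.Port)), nil
		}
	}
	// An explicit error avoids building a client with an empty host.
	return "", errors.New("no address of type Real")
}

func main() {
	raw := `[{"type":"Advertise","host":"10.0.0.1","port":6443},{"type":"Real","host":"192.168.1.10","port":6443}]`
	url, err := realAPIServerURL(raw)
	fmt.Println(url, err)
}
```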
Wrapping the response API
type ApiResponse struct {
Code int `json:"code"`
Msg string `json:"message"`
Data interface{} `json:"data"`
}
// PaginateResponse could obviously reuse ApiResponse, but swagger cannot resolve it that way!
type PaginateResponse struct {
Code int `json:"code"`
Msg string `json:"message"`
Data Paginate `json:"data"`
}
type Paginate struct {
CurPage int `json:"cur_page"` // current page
CurPageSize int `json:"cur_page_size"` // items shown per page
Total int `json:"total"` // total number of items
TotalPage int `json:"total_page"` // total number of pages
Data interface{} `json:"data"` // payload
}
// SuccessResponse returns a successful API response
func SuccessResponse(c *gin.Context, data interface{}) {
response(c, ecode.Success, data)
}
// SuccessPaginateResponse returns a paginated response
func SuccessPaginateResponse(c *gin.Context, data interface{}, total int, curPage int, curPageSize int) {
c.JSON(http.StatusOK, PaginateResponse{
Code: int(ecode.Success),
Msg: ecode.ErrMsg[ecode.Success],
Data: Paginate{CurPage: curPage, CurPageSize: curPageSize, Total: total, TotalPage: int(math.Ceil(float64(total) / float64(curPageSize))), Data: data},
})
c.Abort()
}
// ErrorResponse returns a failed API response
func ErrorResponse(c *gin.Context, code ecode.ErrCode, data interface{}) {
response(c, code, data)
}
func NotFoundResponse(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"message": "页面未找到", // "page not found"
"data": "",
})
}
func response(c *gin.Context, code ecode.ErrCode, data interface{}) {
c.JSON(http.StatusOK, ApiResponse{
Code: int(code),
Msg: ecode.ErrMsg[code],
Data: data,
})
c.Abort()
}
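The TotalPage value in SuccessPaginateResponse is a ceiling division done in floating point, which misbehaves when curPageSize is 0. A minimal integer-only sketch follows; totalPages is a hypothetical helper and is not part of the project.

```go
package main

import "fmt"

// totalPages returns ceil(total / pageSize) using integer arithmetic.
// It returns 0 when there is nothing to page or the page size is invalid,
// which avoids dividing by zero.
func totalPages(total, pageSize int) int {
	if total <= 0 || pageSize <= 0 {
		return 0
	}
	return (total + pageSize - 1) / pageSize
}

func main() {
	fmt.Println(totalPages(11, 10)) // 2
	fmt.Println(totalPages(10, 10)) // 1
	fmt.Println(totalPages(5, 0))   // 0
}
```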
A few examples of calling Kubernetes [the ones I find more interesting]
# Output events in reverse order
events, err := w.workloadKubernetes.FindEvents(clientSet, namespace, name)
if err != nil {
return nil, err
}
var eventsWorkloadReps []*dto.EventsWorkloadRep
for i := len(events) - 1; i >= 0; i-- {
// fall back to EventTime when the legacy timestamps are unset
if events[i].FirstTimestamp.Time.IsZero() {
events[i].FirstTimestamp.Time = events[i].EventTime.Time
}
if events[i].LastTimestamp.Time.IsZero() {
events[i].LastTimestamp.Time = events[i].EventTime.Time
}
eventsWorkloadReps = append(eventsWorkloadReps, &dto.EventsWorkloadRep{
WorkloadUUID: id,
FirstTimestamp: events[i].FirstTimestamp.Time,
LastTimestamp: events[i].LastTimestamp.Time,
Type: events[i].Type,
Kind: events[i].InvolvedObject.Kind,
Name: events[i].Name,
Reason: events[i].Reason,
Message: events[i].Message,
Count: events[i].Count,
})
}
if len(eventsWorkloadReps) == 0 {
return make([]*dto.EventsWorkloadRep, 0), nil
}
return eventsWorkloadReps, nil
# pod log
limit, err := strconv.ParseInt(tailLines, 10, 64)
if err != nil {
return "error in parsing tailLines"
}
req := clientSet.CoreV1().Pods(namespace).GetLogs(name, &coreV1.PodLogOptions{Container: container, Timestamps: true, TailLines: &limit})
podLogs, err := req.Stream(context.TODO())
if err != nil {
return "error in opening stream"
}
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "error in copy information from podLogs to buf"
}
str := buf.String()
return str
# Get ApiServer/CertFile/Token from a kubeconfig
decoded, err := base64.StdEncoding.DecodeString(kubeConfig)
if err != nil {
return nil, nil, err
}
// authentication method: kubeConfig
// extract api / token / certFile from the kubeConfig
kubeConfigJson, err := syaml.YAMLToJSON(decoded)
if err != nil {
return nil, nil, err
}
var configV1 *clientcmdapiv1.Config
err = json.Unmarshal(kubeConfigJson, &configV1)
if err != nil {
return nil, nil, err
}
c, err := clientcmd.RESTConfigFromKubeConfig(decoded)
if err != nil {
return nil, nil, err
}
clientSet, err := kubernetes.NewForConfig(c)
if err != nil {
return nil, nil, err
}
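sa, err := clientSet.CoreV1().ServiceAccounts("kube-system").Get(context.TODO(), "admin-user", metaV1.GetOptions{})
if err != nil || len(sa.Secrets) == 0 {
return nil, nil, fmt.Errorf("service account admin-user is unavailable or has no secrets: %v", err)
}
secrets, err := clientSet.CoreV1().Secrets("kube-system").Get(context.TODO(), sa.Secrets[0].Name, metaV1.GetOptions{})
if err != nil {
return nil, nil, err
}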
importClusterReq.ApiServer = configV1.Clusters[0].Cluster.Server
encoded := base64.StdEncoding.EncodeToString(configV1.Clusters[0].Cluster.CertificateAuthorityData)
importClusterReq.CertFile = encoded
importClusterReq.Token = string(secrets.Data["token"])
# Implementing apply yaml [admittedly, it turned out rather messy]
func (y *Yaml) ApplyYaml(dynamicClient dynamic.Interface, clientSet *kubernetes.Clientset, yamlBody []byte) (interface{}, error) {
data, err := yamlutil.ToJSON(yamlBody)
if err != nil {
return nil, err
}
var applyYaml dto.ApplyYaml
if err = json.Unmarshal(data, &applyYaml); err != nil {
return nil, err
}
var applyYamlRep string
decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewReader(yamlBody), len(yamlBody))
var rawObj runtime.RawExtension
if err := decoder.Decode(&rawObj); err != nil {
return nil, err
}
obj, gvk, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil)
if err != nil {
return nil, err
}
unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}
unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap}
// fetch the API group resources the server supports
gr, err := restmapper.GetAPIGroupResources(clientSet.Discovery())
if err != nil {
return nil, err
}
// build a discovery REST mapper to resolve the resource type
mapper := restmapper.NewDiscoveryRESTMapper(gr)
// look up the REST mapping for the Group/Version/Kind
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
var dri dynamic.ResourceInterface
// namespaced resources need a namespace-scoped interface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
if unstructuredObj.GetNamespace() == "" {
unstructuredObj.SetNamespace("default")
}
dri = dynamicClient.Resource(mapping.Resource).Namespace(unstructuredObj.GetNamespace())
} else {
dri = dynamicClient.Resource(mapping.Resource)
}
if applyYaml.Metadata.Namespace == "" {
applyYaml.Metadata.Namespace = "default"
}
// check whether the resource already exists in the cluster
switch applyYaml.Kind {
case "Deployment":
_, err = clientSet.AppsV1().Deployments(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "StatefulSet":
_, err = clientSet.AppsV1().StatefulSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "DaemonSet":
_, err = clientSet.AppsV1().DaemonSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ReplicaSet":
_, err = clientSet.AppsV1().ReplicaSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "CronJob":
// with client-go 0.19.0 and Kubernetes 1.16 -- 1.19, CronJob is served from batch/v1beta1
_, err = clientSet.BatchV1beta1().CronJobs(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "Job":
_, err = clientSet.BatchV1().Jobs(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "Service":
_, err = clientSet.CoreV1().Services(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ConfigMap":
_, err = clientSet.CoreV1().ConfigMaps(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "Ingress":
_, err = clientSet.ExtensionsV1beta1().Ingresses(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ServiceAccount":
_, err = clientSet.CoreV1().ServiceAccounts(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ClusterRole":
_, err = clientSet.RbacV1().ClusterRoles().Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "RoleBinding":
_, err = clientSet.RbacV1().RoleBindings(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
case "ClusterRoleBinding":
_, err = clientSet.RbacV1().ClusterRoleBindings().Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})
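case "APIService":
// APIService is created directly and returned, so the update branch below is not reached
obj2, err := dri.Create(context.Background(), unstructuredObj, metaV1.CreateOptions{})
if err != nil {
return nil, err
}
return fmt.Sprintf("%s/%s/%s created", obj2.GetNamespace(), obj2.GetKind(), obj2.GetName()), nil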
}
if err != nil {
if !errors.IsNotFound(err) {
return nil, err
}
// create it if it does not exist
obj2, err := dri.Create(context.Background(), unstructuredObj, metaV1.CreateOptions{})
if err != nil {
return nil, err
}
applyYamlRep = fmt.Sprintf("%s/%s/%s created", obj2.GetNamespace(), obj2.GetKind(), obj2.GetName())
} else { // update it if it already exists
obj2, err := dri.Update(context.Background(), unstructuredObj, metaV1.UpdateOptions{})
if err != nil {
return nil, err
}
applyYamlRep = fmt.Sprintf("%s/%s/%s updated", obj2.GetNamespace(), obj2.GetKind(), obj2.GetName())
}
return applyYamlRep, nil
}
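The long switch in ApplyYaml exists only to learn whether the object is already there. Since dynamic.ResourceInterface also offers Get, the same question could in principle be asked through dri for any kind, which would collapse the function to a get-then-create-or-update flow. The sketch below shows that flow against an in-memory fake; resourceClient, memClient, object, and apply are hypothetical names, and the fake only imitates the shape of the dynamic client rather than calling it.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound plays the role of a Kubernetes "not found" API error.
var errNotFound = errors.New("not found")

// object is a stand-in for an unstructured Kubernetes object.
type object map[string]interface{}

// resourceClient imitates the Get/Create/Update subset of a dynamic client.
type resourceClient interface {
	Get(name string) (object, error)
	Create(name string, obj object) error
	Update(name string, obj object) error
}

// memClient is an in-memory fake used only for this illustration.
type memClient struct{ items map[string]object }

func (m *memClient) Get(name string) (object, error) {
	obj, ok := m.items[name]
	if !ok {
		return nil, errNotFound
	}
	return obj, nil
}

func (m *memClient) Create(name string, obj object) error {
	m.items[name] = obj
	return nil
}

func (m *memClient) Update(name string, obj object) error {
	m.items[name] = obj
	return nil
}

// apply creates the object when it is missing and updates it otherwise.
// Any error other than "not found" from Get is returned unchanged.
func apply(c resourceClient, name string, obj object) (string, error) {
	_, err := c.Get(name)
	switch {
	case errors.Is(err, errNotFound):
		if err := c.Create(name, obj); err != nil {
			return "", err
		}
		return name + " created", nil
	case err != nil:
		return "", err
	default:
		if err := c.Update(name, obj); err != nil {
			return "", err
		}
		return name + " updated", nil
	}
}

func main() {
	c := &memClient{items: map[string]object{}}
	first, _ := apply(c, "web", object{"replicas": 1})
	second, _ := apply(c, "web", object{"replicas": 2})
	fmt.Println(first)  // web created
	fmt.Println(second) // web updated
}
```

With the real client, the "not found" check would be the errors.IsNotFound call that ApplyYaml already uses.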
An adapter demo, using registry namespace creation as the example [pseudocode]
# controller layer
func CreateContainerRegistryNamespace(c *gin.Context) {
var param dto.CreateContainerRegistryNamespaceReq
if err := c.ShouldBindJSON(&param); err != nil {
controller.ErrorResponse(c, ecode.PARAMETER_ERR, err.Error())
return
}
cred, err := service.NewCredentialService().GetPlainTextCredential(param.CredentialID)
if err != nil {
ErrorResponse(c, ecode.ParameterErr, err.Error())
return
}
param.Token = cred.Token
if data, err := service.NewRegistryService(param.Source,param.Token).CreateContainerRegistryNamespace(param); err != nil {
controller.ErrorResponse(c, ecode.PARAMETER_ERR, err.Error())
} else {
controller.SuccessResponse(c, data)
}
}
# service layer
type registryService struct {
cli registry.Registry
registryDao dao.RegistryDao
}
func NewRegistryService(source,token string) *registryService {
return &registryService{
cli: registry.NewRegistryCli(source,token),
}
}
func (r *registryService) CreateContainerRegistryNamespace(param dto.CreateContainerRegistryNamespaceReq) (interface{}, error) {
rep, err := r.cli.CreateContainerRegistryNamespace(param.DisplayName, param.Describe, param.Visibility)
return rep, err
}
# interface layer
type Registry interface {
CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*dto.ListContainerRegistryNamespacesRep, error)
}
func NewRegistryCli(source,token string) Registry {
var cli Registry
switch source {
case "tke":
cli = tke.NewTkeClient(token)
}
return cli
}
# method layer
type tkeClient struct {
token string
}
func NewTkeClient(token string) *tkeClient {
return &tkeClient{
token: token,
}
}
func (t *tkeClient) CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*dto.ListContainerRegistryNamespacesRep, error) {
url := viper.GetString("url") + "/apis/registry.tkestack.io/v1/namespaces/"
createJson := tDto.CreateContainerRegistryReq{
APIVersion: "registry.tkestack.io/v1",
Kind: "Namespace",
Spec: tDto.CreateContainerRegistryReqSpec{
Name: name,
DisplayName: describe,
Visibility: visibility,
},
}
jsonData, errs := json.Marshal(createJson)
if errs != nil {
return nil, errs
}
_, rep, err := pkg.Request(url, t.token, string(jsonData), nil, http.MethodPost)
if err != nil {
// the vendor returns a JSON error body; surface its "message" field when present
if msg, ok := gojsonq.New().FromString(err.Error()).Find("message").(string); ok {
return nil, errors.New(msg)
}
return nil, err
}
var listRep *tDto.ListRepItems
if err = json.Unmarshal(rep, &listRep); err != nil {
return nil, err
}
var createRep dto.ListContainerRegistryNamespacesRep
createRep.DisplayName = listRep.Spec.Name
createRep.Describe = listRep.Spec.DisplayName
createRep.Visibility = listRep.Spec.Visibility
createRep.Name = listRep.Metadata.Name
createRep.RepoCount = listRep.Status.RepoCount
return &createRep, nil
}
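In the interface layer above, NewRegistryCli returns a nil Registry for any source other than "tke", so the first method call on it would panic. A hedged variation is a factory table that reports unknown sources as errors. The sketch reuses the names Registry, tkeClient, and NewRegistryCli for readability, but the Source method and the changed return signature are assumptions made for this illustration.

```go
package main

import "fmt"

// Registry is reduced to a single illustrative method here.
type Registry interface {
	Source() string
}

// tkeClient is a stand-in for the TKE adapter.
type tkeClient struct{ token string }

func (t *tkeClient) Source() string { return "tke" }

// factories maps a source name to the constructor of its adapter.
// Supporting another vendor means adding one entry.
var factories = map[string]func(token string) Registry{
	"tke": func(token string) Registry { return &tkeClient{token: token} },
}

// NewRegistryCli returns the adapter for source, or an error when the
// source is not registered, instead of a nil interface.
func NewRegistryCli(source, token string) (Registry, error) {
	factory, ok := factories[source]
	if !ok {
		return nil, fmt.Errorf("unsupported registry source %q", source)
	}
	return factory(token), nil
}

func main() {
	cli, err := NewRegistryCli("tke", "secret-token")
	fmt.Println(cli.Source(), err)
	_, err = NewRegistryCli("harbor", "secret-token")
	fmt.Println(err)
}
```

The service layer would then check the error once in NewRegistryService rather than risk a nil dereference later.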
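Note: as the above shows, nothing here involves particularly advanced techniques. It is mostly plain API calls and wrappers, and no middleware-style components are used yet. As development goes deeper, the service should gradually grow more complex.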
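Reference links:
Kubernetes source code analysis: https://jeffdingzone.com/category/k8s/
Kubernetes API documentation: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/
An illustrated look at how API aggregation is implemented in Kubernetes: https://juejin.cn/post/6844904081438277640
Monorepo vs. multi-repo: what are the pros and cons of each, and how to settle on best practices for rolling out microservices?: https://ssoor.github.io/2020/03/24/mono-repo-vs-multi-repo/
Introduction to Kubernetes Events: https://www.kubernetes.org.cn/1031.html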