zoukankan      html  css  js  c++  java
  • k8s device plugin

    基本概念入门:

    Device Manager Proposal

    Device plugin official Doc(中文)

    device-plugins official Doc(En)

    Go through Intel FPGA Plugin code

    1.  cmd/fpga_plugin/fpga_plugin.go

    生成一个新的plugin, plugin传入的信息sysfs,devfs和mode(共两种mode: af or region)

    // Create the device plugin from the sysfs/devfs directories and the
    // selected mode; abort via fatal if construction fails.
    plugin, err := newDevicePlugin(sysfsDirectory, devfsDirectory, mode)
    if err != nil {
    fatal(err)
    }
    
    // NOTE(review): fmt.Println already inserts a space between operands, so
    // the spaces inside these literals end up doubled in the printed message.
    fmt.Println("FPGA device plugin started in ", mode, " mode")
    
    // Hand the plugin to a Manager for the given namespace and run it.
    manager := dpapi.NewManager(namespace, plugin)
    manager.Run()
    

    2.  internal/deviceplugin/manager.go

    会生成一个server, 然后run, 主要就是devicePlugin.Scan (具体到某个device),扫描设备信息,然后启动grpc Serve(handleUpdate)

    // Manager manages life cycle of device plugins and handles the scan results
    // received from them.
    type Manager struct {
            // devicePlugin is the Scanner whose Scan method is started by Run.
            devicePlugin Scanner
            // namespace is passed to every server's Serve call and used in log messages.
            namespace    string
            // servers maps a device type to the server created for it in handleUpdate.
            servers      map[string]devicePluginServer
            // createServer is the factory used to build a server for a device type;
            // NewManager sets it to newServer.
            createServer func(string, func(*pluginapi.AllocateResponse) error) devicePluginServer
    }
    
    // NewManager returns a Manager for the given namespace and Scanner. The
    // returned Manager starts with an empty server table and uses newServer as
    // its server factory.
    func NewManager(namespace string, devicePlugin Scanner) *Manager {
            mgr := new(Manager)
            mgr.devicePlugin = devicePlugin
            mgr.namespace = namespace
            mgr.servers = map[string]devicePluginServer{}
            mgr.createServer = newServer
            return mgr
    }
    
    // Run prepares and launches event loop for updates from Scanner
    func (m *Manager) Run() {
            updatesCh := make(chan updateInfo)
    
            go func() {
                    err := m.devicePlugin.Scan(newNotifier(updatesCh))
                    if err != nil {
                            fmt.Printf("Device scan failed: %+v
    ", err)
                            os.Exit(1)
                    }
                    close(updatesCh)
            }()
    
            for update := range updatesCh {
                    m.handleUpdate(update)
            }
    }
                           
    

    handleUpdate 启动grpc 服务 m.servers[dt].Serve(m.namespace)

    func (m *Manager) handleUpdate(update updateInfo) {
            debug.Print("Received dev updates:", update)
            for devType, devices := range update.Added {
                    var postAllocate func(*pluginapi.AllocateResponse) error
    
                    if postAllocator, ok := m.devicePlugin.(PostAllocator); ok {
                            postAllocate = postAllocator.PostAllocate
                    }
    
                    m.servers[devType] = m.createServer(devType, postAllocate)
                    go func(dt string) {
                            err := m.servers[dt].Serve(m.namespace)
                            if err != nil {
                                    fmt.Printf("Failed to serve %s/%s: %+v
    ", m.namespace, dt, err)
                                    os.Exit(1)
                            }
                    }(devType)
                    m.servers[devType].Update(devices)
            }
            for devType, devices := range update.Updated {
                    m.servers[devType].Update(devices)
            }
            for devType := range update.Removed {
                    m.servers[devType].Stop()
                    delete(m.servers, devType)
            }
    }
    

    3. cmd/fpga_plugin/fpga_plugin.go

    获得Device的具体信息

    // Scan repeatedly scans the host for FPGA devices, handing each scan result
    // to notifier and sleeping five seconds between rounds. It never returns on
    // success; the first scan error ends the loop and is returned to the caller.
    func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
            const scanInterval = 5 * time.Second
    
            // The post statement provides the pause after every successful round.
            for ; ; time.Sleep(scanInterval) {
                    tree, scanErr := dp.scanFPGAs()
                    if scanErr != nil {
                            return scanErr
                    }
                    notifier.Notify(tree)
            }
    }
    

    4. 启动GRPC 服务

    // Serve starts a gRPC server to serve pluginapi.PluginInterfaceServer interface.
    // It delegates to setupAndServe, passing pluginapi.DevicePluginPath as the
    // plugin socket directory and pluginapi.KubeletSocket as the kubelet socket,
    // and returns whatever error setupAndServe returns.
    func (srv *server) Serve(namespace string) error {
            return srv.setupAndServe(namespace, pluginapi.DevicePluginPath, pluginapi.KubeletSocket)
    }
    
    // setupAndServe binds given gRPC server to device manager, starts it and registers it with kubelet.
    func (srv *server) setupAndServe(namespace string, devicePluginPath string, kubeletSocket string) error {
            resourceName := namespace + "/" + srv.devType
            pluginPrefix := namespace + "-" + srv.devType
    
            for {
                    pluginEndpoint := pluginPrefix + ".sock"
                    pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
    
                    if err := waitForServer(pluginSocket, time.Second); err == nil {
                            return errors.Errorf("Socket %s is already in use", pluginSocket)
                    }
                    os.Remove(pluginSocket)
    
                    lis, err := net.Listen("unix", pluginSocket)
                    if err != nil {
                            return errors.Wrap(err, "Failed to listen to plugin socket")
                    }
    
                    srv.grpcServer = grpc.NewServer()
                    pluginapi.RegisterDevicePluginServer(srv.grpcServer, srv)
    
                    // Starts device plugin service.
                    go func() {
                            fmt.Printf("Start server for %s at: %s
    ", srv.devType, pluginSocket)
                            srv.grpcServer.Serve(lis)
                    }()
    
                    // Wait for the server to start
                    if err = waitForServer(pluginSocket, 10*time.Second); err != nil {
                            return err
                    }
    
                    // Register with Kubelet.
                    err = registerWithKubelet(kubeletSocket, pluginEndpoint, resourceName)
                    if err != nil {
                            return err
                    }
                    fmt.Printf("Device plugin for %s registered
    ", srv.devType)
    
                    // Kubelet removes plugin socket when it (re)starts
                    // plugin must restart in this case
                    if err = watchFile(pluginSocket); err != nil {
                            return err
                    }
                    fmt.Printf("Socket %s removed, restarting
    ", pluginSocket)
    
                    srv.grpcServer.Stop()
                    os.Remove(pluginSocket)
            }
    }
    

      

    5. 注册GRPC server

    vendor/k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go

    // RegisterRegistrationServer registers srv on the gRPC server s as the
    // implementation of the service described by _Registration_serviceDesc.
    func RegisterRegistrationServer(s *grpc.Server, srv RegistrationServer) {
            s.RegisterService(&_Registration_serviceDesc, srv)
    }
    

    "vendor/google.golang.org/grpc/server.go"

    // RegisterService adds the service described by sd, implemented by ss, to
    // the gRPC server. Generated IDL code calls it, and it has to happen before
    // Serve is invoked. The implementation is checked against the handler
    // interface named in sd; a mismatch is reported through grpclog.Fatalf.
    func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
            implType := reflect.TypeOf(ss)
            handlerType := reflect.TypeOf(sd.HandlerType).Elem()
            if satisfied := implType.Implements(handlerType); !satisfied {
                    grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", implType, handlerType)
            }
            s.register(sd, ss)
    }
    
    // register records the service described by sd, implemented by ss, in the
    // server's service table under sd.ServiceName. It holds s.mu for its whole
    // duration. Registering after Serve has been called, or registering a
    // service name twice, is reported through grpclog.Fatalf. Method and stream
    // descriptors are indexed by name and point into sd's own slices.
    func (s *Server) register(sd *ServiceDesc, ss interface{}) {
            s.mu.Lock()
            defer s.mu.Unlock()
    
            name := sd.ServiceName
            s.printf("RegisterService(%q)", name)
            if s.serve {
                    grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", name)
            }
            if _, duplicate := s.m[name]; duplicate {
                    grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", name)
            }
    
            methods := make(map[string]*MethodDesc)
            for idx := range sd.Methods {
                    method := &sd.Methods[idx]
                    methods[method.MethodName] = method
            }
    
            streams := make(map[string]*StreamDesc)
            for idx := range sd.Streams {
                    stream := &sd.Streams[idx]
                    streams[stream.StreamName] = stream
            }
    
            s.m[name] = &service{
                    server: ss,
                    md:     methods,
                    sd:     streams,
                    mdata:  sd.Metadata,
            }
    }
    

    (s *Server) Serve

    // Serve accepts incoming connections on the listener lis, creating a new
    // ServerTransport and service goroutine for each. The service goroutines
    // read gRPC requests and then call the registered handlers to reply to them.
    // Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
    // this method returns.
    // Serve will return a non-nil error unless Stop or GracefulStop is called.
    func (s *Server) Serve(lis net.Listener) error {
    	s.mu.Lock()
    	s.printf("serving")
    	// Mark the server as serving before anything else happens under the lock.
    	s.serve = true
    	if s.lis == nil {
    		// Serve called after Stop or GracefulStop.
    		s.mu.Unlock()
    		lis.Close()
    		return ErrServerStopped
    	}
    
    	// Count this Serve call in serveWG; the deferred function releases it and,
    	// when quit has been signalled, additionally waits on done before returning.
    	s.serveWG.Add(1)
    	defer func() {
    		s.serveWG.Done()
    		select {
    		// Stop or GracefulStop called; block until done and return nil.
    		case <-s.quit:
    			<-s.done
    		default:
    		}
    	}()
    
    	// Track the listener in s.lis so the deferred cleanup below can find it.
    	ls := &listenSocket{Listener: lis}
    	s.lis[ls] = true
    
    	if channelz.IsOn() {
    		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
    	}
    	s.mu.Unlock()
    
    	// On return, close and forget the listener if it is still tracked in s.lis.
    	defer func() {
    		s.mu.Lock()
    		if s.lis != nil && s.lis[ls] {
    			ls.Close()
    			delete(s.lis, ls)
    		}
    		s.mu.Unlock()
    	}()
    
    	var tempDelay time.Duration // how long to sleep on accept failure
    
    	for {
    		rawConn, err := lis.Accept()
    		if err != nil {
    			// Temporary errors are retried with a delay that starts at 5ms,
    			// doubles on each consecutive failure and is capped at 1s.
    			if ne, ok := err.(interface {
    				Temporary() bool
    			}); ok && ne.Temporary() {
    				if tempDelay == 0 {
    					tempDelay = 5 * time.Millisecond
    				} else {
    					tempDelay *= 2
    				}
    				if max := 1 * time.Second; tempDelay > max {
    					tempDelay = max
    				}
    				s.mu.Lock()
    				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
    				s.mu.Unlock()
    				// Wait out the delay, but give up immediately if quit is signalled.
    				timer := time.NewTimer(tempDelay)
    				select {
    				case <-timer.C:
    				case <-s.quit:
    					timer.Stop()
    					return nil
    				}
    				continue
    			}
    			s.mu.Lock()
    			s.printf("done serving; Accept = %v", err)
    			s.mu.Unlock()
    
    			// Non-temporary error: report nil if quit was signalled, otherwise
    			// return the Accept error.
    			select {
    			case <-s.quit:
    				return nil
    			default:
    			}
    			return err
    		}
    		// A successful Accept resets the retry delay.
    		tempDelay = 0
    		// Start a new goroutine to deal with rawConn so we don't stall this Accept
    		// loop goroutine.
    		//
    		// Make sure we account for the goroutine so GracefulStop doesn't nil out
    		// s.conns before this conn can be added.
    		s.serveWG.Add(1)
    		go func() {
    			s.handleRawConn(rawConn)
    			s.serveWG.Done()
    		}()
    	}
    }
    

    gRPC tutorial

    6.  注册kubelet

    // registerWithKubelet connects to the kubelet at kubeletSocket over a unix
    // socket and sends a RegisterRequest carrying the plugin API version, the
    // plugin endpoint and the resource name. The connection is closed before
    // returning. Connection and registration failures are returned wrapped with
    // a short description; nil is returned on success.
    func registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) error {
            // The address is dialled as a unix socket, not a TCP endpoint.
            unixDialer := func(addr string, timeout time.Duration) (net.Conn, error) {
                    return net.DialTimeout("unix", addr, timeout)
            }
    
            conn, dialErr := grpc.Dial(kubeletSocket, grpc.WithInsecure(), grpc.WithDialer(unixDialer))
            if dialErr != nil {
                    return errors.Wrap(dialErr, "Cannot connect to kubelet service")
            }
            defer conn.Close()
    
            registration := pluginapi.NewRegistrationClient(conn)
            request := &pluginapi.RegisterRequest{
                    Version:      pluginapi.Version,
                    Endpoint:     pluginEndPoint,
                    ResourceName: resourceName,
            }
            if _, regErr := registration.Register(context.Background(), request); regErr != nil {
                    return errors.Wrap(regErr, "Cannot register to kubelet service")
            }
            return nil
    }
    

    7. 定义  DevicePluginServer interface   

    "vendor/k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go"  

    / Server API for DevicePlugin service
    
    type DevicePluginServer interface {
            // GetDevicePluginOptions returns options to be communicated with Device
            // Manager
            GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
            // ListAndWatch returns a stream of List of Devices
            // Whenever a Device state change or a Device disapears, ListAndWatch
            // returns the new list
            ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
            // Allocate is called during container creation so that the Device
            // Plugin can run device specific operations and instruct Kubelet
            // of the steps to make the Device available in the container
            Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
            // PreStartContainer is called, if indicated by Device Plugin during registeration phase,
            // before each container start. Device plugin can run device specific operations
            // such as reseting the device before making devices available to the container
            PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
    }
    

    具体实现

    "internal/deviceplugin/server.go"  

     

    参考===============================

    prepare

    Kubernetes的Device Plugin设计解读

    深入浅出kubernetes之device-plugins

     kubernetes调度gpu

     KubeVirt:通过CRD扩展Kubernetes实现虚拟机管理

    kubernetes系列之十四:Kubernetes CRD(CustomResourceDefinition)概览

    Extend the Kubernetes API with CustomResourceDefinitions 

    用户资源定义(基本上所有的项目都用到了这个)

    example

     Kubernetes CRD (CustomResourceDefinition) 自定义资源类型

    REF:

    k8s 基本概念

    k8s 系列介绍

    API Extensions

    Schedule GPUs 

     中文分析

    KUBERNETES ON NVIDIA GPUS

    RDMA device plugin for Kubernetes

    intel-device-plugins-for-kubernetes

      

    概念:

    1. Opaque Integer Resources (OIRs) 

    Scheduling • Opaque Integer Resources (OIRs) 目前已棄用,也將在 v1.9 版本移除。 • Extended Resources (ERs) 成為 OIRs 的替代 Resource。 • 使用者能夠使用 kubernetes.io/ domain 之外的任何域名前綴,不再是使用 pod.alpha.kubernetes.io/opaque-int-resource- prefix。

    使用 Kustomize 对 Kubernetes 对象进行声明式管理  

    node-feature-discovery

    Intel®’s Kubernetes* Documentation Site!  

    为容器设置启动时要执行的命令和参数 

  • 相关阅读:
    CodeForces 450
    CodeForces 400
    CodeForces 1
    [HDU POJ] 逆序数
    [HDU 1166] 敌兵布阵
    [转] 树状数组学习
    关于1月4日到1月7日
    [HDU 1565+1569] 方格取数
    [POJ 1459] Power Network
    [转] 网络流算法--Ford-Fulkerson方法及其多种实现
  • 原文地址:https://www.cnblogs.com/shaohef/p/9478309.html
Copyright © 2011-2022 走看看