zoukankan      html  css  js  c++  java
  • Kratos 读源码笔记一(配置加载)

    从入口文件看配置(初始化/加载/绑定/热加载)

    main.go

    
    //main.go 初始化配置
    c := config.New(
    		config.WithSource(
    			file.NewSource(flagconf), //文件配置源
    			//也可以自己实现远程配置中心数据源
    		),
    	)
    	//加载配置数据
    	if err := c.Load(); err != nil {
    		panic(err)
    	}
    	var bc conf.Bootstrap
    	//将配置绑定到数据结构
    	if err := c.Scan(&bc); err != nil {
    		panic(err)
    	}
    	// watch key
    	if err := c.Watch("service.name", func(key string, value config.Value) {
    		log.Printf("config changed: %s = %v
    ", key, value)
    	}); err != nil {
    		panic(err)
    	}
    

    由入口文件可以看到,首先声明了数据源,再使用Kratos的配置接口加载/绑定/监控等操作,大胆猜测,c.Load() - c.Scan(&bc) - c.Watch() 一定调用了配置源的相关方法

    先来看看实现一个配置源需要实现哪些接口

    //config/source.go
    //通过config.WithSource 知道了 配置来源需要实现以下两个方法
    type Source interface {
    	Load() ([]*KeyValue, error)
    	Watch() (Watcher, error)
    }
    

    暂时不去看配置源的具体实现,回到入口文件,看看 c.Load() 的具体实现

    func (c *config) Load() error {
    	for _, src := range c.opts.sources {
    		kvs, err := src.Load()  //这里验证了我们的猜测,调用了具体配置源的 Load() 方法
    		if err != nil {
    			return err
    		}
    		if err := c.reader.Merge(kvs...); err != nil {
    			c.log.Errorf("failed to merge config source: %v", err)
    			return err
    		}
    		w, err := src.Watch()  //调用了具体配置源的 Watch() 方法
    		if err != nil {
    			c.log.Errorf("failed to watch config source: %v", err)
    			return err
    		}
    		c.watchers = append(c.watchers, w)
    		go c.watch(w) //此处开启了一个协程,处理热加载
    	}
    	if err := c.reader.Resolve(); err != nil {
    		c.log.Errorf("failed to resolve config source: %v", err)
    		return err
    	}
    	return nil
    }
    

    c.Load() 中开启了一个协程监控配置源变更,稍后我们看看这个具体实现,先看 c.Watch() 中发生了什么

    func (c *config) Watch(key string, o Observer) error {
    	if v := c.Value(key); v.Load() == nil {
    		return ErrNotFound
    	}
    	//将要监控的配置假如观察者中
    	c.observers.Store(key, o)
    	return nil
    }
    

    好,现在回过头去看上面提到的协程

    func (c *config) watch(w Watcher) {
    	for {
    		kvs, err := w.Next()
    		if errors.Is(err, context.Canceled) {
    			c.log.Infof("watcher's ctx cancel : %v", err)
    			return
    		}
    		if err != nil {
    			time.Sleep(time.Second)
    			c.log.Errorf("failed to watch next config: %v", err)
    			continue
    		}
    		if err := c.reader.Merge(kvs...); err != nil {
    			c.log.Errorf("failed to merge next config: %v", err)
    			continue
    		}
    		if err := c.reader.Resolve(); err != nil {
    			c.log.Errorf("failed to resolve next config: %v", err)
    			continue
    		}
    		c.cached.Range(func(key, value interface{}) bool {
    			k := key.(string)
    			v := value.(Value)
    			if n, ok := c.reader.Value(k); ok && !reflect.DeepEqual(n.Load(), v.Load()) {
    				v.Store(n.Load())
    				if o, ok := c.observers.Load(k); ok {
    					o.(Observer)(k, v)
    				}
    			}
    			return true
    		})
    	}
    }
    

    以上主要能看到 Kratos 的配置是怎样加载和监控的了,具体的细节还需要去看每一个方法的实现。这里我们主要讨论,怎样实现配置源

    以文件配置源举例:

    实现 Source 接口即可。

    type Source interface {
    	Load() ([]*KeyValue, error)
    	Watch() (Watcher, error)
    }
    

    具体实现:

    Load() ([]*KeyValue, error)

    config/file/file.go

    //本地文件或远程配置中心只要实现以上两个方法就可以,以本地文件配置为例
    func (f *file) Load() (kvs []*config.KeyValue, err error) {
    	fi, err := os.Stat(f.path)
    	if err != nil {
    		return nil, err
    	}
    	if fi.IsDir() {
    		return f.loadDir(f.path)
    	}
    	kv, err := f.loadFile(f.path)
    	if err != nil {
    		return nil, err
    	}
    	return []*config.KeyValue{kv}, nil
    }
    
    //先来看loadFile
    func (f *file) loadFile(path string) (*config.KeyValue, error) {
    	file, err := os.Open(path)
    	if err != nil {
    		return nil, err
    	}
    	defer file.Close()
    	data, err := ioutil.ReadAll(file)
    	if err != nil {
    		return nil, err
    	}
    	info, err := file.Stat()
    	if err != nil {
    		return nil, err
    	}
    	return &config.KeyValue{
    		Key:    info.Name(),
    		Format: format(info.Name()),
    		Value:  data,
    	}, nil
    }
    
    //loadDir 读取里循环调用了loadFile, 不支持子目录和隐藏文件
    func (f *file) loadDir(path string) (kvs []*config.KeyValue, err error) {
    	files, err := ioutil.ReadDir(f.path)
    	if err != nil {
    		return nil, err
    	}
    	for _, file := range files {
    		// ignore hidden files
    		if file.IsDir() || strings.HasPrefix(file.Name(), ".") {
    			continue
    		}
    		kv, err := f.loadFile(filepath.Join(f.path, file.Name()))
    		if err != nil {
    			return nil, err
    		}
    		kvs = append(kvs, kv)
    	}
    	return
    }
    
    

    Watch() (Watcher, error)

    config/file/file.go

    func (f *file) Watch() (config.Watcher, error) {
    	return newWatcher(f)
    }
    

    config/source.go Watcher的定义如下

    // Watcher watches a source for changes.
    type Watcher interface {
    	Next() ([]*KeyValue, error)
    	Stop() error
    }
    

    config/file/watcher.go

    //Next() ([]*KeyValue, error) 实现
    func (w *watcher) Next() ([]*config.KeyValue, error) {
    	select {
    	case <-w.ctx.Done():
    		return nil, w.ctx.Err()
    	case event := <-w.fw.Events:
    		if event.Op == fsnotify.Rename {
    			if _, err := os.Stat(event.Name); err == nil || os.IsExist(err) {
    				if err := w.fw.Add(event.Name); err != nil {
    					return nil, err
    				}
    			}
    		}
    		fi, err := os.Stat(w.f.path)
    		if err != nil {
    			return nil, err
    		}
    		path := w.f.path
    		if fi.IsDir() {
    			path = filepath.Join(w.f.path, filepath.Base(event.Name))
    		}
    		kv, err := w.f.loadFile(path)
    		if err != nil {
    			return nil, err
    		}
    		return []*config.KeyValue{kv}, nil
    	case err := <-w.fw.Errors:
    		return nil, err
    	}
    }
    
    //Stop() error 实现
    func (w *watcher) Stop() error {
    	w.cancel()
    	return w.fw.Close()
    }
    
    func newWatcher(f *file) (config.Watcher, error) {
    	fw, err := fsnotify.NewWatcher()
    	if err != nil {
    		return nil, err
    	}
    	if err := fw.Add(f.path); err != nil {
    		return nil, err
    	}
    	ctx, cancel := context.WithCancel(context.Background())
    	return &watcher{f: f, fw: fw, ctx: ctx, cancel: cancel}, nil
    }
    
  • 相关阅读:
    CocoaPods的安装使用和常见问题
    超全iOS面试资料,看完你还担心面试吗?
    IOS--多线程之线程间通讯
    iOS开发网络篇—发送GET和POST请求(使用NSURLSession)
    java之NIO编程
    libthrift0.9.0解析(四)之TThreadPoolServer&ServerContext
    android开发笔记
    rtsp转发服务器设计
    神经网络文献资料
    deep learning in nlp 资料文献
  • 原文地址:https://www.cnblogs.com/yangqi7/p/15194279.html
Copyright © 2011-2022 走看看