zoukankan      html  css  js  c++  java
  • etcd(实时共享配置信息)

    前言

    在分布式集群架构中各个组件之间如何解决以下2个关键问题?

    1.配置共享:共享同一份配置文件,如果这份配置文件更新之后,各个组件如何马上得知(我就是冲着watch for changes来的....)?

    2.服务注册发现:集群中新增节点如何做到自动发现?

    etcd简介

    etcd是Go语言开发的一个开源的、支持分布式的、高可用的key-value存储系统。

    可用于组册发现、配置共享中心。

    A distributed, reliable key-value store for the most critical data of a distributed system.

    是什么优势让etcd官网说 for the most critical data?

    优势:

    完全复制:etcd集群中的每个节点都可以使用完整的文档

    高可用:防止单点故障有leader和follower的选举机制

    一致性:每次读取都会返回跨多主机的最新写入

    部署简单:二进制直接运行

    安全:支持TSL

    数据可靠:使用Raft算法实现各etcd间存储的数据强一致性。

     ps:etcd使用raft算法实现了分布式锁。保证了etcd集群中节点中存储的数据永远一致,

    也就是说1个Python程序和1个Go程序同时向etcd中put同1个key是有锁的。

    架构

    从etcd的架构图中可以看到,etcd主要分为4个部分。

    http server:用户client发送的API操作请求、以及其他etcd节点的同步和心跳新校区

    store:处理etcd支持的各类功能事物,包括索引、节点变更、监控反馈、事件处理和执行,是API请求的具体实现。

    Raft:etcd集群中数据一致性的关键

    Wal(write ahead log):预写日志,是etcd数据存储的方式,实现持久化存储。

    使用etcd

    2379端口:接收client的http请求

    2380端口:用于etcd集群节点间通信

    启动

    root@zhanggen etcd-v3.3.18-linux-amd64]# ./etcd --name master1 --data-dir /data/etcd/ --wal-dir /data/etcd/wal/ --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
    2020-05-21 01:35:01.096751 I | etcdmain: etcd Version: 3.3.18
    2020-05-21 01:35:01.096801 I | etcdmain: Git SHA: 3c8740a79
    2020-05-21 01:35:01.096806 I | etcdmain: Go Version: go1.12.9
    2020-05-21 01:35:01.096809 I | etcdmain: Go OS/Arch: linux/amd64
    2020-05-21 01:35:01.096812 I | etcdmain: setting maximum number of CPUs to 1, total number of available CPUs is 1
    2020-05-21 01:35:01.096820 I | etcdmain: advertising using detected default host "192.168.56.133"
    2020-05-21 01:35:01.096852 N | etcdmain: the server is already initialized as member before, starting as etcd member...
    2020-05-21 01:35:01.097083 I | embed: listening for peers on http://0.0.0.0:2381
    2020-05-21 01:35:01.097118 I | embed: listening for client requests on 0.0.0.0:2371
    2020-05-21 01:35:01.102854 I | etcdserver: name = master1
    2020-05-21 01:35:01.102869 I | etcdserver: data dir = /data/etcd/
    2020-05-21 01:35:01.102877 I | etcdserver: member dir = /data/etcd/member
    2020-05-21 01:35:01.102879 I | etcdserver: dedicated WAL dir = /data/etcd/wal/
    2020-05-21 01:35:01.102882 I | etcdserver: heartbeat = 100ms
    2020-05-21 01:35:01.102885 I | etcdserver: election = 1000ms
    2020-05-21 01:35:01.102887 I | etcdserver: snapshot count = 100000
    2020-05-21 01:35:01.102895 I | etcdserver: advertise client URLs = http://0.0.0.0:2371
    2020-05-21 01:35:01.102899 I | etcdserver: initial advertise peer URLs = http://192.168.56.133:2381
    2020-05-21 01:35:01.102905 I | etcdserver: initial cluster = master1=http://192.168.56.133:2381
    2020-05-21 01:35:01.112656 I | etcdserver: starting member 36d9af938bdf88c5 in cluster 24d7db2fa7e0796c
    2020-05-21 01:35:01.112685 I | raft: 36d9af938bdf88c5 became follower at term 0
    2020-05-21 01:35:01.112696 I | raft: newRaft 36d9af938bdf88c5 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
    2020-05-21 01:35:01.112699 I | raft: 36d9af938bdf88c5 became follower at term 1
    2020-05-21 01:35:01.115541 W | auth: simple token is not cryptographically signed
    2020-05-21 01:35:01.117263 I | etcdserver: starting server... [version: 3.3.18, cluster version: to_be_decided]
    2020-05-21 01:35:01.119207 I | etcdserver: 36d9af938bdf88c5 as single-node; fast-forwarding 9 ticks (election ticks 10)
    2020-05-21 01:35:01.119674 I | etcdserver/membership: added member 36d9af938bdf88c5 [http://192.168.56.133:2381] to cluster 24d7db2fa7e0796c
    2020-05-21 01:35:01.413844 I | raft: 36d9af938bdf88c5 is starting a new election at term 1
    2020-05-21 01:35:01.413947 I | raft: 36d9af938bdf88c5 became candidate at term 2
    2020-05-21 01:35:01.414120 I | raft: 36d9af938bdf88c5 received MsgVoteResp from 36d9af938bdf88c5 at term 2
    2020-05-21 01:35:01.414179 I | raft: 36d9af938bdf88c5 became leader at term 2
    2020-05-21 01:35:01.414206 I | raft: raft.node: 36d9af938bdf88c5 elected leader 36d9af938bdf88c5 at term 2
    2020-05-21 01:35:01.415431 I | etcdserver: setting up the initial cluster version to 3.3
    2020-05-21 01:35:01.420094 N | etcdserver/membership: set the initial cluster version to 3.3
    2020-05-21 01:35:01.420297 I | etcdserver/api: enabled capabilities for version 3.3
    2020-05-21 01:35:01.420398 I | etcdserver: published {Name:master1 ClientURLs:[http://0.0.0.0:2371]} to cluster 24d7db2fa7e0796c
    2020-05-21 01:35:01.421353 E | etcdmain: forgot to set Type=notify in systemd service file?
    2020-05-21 01:35:01.421407 I | embed: ready to serve client requests
    2020-05-21 01:35:01.424229 N | embed: serving insecure client requests on [::]:2371, this is strongly discouraged!

    测试

    [root@zhanggen etcd-v3.3.18-linux-amd64]# export ETCDCTL_API=3
    [root@zhanggen etcd-v3.3.18-linux-amd64]# ./etcdctl --endpoints=192.168.56.133:2371 put name "Martin"
    OK
    [root@zhanggen etcd-v3.3.18-linux-amd64]# ./etcdctl --endpoints=192.168.56.133:2371 get name 
    name
    Martiin
    [root@zhanggen etcd-v3.3.18-linux-amd64]# 

    go连接etcd

    D:goprojectsrcjd.cometcdDemo>go get go.etcd.io/etcd/clientv3
    go: downloading google.golang.org/grpc v1.26.0
    go: downloading github.com/golang/protobuf v1.3.2
    go: extracting github.com/golang/protobuf v1.3.2
    go: extracting google.golang.org/grpc v1.26.0

     put和get模式

    package main
    
    import (
    	"context"
    	"go.etcd.io/etcd/clientv3"
    	"fmt"
    	"time"
    )
    
    func main() {
    	//创建1个连接etcd的连接
    	client,err:=clientv3.New(clientv3.Config{
    		Endpoints: []string{"192.168.56.133:2371"},
    		DialTimeout: 5*time.Second,
    	})
    	if err!=nil{
    		fmt.Println("I try to connect etcd fiald ",err)
    	}
    	fmt.Println("The connection to etcd was connected!")
    	//记得关闭连接
    	defer client.Close()
    
    	//put值
    	ctx,cancel:=context.WithTimeout(context.Background(),time.Second)
    	//设置1秒钟超时时间
    	key:="name"
    	_,err=client.Put(ctx,key,"Martin")
    	cancel()
    	if err!=nil{
    		fmt.Println("The instraction put value to etcd was faild.")
    	}
    	//get值
    	ctx,cancel=context.WithTimeout(context.Background(),time.Second)
    	//查询所有以name开头的key(支持模糊查询)
    	response,err:=client.Get(ctx,key,clientv3.WithPrefix())
    	cancel()
    	if err!=nil{
    		fmt.Println("The instraction get value from etcd was faild.")
    	}
    	//fmt.Println(response)
    	/*
    	&{cluster_id:2654831503084648812 member_id:3952383196236056773 revision:3 raft_term:2  [key:"name" create_revision:2 mod_revision:3 version:2 value:",martin"
    	] false 1 {} [] 0}
    	*/
    	for _,ev:=range response.Kvs{
    		fmt.Printf("key:%s value:%s
    ",ev.Key,ev.Value)
    	}
    
    }
    

      

    watch模式

    排1个哨兵去监控key发生的事件,马上通知给我!(watcher模式可以实现配置文件的热加载!)

    package main
    
    import (
    	"context"
    	"go.etcd.io/etcd/clientv3"
    	"fmt"
    	"time"
    )
    
    func main() {
    	//创建1个连接etcd的连接
    	client,err:=clientv3.New(clientv3.Config{
    		Endpoints: []string{"192.168.56.133:2371"},
    		DialTimeout: 5*time.Second,
    	})
    	if err!=nil{
    		fmt.Println("I try to connect etcd fiald ",err)
    	}
    	fmt.Println("The connection to etcd was connected!")
    	//记得关闭连接
    	defer client.Close()
    
    	//put值
    	ctx,cancel:=context.WithTimeout(context.Background(),time.Second)
    	//设置1秒钟超时时间
    	key:="name"
    	_,err=client.Put(ctx,key,"Martin")
    	cancel()
    	if err!=nil{
    		fmt.Println("The instraction put value to etcd was faild.")
    	}
    	//get值
    	ctx,cancel=context.WithTimeout(context.Background(),time.Second)
    	//查询所有以name开头的key(支持模糊查询)
    	response,err:=client.Get(ctx,key,clientv3.WithPrefix())
    	cancel()
    	if err!=nil{
    		fmt.Println("The instraction get value from etcd was faild.")
    	}
    	//fmt.Println(response)
    	/*
    	&{cluster_id:2654831503084648812 member_id:3952383196236056773 revision:3 raft_term:2  [key:"name" create_revision:2 mod_revision:3 version:2 value:",martin"
    	] false 1 {} [] 0}
    	*/
    	for _,ev:=range response.Kvs{
    		fmt.Printf("key:%s value:%s
    ",ev.Key,ev.Value)
    	}
    	ctx=context.TODO()
    	//安插1个探针 监控name key,返回1个通道
    	monitorChanel:=client.Watch(ctx,key)
    	//不断从通道中获取监控事件
    	for monitor:= range monitorChanel{
    		//获取监控事件
    		for _,event:=range monitor.Events{
    			fmt.Printf("type:%v,key:%v,value:%v
    ",event.Type,string(event.Kv.Key),string(event.Kv.Value))
    		}
    
    	}
    
    
    }
    

      

    利用etcd的watcher功能实现配置信息热加载

    package taillog
    
    import (
    	"fmt"
    	"time"
    	"jd.com/logagent/etcd"
    )
    
    //定义1个全局的taskpool
    var tiallpoolObj *tailPool
    
    type tailPool struct {
    	//保存从etcd中获取的所有logAgent配置
    	logConfigs            []*etcd.LogEntry
    	taskMaping            map[string]*TaillTask
    	watchNewConfigChannel chan []*etcd.LogEntry
    }
    
    func InitTaskPool(logConfigList []*etcd.LogEntry) {
    	//初始化1个taill连接池
    	tiallpoolObj = &tailPool{
    		logConfigs:            logConfigList, //把当前获取的配置项保存起来
    		taskMaping:            make(map[string]*TaillTask,32),
    		watchNewConfigChannel: make(chan []*etcd.LogEntry), //无缓冲区通道(没有值1一直阻塞)
    
    	}
    	//在pool中初始化日志采集task
    	for _, cfg := range logConfigList {
    		//生成真正的日志采集模块
    		var taiiobj TaillTask
    		task,err:=taiiobj.NewTaillTask(cfg.Path, cfg.Topic)
    		taskKey := fmt.Sprint(cfg.Path, cfg.Topic)
    		tiallpoolObj.taskMaping[taskKey] = task
    		if err!=nil {
    			fmt.Printf("初始化%s采集日志模块失败%s",cfg.Path,err)
    		}
    
    	}
    	//开启日志采集池
    	go tiallpoolObj.run()
    
    }
    
    //watchNewConfigChannel 配置更新之后,做对应的处理
    //1.配置新增
    //2.配置删除
    //3.配置变更
    
    func (T *tailPool)seekDifference(confs[]*etcd.LogEntry,confsMap map[string]*TaillTask)(difference []*etcd.LogEntry){
    	for _, conf := range confs{
    		MK := fmt.Sprint(conf.Path, conf.Topic)
    		_, ok := confsMap[MK]
    		if !ok {
    			fmt.Println("检查到key发生变化----->:",MK)
    			difference= append(difference,conf)
    		}
    		continue
    
    	}
    	return
    }
    
    func (T *tailPool) run() {
    	for {
    		select {
    		case newConfSlice := <-T.watchNewConfigChannel:
    			fmt.Println("---------新的配置来了---------")
    			//增加
    			addList:=T.seekDifference(newConfSlice,T.taskMaping)
    			for _, conf := range addList {
    				var taiiobj TaillTask
    				task,err:=taiiobj.NewTaillTask(conf.Path, conf.Topic)
    				if err!=nil {
    					fmt.Println("初始化采集日志模块失败",err)
    				}
    				taskKey := fmt.Sprint(conf.Path, conf.Topic)
    				tiallpoolObj.taskMaping[taskKey] = task
    				fmt.Printf("增加%s日志采集模块成功
    ",taskKey)
    			}
    			//删除
    			newTaskMaping:=make(map[string]*TaillTask,32)
    			for _, cnf := range newConfSlice {
    				mk:=fmt.Sprint(cnf.Path,cnf.Topic)
    				newTaskMaping[mk]=nil
    			}
    			deleteList:=T.seekDifference(T.logConfigs,newTaskMaping)
    			for _, item := range deleteList {
    				taskKey := fmt.Sprint(item.Path, item.Topic)
    				T.taskMaping[taskKey].exit()
    				delete(T.taskMaping,taskKey)
    			}
    			//更新logConfigs
    			tiallpoolObj.logConfigs=newConfSlice
    			fmt.Println("最新日志采集任务列表",T.taskMaping)
    		default:
    			time.Sleep(time.Second)
    
    		}
    
    	}
    }
    
    //向外暴露watchNewConfigChannel
    func PushNewConfig() chan<- []*etcd.LogEntry {
    	return tiallpoolObj.watchNewConfigChannel
    }
    

      

    参考

  • 相关阅读:
    ORA-01940: cannot drop a user that is currently connected 问题解析
    Oracle11g数据库导入Oracle10g操作成功
    固态硬盘
    Oracle数据库默认的data pump dir在哪
    navicat 关于orcale新建表空间,用户和权限分配
    oracle 11g 完全卸载方法
    完全卸载oracle11g步骤
    架构设计:负载均衡层设计方案(4)——LVS原理
    C++中使用REST操作
    在C#中实现视频播放器
  • 原文地址:https://www.cnblogs.com/sss4/p/12930784.html
Copyright © 2011-2022 走看看