zoukankan      html  css  js  c++  java
  • Zookeeper【概述、安装、原理、使用】

    第1章 Zookeeper入门

    1.1 概述

    Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。


    1.2 特点


    1.3 数据结构

    1.4应用场景

    统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等


    第2章 Zookeep安装

    2.1 下载地址

    1.官网首页:https://zookeeper.apache.org/

    2.2 本地模式安装

    1. 安装前准备

    (1)安装Jdk

    (2)拷贝Zookeeper安装包到Linux系统下

    (3)解压到指定目录

    [atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
    
    2. 配置修改

    (1)将/opt/module/zookeeper-3.4.10/conf这个路径下的zoo_sample.cfg修改为zoo.cfg;

    [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
    

    (2)打开zoo.cfg文件,修改dataDir路径:

    [atguigu@hadoop102 zookeeper-3.4.10]$ vim zoo.cfg
    

    修改如下内容:

    dataDir=/opt/module/zookeeper-3.4.10/zkData
    

    (3)在/opt/module/zookeeper-3.4.10/这个目录上创建zkData文件夹

    [atguigu@hadoop102 zookeeper-3.4.10]$ mkdir zkData
    
    3. 操作Zookeeper

    (1)启动Zookeeper

    [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
    

    (2)查看进程是否启动

    [atguigu@hadoop102 zookeeper-3.4.10]$ jps
    4020 Jps
    4001 QuorumPeerMain
    

    (3)查看状态:

    [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Mode: standalone
    

    (4)启动客户端:

    [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkCli.sh
    

    (5)退出客户端:

    [zk: localhost:2181(CONNECTED) 0] quit
    

    (6)停止Zookeeper

    [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop
    

    2.3 分布式模式安装

    1. 集群规划

    在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。

    2. 解压安装

    (1)解压Zookeeper安装包到hadoop102的/opt/module/目录下

    [atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/
    
    3. 配置参数解读

    Zookeeper中的配置文件zoo.cfg中参数含义解读如下:

    1)tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒

    Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。

    它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)

    2)initLimit =10:LF初始通信时限

    集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。

    3)syncLimit =5:LF同步通信时限

    集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。

    4)dataDir:数据文件目录+数据持久化路径

    主要用于保存Zookeeper中的数据。

    5)clientPort =2181:客户端连接端口

    监听客户端连接的端口。

    4. 配置服务器编号**

    (1)在/opt/module/zookeeper-3.5.7/这个目录下创建zkData

    [atguigu@hadoop102 zookeeper-3.5.7]$ mkdir -p zkData
    

    (2)在/opt/module/zookeeper-3.5.7/zkData目录下创建一个myid的文件

    [atguigu@hadoop102 zkData]$ touch myid
    

    注意:添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码

    (3)编辑myid文件

    [atguigu@hadoop102 zkData]$ vi myid
    

    在文件中添加与server对应的编号:

    2
    
    5. 配置zoo.cfg文件

    (1)重命名/opt/module/zookeeper-3.5.7/conf这个目录下的zoo_sample.cfg为zoo.cfg

    [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
    

    (2)打开zoo.cfg文件

    [atguigu@hadoop102 conf]$ vim zoo.cfg
    

    修改数据存储路径配置

    dataDir=/opt/module/zookeeper-3.5.7/zkData
    

    增加如下配置

    #######################cluster##########################
    server.2=hadoop102:2888:3888
    server.3=hadoop103:2888:3888
    server.4=hadoop104:2888:3888
    

    (3)拷贝配置好的zookeeper到其他机器上

    [atguigu@hadoop102 module]$ xsync zookeeper-3.5.7/
    

    并分别在hadoop103、hadoop104上修改myid文件中内容为3、4

    (4)配置参数解读

    server.A=B:C:D。

    A是一个数字,表示这个是第几号服务器;

    集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。

    B是这个服务器的地址;

    C是这个服务器Follower与集群中的Leader服务器交换信息的端口;

    D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

    6. 集群操作

    (1)分别启动Zookeeper

    [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
    [atguigu@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start
    [atguigu@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start
    

    (2)查看状态

    [atguigu@hadoop102 zookeeper-3.5.7]# bin/zkServer.sh status
    JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
    Mode: follower
    
    [atguigu@hadoop103 zookeeper-3.5.7]# bin/zkServer.sh status
    JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
    Mode: leader
    
    [atguigu@hadoop104 zookeeper-3.5.7]# bin/zkServer.sh status
    JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
    Mode: follower
    
    7. zk.sh脚本
    #!/bin/bash
    if [ $# -lt 1 ]
    then
        echo "No Args Input..."
        exit ;
    fi
    
    case $1 in
    "start")
            for i in hadoop102 hadoop103 hadoop104
        do
            echo "=====================  $i  ======================="
            ssh $i "source /etc/profile && /opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
        done
    ;;
    "stop")
            for i in hadoop102 hadoop103 hadoop104
        do
            echo "=====================  $i  ======================="
            ssh $i "source /etc/profile && /opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
        done
    ;;
    "status")
            for i in hadoop102 hadoop103 hadoop104
        do
            echo "=====================  $i  ======================="
            ssh $i "source /etc/profile && /opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
        done
    ;;
    *)
        echo "Input Args Error..."
    ;;
    esac
    

    第3章 客户端命令行操作

    命令基本语法 功能描述
    help 显示所有操作命令
    ls path 使用 ls 命令来查看当前znode的子节点 -w 监听子节点变化 -s 附加次级信息
    create 普通创建 -s 含有序列 -e 临时(重启或者超时消失)
    get path 获得节点的值 -w 监听节点内容变化 -s 附加次级信息
    set 设置节点的具体值
    stat 查看节点状态
    delete 删除节点
    deleteall 递归删除节点

    3.1 启动客户端

    [atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkCli.sh
    

    3.2 显示所有操作命令

    [zk: localhost:2181(CONNECTED) 1] help
    

    3.3 查看当前znode中所包含的内容

    [zk: localhost:2181(CONNECTED) 0] ls /
    [zookeeper]
    

    3.4 查看当前节点详细数据

    [zk: localhost:2181(CONNECTED) 1] ls2 /
    [zookeeper]
    cZxid = 0x0
    ctime = Thu Jan 01 08:00:00 CST 1970
    mZxid = 0x0
    mtime = Thu Jan 01 08:00:00 CST 1970
    pZxid = 0x0
    cversion = -1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 0
    numChildren = 1
    

    3.5 分别创建2个普通节点

    [zk: localhost:2181(CONNECTED) 3] create /sanguo "jinlian"
    Created /sanguo
    [zk: localhost:2181(CONNECTED) 4] create /sanguo/shuguo "liubei"
    Created /sanguo/shuguo
    

    3.6 获得节点的值

    [zk: localhost:2181(CONNECTED) 6] get /sanguo/shuguo
    liubei
    cZxid = 0x100000004
    ctime = Wed Aug 29 00:04:35 CST 2018
    mZxid = 0x100000004
    mtime = Wed Aug 29 00:04:35 CST 2018
    pZxid = 0x100000004
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 6
    numChildren = 0
    

    3.7 创建短暂节点

    [zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu"
    Created /sanguo/wuguo
    

    (1)在当前客户端是能查看到的

    [zk: localhost:2181(CONNECTED) 3] ls /sanguo 
    [wuguo, shuguo]
    

    (2)退出当前客户端然后再重启客户端

    [zk: localhost:2181(CONNECTED) 12] quit
    [atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkCli.sh
    

    (3)再次查看根目录下短暂节点已经删除

    [zk: localhost:2181(CONNECTED) 0] ls /sanguo
    [shuguo]
    

    3.8 创建带序号的节点

    ​ (1)先创建一个普通的根节点/sanguo/weiguo

    [zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao"
    Created /sanguo/weiguo
    

    ​ (2)创建带序号的节点

    [zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/xiaoqiao "jinlian"
    Created /sanguo/weiguo/xiaoqiao0000000000
    [zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/daqiao "jinlian"
    Created /sanguo/weiguo/daqiao0000000001
    [zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/diaocan "jinlian"
    Created /sanguo/weiguo/diaocan0000000002
    

    如果原来没有序号节点,序号从0开始依次递增。如果原节点下已有2个节点,则再排序时从2开始,以此类推。

    3.9 修改节点数据值

    [zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
    

    3.10 节点的值变化监听

    ​ (1)在hadoop104主机上注册监听/sanguo节点数据变化

    [zk: localhost:2181(CONNECTED) 26] [zk: localhost:2181(CONNECTED) 8] get /sanguo watch
    

    ​ (2)在hadoop103主机上修改/sanguo节点的数据

    [zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
    

    ​ (3)观察hadoop104主机收到数据变化的监听

    WATCHER::
    WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
    

    3.11 节点的子节点变化监听(路径变化)

    ​ (1)在hadoop104主机上注册监听/sanguo节点的子节点变化

    [zk: localhost:2181(CONNECTED) 1] ls /sanguo watch
    [aa0000000001, server101]
    

    ​ (2)在hadoop103主机/sanguo节点上创建子节点

    [zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"
    Created /sanguo/jin
    

    ​ (3)观察hadoop104主机收到子节点变化的监听

    WATCHER::
    WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
    

    3.12 删除节点

    [zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
    

    3.13 递归删除节点

    [zk: localhost:2181(CONNECTED) 15] rmr /sanguo/shuguo
    

    3.14 查看节点状态

    [zk: localhost:2181(CONNECTED) 17] stat /sanguo
    cZxid = 0x100000003
    ctime = Wed Aug 29 00:03:23 CST 2018
    mZxid = 0x100000011
    mtime = Wed Aug 29 00:21:23 CST 2018
    pZxid = 0x100000014
    cversion = 9
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 4
    numChildren = 1
    

    第4章 Zookeeper的API应用

    4.1 创建一个Maven工程

    4.2 添加pom文件

    <dependencies>
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-core</artifactId>
    			<version>2.8.2</version>
    		</dependency>
    		<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    		<dependency>
    			<groupId>org.apache.zookeeper</groupId>
    			<artifactId>zookeeper</artifactId>
    			<version>3.4.10</version>
    		</dependency>
    </dependencies>
    

    拷贝log4j.properties文件到项目根目录

    需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

    log4j.rootLogger=INFO, stdout 
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
    log4j.appender.logfile=org.apache.log4j.FileAppender 
    log4j.appender.logfile.File=target/spring.log 
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]
    

    4.3 API应用

    创建客户端
    创建子节点
    查看子节点不监听
    查看子节点并且监听子节点变化
    获取节点的值
    获取节点的值,并监听
    设置节点的值
    删除没有子节点的节点
    删除非空节点,利用递归实现
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import java.io.IOException;
    import java.util.List;
    
    /**
     * @author layne
     * @create 2020-06-22 15:24
     */
    public class Zookeeper {
        private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        private int sessionTimeout = 10000;
        private ZooKeeper zkClient;
    
    
        @Before
        public void init() throws IOException {
            //参数解读 1 链接字符串,逗号分隔,中间不能有空格  2 会话超时时间,自定义,一般给5个心跳  3 当前客户端默认监听器对象,一般不用
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
    
                }
            });
        }
    
    
        @After
        public void after() throws InterruptedException {
            zkClient.close();
        }
    
        /**
         * 创建子节点
         */
        @Test
        public void create() throws KeeperException, InterruptedException {
            //参数解读  1 创建的节点路径  2 节点的值的字节数组   3 节点的权限,这个直接选open  4 节点类型
            String path = zkClient.create("/atguigu1", "shangguigu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(path);
    
            Thread.sleep(Long.MAX_VALUE);
        }
    
        /**
         * 查看子节点不监听
         */
        @Test
        public void ls() throws KeeperException, InterruptedException {
            List<String> children = zkClient.getChildren("/", false);
            System.out.println(children);
        }
    
        /**
         * 查看子节点并且监听子节点变化
         *
         */
        @Test
        public void lsAndWatch() throws KeeperException, InterruptedException {
            List<String> children = zkClient.getChildren("/atguigu", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println(event.toString());
                }
            });
    
            System.out.println(children);
    
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
        /**
         * 获取节点的值
         */
        @Test
        public void get() throws KeeperException, InterruptedException {
            //获取节点的值之前,应该先判断你要获取的节点是否存在
            Stat stat = zkClient.exists("/atguigu1", false);
            if (stat == null) {
                System.out.println("节点不存在。。。");
                return;
            }
    
            byte[] data = zkClient.getData("/atguigu1", false, stat);
            String datas = new String(data);
            System.out.println(datas);
        }
    
    
        /**
         * 获取节点的值,并监听
         */
        @Test
        public void getAndWatch() throws KeeperException, InterruptedException {
            //获取节点的值之前,应该先判断你要获取的节点是否存在
            Stat stat = zkClient.exists("/atguigu", false);
            if (stat == null) {
                System.out.println("节点不存在。。。");
                return;
            }
    
            byte[] data = zkClient.getData("/atguigu", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("0421 is the best of atguigu...");
                }
            }, stat);
    
            String datas = new String(data);
            System.out.println(datas);
    
            //线程阻塞,保证控制态能看到当前回调函数执行的结果
            Thread.sleep(Long.MAX_VALUE);
        }
    
        /**
         * 设置节点的值
         */
    
        @Test
        public void set() throws KeeperException, InterruptedException {
            Stat stat = zkClient.exists("/atguigu", false);
            if (stat == null) {
                System.out.println("节点不存在");
                return;
            }
    
            zkClient.setData("/atguigu","sgg".getBytes(),stat.getVersion());
        }
    
        /**
         * 删除没有子节点的节点
         */
    
        @Test
        public void delete() throws KeeperException, InterruptedException {
            Stat stat = zkClient.exists("/test", false);
            if (stat == null) {
                System.out.println("节点不存在");
                return;
            }
    
            zkClient.delete("/test",stat.getVersion());
        }
    
    
        /**
         * 删除非空节点,利用递归实现
         */
        @Test
        public void testDeleteAll() throws KeeperException, InterruptedException {
            deleteAll("/sanguo",zkClient);
        }
    
        public void deleteAll(String path,ZooKeeper zkClient) throws KeeperException, InterruptedException {
            Stat stat = zkClient.exists(path, false);
            if (stat == null) {
                System.out.println("节点不存在");
                return;
            }
    
            //先获取传入的节点的子节点,看下是否有子节点
            List<String> children = zkClient.getChildren(path, false);
    
            if (children.size() == 0) {
                //说明传入的节点没有子节点,可以直接删除
                zkClient.delete(path,stat.getVersion());
            }else {
                //如果传入的节点有子节点,循环子节点,判断子节点还有没有子节点
                for (String child : children) {
                    //删除子节点,但是不知道子节点下面还有没有子节点,所以要递归调用本方法
                    deleteAll(path+"/"+child,zkClient);     // /atguigu   child=aaa   /atguigu/aaa
                }
                //删除完所有的子节点以后,记得删除传入的节点
                zkClient.delete(path,stat.getVersion());
            }
    
        }
    }
    
    

    第5章 Zookeeper的内部原理

    5.1 节点类型

    1. 持久型节点
    2. 临时节点

    5.2 Stat结构体

    1)czxid-创建节点的事务zxid

    每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。

    事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。

    2)ctime - znode被创建的毫秒数(从1970年开始)

    3)mzxid - znode最后更新的事务zxid

    4)mtime - znode最后修改的毫秒数(从1970年开始)

    5)pZxid-znode最后更新的子节点zxid

    6)cversion - znode子节点变化号,znode子节点修改次数

    7)dataversion - znode数据变化号

    8)aclVersion - znode访问控制列表的变化号

    9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。

    10)dataLength- znode的数据长度

    11)numChildren - znode子节点数量

    5.3监听器原理(面试重点)

    5.4 选举机制(面试重点)

    投票原则
    自私原则:第一轮投票肯定投自己
    墙头草原则:第一轮投票完以后,大家会交换选票信息,这个时候会把票改投给那个厉害的人
    厉害怎么判断?
    (myid,zxid)
    先判断zxid,zxid大的赢,因为zxid越大,说明当前服务器的数据越完整
    如果zxid相等,那就看myid
    
    1)服务器一台一台的按照顺序启动
    
    机器1启动:会发起一轮选举,机器1把票投给自己,然后发现选票个数不够集群个数的半数以上,机器1选举失败,然后状态改为LOOKING
    机器2启动:会发起一轮选举,第一轮投票,机器1投自己,机器2投自己,然后会交换选票,这时,机器1发现机器2myid比自己大,更改选票信息
    将自己的票改投给机器2,然后机器2发现自己手里得了两票,不够集群半数机制,选举失败,两台机器重新变为looking
    机器3启动:会发起一轮选举,第一轮投票,各自投各自(自私原则),然后交换选票信息,机器1和2发现机器3更厉害(zxid一直,但是人家的myid更大),
    此时机器1和2将自己的选票改为3,机器3得票3票,超过了集群个数的半数,机器3当选leader,把自己的状态改为LEADING,机器1和2FOLLOWING
    机器4启动;会发起一轮选举,第一轮投票,机器4投自己,但是因为机器1,2,3的状态已经不再是LOOKING,因此机器1 2 3不再更改选票信息,机器4少数
    服从多数,将自己的状态改为FOLLOWING
    机器5启动:会发起一轮选举,第一轮投票,机器5投给自己,但是因为机器1 2 3 4的状态不再是LOOKING,不会更改自己的选票信息,机器5少数服从多数
    将自己的状态改为FOLLOWING。
    
    2)五台机器一起启动
    五台机器发起一轮选举,第一次投票,各自投各自(自私原则),然后交换选票信息,结果 1 2 3 4号机器发现5号机器最厉害,就会
    把自己的选票信息改投5号,5号已绝对优势当选leader
    

    5.5 写数据流程

        1)客户端连接zk集群的任意一台机器,发送写请求
        2)如果客户端连接的服务器不是leader。则连接的这台服务器,会将写请求发送给leader
        3)当leader接收到写请求以后,会将当次写操作构造成一个事务,对应一个zxid,然后将这个写的操作广播给所有服务器
        4)当各台服务器接收到leader发送的写请求后,会将此次写操作维护到一个待写列表里,然后给leader回复可以写
        5)leader收到半数以上太服务器回复的可以写的信息后,会再次广播,执行的操作
        6)此时各个服务器真正的执行写操作
        7)最后由客户端连接的那台服务器通知客户端,写操作成功。
       
    
  • 相关阅读:
    use paramiko to connect remote server and execute command
    protect golang source code
    adjust jedi vim to python2 and python3
    install vim plugin local file offline
    add swap file if you only have 1G RAM
    datatables hyperlink in td
    django rest framework custom json format
    【JAVA基础】网络编程
    【JAVA基础】多线程
    【JAVA基础】String类的概述和使用
  • 原文地址:https://www.cnblogs.com/wh984763176/p/13179158.html
Copyright © 2011-2022 走看看