  • (3) storm-kafka source code walkthrough: how to build a KafkaSpout

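    The previous section introduced the config classes. This section explains what each parameter means and where the related data is stored in ZooKeeper. Many people in the QQ group did not know which arguments to pass when calling new on a KafkaSpout; the main thing is simply to pass the right parameters.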


    Start with the SpoutConfig constructor:

    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }

    It requires a BrokerHosts. Here is the ZkHosts implementation:

    public class ZkHosts implements BrokerHosts {
        private static final String DEFAULT_ZK_PATH = "/brokers";
    
        public String brokerZkStr = null;
        public String brokerZkPath = null; // e.g., /kafka/brokers
        public int refreshFreqSecs = 60;
    
        public ZkHosts(String brokerZkStr, String brokerZkPath) {
            this.brokerZkStr = brokerZkStr;
            this.brokerZkPath = brokerZkPath;
        }
    
        public ZkHosts(String brokerZkStr) {
            this(brokerZkStr, DEFAULT_ZK_PATH);
        }
    }
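    ZkHosts needs brokerZkStr, which is simply the list of ZooKeeper hosts. Multiple hosts are separated by commas, because ZooKeeper splits the connect string on commas. Here is ZooKeeper's own parsing code: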

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                boolean canBeReadOnly)
            throws IOException
        {
            LOG.info("Initiating client connection, connectString=" + connectString
                    + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    
            watchManager.defaultWatcher = watcher;
    
            ConnectStringParser connectStringParser = new ConnectStringParser(
                    connectString);
            HostProvider hostProvider = new StaticHostProvider(
                    connectStringParser.getServerAddresses());
            cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                    hostProvider, sessionTimeout, this, watchManager,
                    getClientCnxnSocket(), canBeReadOnly);
            cnxn.start();
        }

    The actual parsing is done by ConnectStringParser; reading it shows exactly how the string is handled:

    public ConnectStringParser(String connectString) {
            // parse out chroot, if any
            int off = connectString.indexOf('/');
            if (off >= 0) {
                String chrootPath = connectString.substring(off);
                // ignore "/" chroot spec, same as null
                if (chrootPath.length() == 1) {
                    this.chrootPath = null;
                } else {
                    PathUtils.validatePath(chrootPath);
                    this.chrootPath = chrootPath;
                }
                connectString = connectString.substring(0, off);
            } else {
                this.chrootPath = null;
            }
    
            String hostsList[] = connectString.split(",");
            for (String host : hostsList) {
                int port = DEFAULT_PORT;
                int pidx = host.lastIndexOf(':');
                if (pidx >= 0) {
                    // otherwise : is at the end of the string, ignore
                    if (pidx < host.length() - 1) {
                        port = Integer.parseInt(host.substring(pidx + 1));
                    }
                    host = host.substring(0, pidx);
                }
                serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
            }
        }

    That is enough on this point.
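
    To make the parsing rules concrete, here is a minimal, self-contained sketch that mirrors the logic of the ConnectStringParser code above using only the JDK. The class name ConnectStringSketch is made up for illustration and is not part of ZooKeeper; it keeps servers as plain "host:port" strings instead of InetSocketAddress objects, and it skips the path validation step.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative re-implementation of the parsing rules; not ZooKeeper code. */
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // ZooKeeper's default client port

    final String chrootPath;                        // null when no chroot is given
    final List<String> servers = new ArrayList<>(); // normalized "host:port" entries

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is the chroot path.
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            chrootPath = chroot.length() == 1 ? null : chroot; // a bare "/" means no chroot
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        // The remainder is a comma-separated list of host[:port].
        for (String host : connectString.split(",")) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            servers.add(host + ":" + port);
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch p =
                new ConnectStringSketch("10.1.110.24:2181,10.1.110.22/kafka");
        System.out.println(p.servers);    // [10.1.110.24:2181, 10.1.110.22:2181]
        System.out.println(p.chrootPath); // /kafka
    }
}
```

    The practical takeaway is that brokerZkStr should look like "host1:2181,host2:2181", optionally followed by a single chroot path at the very end.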

    So brokerZkStr is required. The other parameter is the ZooKeeper path (brokerZkPath); you can set it yourself, and it defaults to "/brokers".
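
    As a rough illustration of what brokerZkPath is used for, the sketch below builds the child paths that would be read under it. This assumes the standard Kafka layout in ZooKeeper (an "ids" node for brokers and a "topics/<topic>/partitions" node for partitions), which to my knowledge is what storm-kafka relies on; the class and method names here are hypothetical, so check the paths against your own cluster.

```java
/** Hypothetical helper; the path layout is an assumption about Kafka's ZooKeeper tree. */
final class BrokerZkPathsSketch {
    // Assumed location of the registered broker ids.
    static String brokerIdsPath(String brokerZkPath) {
        return brokerZkPath + "/ids";
    }

    // Assumed location of the partition nodes for one topic.
    static String partitionsPath(String brokerZkPath, String topic) {
        return brokerZkPath + "/topics/" + topic + "/partitions";
    }

    public static void main(String[] args) {
        System.out.println(brokerIdsPath("/brokers"));                // /brokers/ids
        System.out.println(partitionsPath("/kafka/brokers", "test")); // /kafka/brokers/topics/test/partitions
    }
}
```

    If Kafka was configured with a chroot such as /kafka, the second constructor argument of ZkHosts would be "/kafka/brokers" rather than the default.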

    SpoutConfig also takes a zkRoot, which is where the consumer-side consumption state (the offsets) is stored. Here is an example:

    String topic = "test";
    String zkRoot = "/kafkastorm";
    String spoutId = "id"; // read state is stored under /kafkastorm/id, so id plays a role similar to a consumer group

    BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181"); // uses the default /brokers path

    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // scheme is covered in the next section

    /* Only needed in local mode, and only when the read state must be recorded:
    spoutConfig.zkServers = new ArrayList<String>(){{
        add("10.118.136.107");
    }};
    spoutConfig.zkPort = 2181;*/
    spoutConfig.forceFromStart = true;
    spoutConfig.startOffsetTime = -1; // start consuming from the latest offset (kafka.api.OffsetRequest.LatestTime())
    spoutConfig.metricsTimeBucketSizeInSecs = 6;
    // builder is the topology's TopologyBuilder
    builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);


    By default, the offsets will be stored in the same Zookeeper cluster that Storm uses. You can override this via your spout config like this:

    spoutConfig.zkServers = ImmutableList.of("otherserver.com");
    spoutConfig.zkPort = 2191;

    At this point the KafkaSpout has been built. If the topology runs successfully, you can connect to the ZooKeeper master and inspect the stored data:

    ./bin/zkCli.sh -server 10.1.110.24:2181
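
    To know where to look once connected, the following sketch derives the ZooKeeper paths from the SpoutConfig values used in the example. The parent path zkRoot + "/" + id comes from the description above; the per-partition child name "partition_<n>" is my assumption about how storm-kafka names the nodes and should be verified against your version. The class name OffsetPathSketch is hypothetical.

```java
/** Hypothetical helper for locating the spout's state in ZooKeeper. */
final class OffsetPathSketch {
    // Parent node for one spout: <zkRoot>/<id>
    static String consumerRoot(String zkRoot, String spoutId) {
        return zkRoot + "/" + spoutId;
    }

    // ASSUMPTION: each partition's state lives in a child named "partition_<n>".
    static String partitionNode(String zkRoot, String spoutId, int partition) {
        return consumerRoot(zkRoot, spoutId) + "/partition_" + partition;
    }

    public static void main(String[] args) {
        System.out.println(consumerRoot("/kafkastorm", "id"));     // /kafkastorm/id
        System.out.println(partitionNode("/kafkastorm", "id", 0)); // /kafkastorm/id/partition_0
    }
}
```

    With the example values, running ls /kafkastorm/id inside zkCli should list whatever child nodes the spout has written.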



    As for StaticHosts, here is the official explanation:

    StaticHosts

    This is an alternative implementation where broker -> partition information is static. In order to construct an instance of this class you need to first construct an instance of GlobalPartitionInformation.

        Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
        Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
        Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
        GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
        partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
        partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
        partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
        StaticHosts hosts = new StaticHosts(partitionInfo);
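    My reading is that the developer has to know the mapping between partitions and brokers and wire it up correctly. In storm-kafka 0.9.0.1, by contrast, none of this had to be specified: you passed only the zkServer list and the total number of partitions, and kafkautil ran two nested for loops (over all brokers and all partitions), creating a temporary consumer for each combination and trying to consume from it. If data came back, the partitionId-to-brokerHost pair was stored in a Map. It is easy to see why 0.9.3-rc1 changed this.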
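
    The older probing approach described above can be sketched roughly as follows. This is not the actual storm-kafka 0.9.0.1 code: the class name, the discover method, and the hasData probe are all hypothetical stand-ins, and the real implementation opened a temporary Kafka consumer where this sketch calls a predicate.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Illustrative sketch of brute-force partition discovery; not storm-kafka code. */
final class PartitionProbeSketch {
    /**
     * Tries every (broker, partition) pair and records the first broker
     * that returns data for each partition.
     * hasData is a hypothetical probe standing in for a temporary consumer.
     */
    static Map<Integer, String> discover(List<String> brokerHosts,
                                         int partitionCount,
                                         BiPredicate<String, Integer> hasData) {
        Map<Integer, String> partitionToBroker = new LinkedHashMap<>();
        for (String host : brokerHosts) {
            for (int p = 0; p < partitionCount; p++) {
                if (!partitionToBroker.containsKey(p) && hasData.test(host, p)) {
                    partitionToBroker.put(p, host);
                }
            }
        }
        return partitionToBroker;
    }

    public static void main(String[] args) {
        // Fake cluster: b1 serves partitions 0 and 2, b2 serves partition 1.
        Map<Integer, String> m = discover(Arrays.asList("b1", "b2"), 3,
                (host, p) -> host.equals("b1") ? p != 1 : p == 1);
        System.out.println(m); // {0=b1, 2=b1, 1=b2}
    }
}
```

    The cost is one connection attempt per broker per partition, and an empty partition is indistinguishable from a wrong broker, which may be part of why the explicit mapping replaced it.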

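    What happens if the specified broker does not actually hold that partition? I have not tested this. If you have, please leave a comment describing what you saw.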

    Reference

    http://www.cnblogs.com/fxjwind/p/3808346.html

  • Original post: https://www.cnblogs.com/gcczhongduan/p/5319763.html