zoukankan      html  css  js  c++  java
  • kafkaspout以及kafkabolt的最简实例

            这个实例中有一个KafkaSpout,一个KafkaBolt,一个自定义Bolt QueryBolt。数据流程是KafkaSpout从topic为recommend的消息队列中取出String类型的消息,发送给QueryBolt。QueryBolt不做任何处理,直接转发给KafkaBolt,只把经过的消息存储在list。QueryBolt中自定义了cleanup方法,该方法在topology被杀死时调用,方法中把list中的所有数据打印在"C://"+this+".txt"文件中。KafkaBolt将接收到的数据直接转存在主题为recevier的kafka消息队列中。
            代码结构:
             
            以下是详细代码:
    首先是topology.java
    import java.util.HashMap;
    import java.util.Map;
     
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    //import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;
    import storm.kafka.bolt.KafkaBolt;
     
    public class topology {
        public static void main(String [] args) throws Exception{
            //配置zookeeper 主机:端口号
            BrokerHosts brokerHosts =new ZkHosts("110.64.76.130:2181,110.64.76.131:2181,110.64.76.132:2181");
            //接收消息队列的主题
            String topic="recommend";
            //zookeeper设置文件中的配置,如果zookeeper配置文件中设置为主机名:端口号 ,该项为空
            String zkRoot="";
            //任意
            String spoutId="zhou";
            SpoutConfig spoutConfig=new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
            //设置如何处理kafka消息队列输入流
            spoutConfig.scheme=new SchemeAsMultiScheme(new MessageScheme());
            Config conf=new Config();
            //不输出调试信息
            conf.setDebug(false);
            //设置一个spout task中处于pending状态的最大的tuples数量
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            Map<String, String> map=new HashMap<String,String>();
            // 配置Kafka broker地址
            map.put("metadata.broker.list""master:9092,slave1:9092,slave2:9092");
            // serializer.class为消息的序列化类
            map.put("serializer.class""kafka.serializer.StringEncoder");
            conf.put("kafka.broker.properties", map);
            // 配置KafkaBolt生成的topic
            conf.put("topic""receiver");
            TopologyBuilder builder =new TopologyBuilder();
            builder.setSpout("spout"new KafkaSpout(spoutConfig),1);
            builder.setBolt("bolt1"new QueryBolt(),1).setNumTasks(1).shuffleGrouping("spout");
            builder.setBolt("bolt2"new KafkaBolt<String, String>(),1).setNumTasks(1).shuffleGrouping("bolt1");
            if(args.length==0){
                LocalCluster cluster = new LocalCluster();
                //提交本地集群
                cluster.submitTopology("test", conf, builder.createTopology());
                //等待6s之后关闭集群
                Thread.sleep(6000);
                //关闭集群
                cluster.shutdown();
            }
            StormSubmitter.submitTopology("test", conf, builder.createTopology());
        }
    }
    然后是MessageScheme.java
    import java.io.UnsupportedEncodingException;
    import java.util.List;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
     
    import backtype.storm.spout.Scheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
     
    public class MessageScheme implements Scheme {
        private static final Logger LOGGER = LoggerFactory.getLogger(MessageScheme.class);
        public List<Object> deserialize(byte[] ser) {
            try {
                //从kafka中读取的值直接序列化为UTF-8的str
                String mString=new String(ser, "UTF-8");
                return new Values(mString);
            catch (UnsupportedEncodingException e) {
                // TODO Auto-generated catch block
                LOGGER.error("Cannot parse the provided message");
                 
            }
            return null;
        }
     
        public Fields getOutputFields() {
            // TODO Auto-generated method stub
            return new Fields("msg");
        }
     
    }
    最后是QueryBolt.java
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Vector;
     
     
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
     
    public class QueryBolt implements IRichBolt {
         
        List<String> list;
        OutputCollector collector;
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
             
            list=new ArrayList<String>();
            this.collector=collector;
             
        }
     
        public void execute(Tuple input) {
            // TODO Auto-generated method stub
            String str=(String) input.getValue(0);
            //将str加入到list
            list.add(str);
            //发送ack
            collector.ack(input);
            //发送该str
            collector.emit(new Values(str));
        }
     
        public void cleanup() {//topology被killed时调用
            //将list的值写入到文件
            try {
                FileOutputStream outputStream=new FileOutputStream("C://"+this+".txt");
                PrintStream p=new PrintStream(outputStream);
                p.println("begin!");
                p.println(list.size());
                for(String tmp:list){
                    p.println(tmp);
                }
                p.println("end!");
                try {
                    p.close();
                    outputStream.close();
                catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                 
            catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
         
            declarer.declare(new Fields("message"));
             
        }
     
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
     
    }
    问题1:zkRoot如何设置?非常重要,设置错误无法正确从kafka消息队列中取出数据。
    观察 server.properties 文件:
    zookeeper.connect=master:2181,slave1:2181,slave2:2181
    此时zkRoot="";
    如果zookeeper.connect=master:2181,slave1:2181,slave2:2181/ok
    此时zkRoot等于"/ok"
    问题2:为什么KafkaSpout启动之后,不能从头开始读起,而是自动跳过了kafka消息队列之前的内容,只处理KafkaSpout启动之后消息队列中新增的值?
    因为KafkaSpout默认跳过了Kafka消息队列之前就存在的值,如果要从头开始处理,那么需要设置spoutConfig.forceFromStart=true,即从offset最小的开始读起。
     
    附录:KafkaSpout中关于 SpoutConfig的相关定义
    SpoutConfig继承自KafkaConfig。由于SpoutConfig和KafkaConfig所有的instance field全是public, 因此在使用构造方法后,可以直接设置各个域的值。
     
    public class SpoutConfig extends KafkaConfig implements Serializable {
        public List<String> zkServers = null//记录Spout读取进度所用的zookeeper的host
        public Integer zkPort = null;//记录进度用的zookeeper的端口
        public String zkRoot = null;//进度信息记录于zookeeper的哪个路径下
        public String id = null;//进度记录的id,想要一个新的Spout读取之前的记录,应把它的id设为跟之前的一样。
        public long stateUpdateIntervalMs = 2000;//用于metrics,多久更新一次状态。
     
        public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
            super(hosts, topic);
            this.zkRoot = zkRoot;
            this.id = id;
        }
    }
    public class KafkaConfig implements Serializable {
     
        public final BrokerHosts hosts; //用以获取Kafka broker和partition的信息
        public final String topic;//从哪个topic读取消息
        public final String clientId; // SimpleConsumer所用的client id
     
        public int fetchSizeBytes = 1024 1024//发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
        public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间
        public int fetchMaxWait = 10000;   //当服务器没有新消息时,消费者会等待这些时间
        public int bufferSizeBytes = 1024 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
        public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化
        public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
        public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
        public long maxOffsetBehind = 100000;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息
        public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime
        public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次metrics
     
        public KafkaConfig(BrokerHosts hosts, String topic) {
            this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
        }
     
        public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
            this.hosts = hosts;
            this.topic = topic;
            this.clientId = clientId;
        }
     
    }
  • 相关阅读:
    fatal error LNK1123: 转换到 COFF 期间失败:文件无效或损坏
    CI:模拟进化与遗传算法
    贝叶斯网络
    朴素贝叶斯模型
    概率与不确定性(乘法法则与贝叶斯法则)
    关于“启发式”搜索的一个形象解释
    Python 字符串与数字拼接报错
    无法启动此程序因为计算机中丢失 xxx.dll
    Python 以指定的概率选取元素
    验证码之SimpleCaptcha (二)
  • 原文地址:https://www.cnblogs.com/zhoudayang/p/5066956.html
Copyright © 2011-2022 走看看