zoukankan      html  css  js  c++  java
  • Storm常见模式——批处理

    Storm对流数据进行实时处理时,一种常见场景是批量一起处理一定数量的tuple元组,而不是每接收一个tuple就立刻处理一个tuple,这样可能是性能的考虑,或者是具体业务的需要。

    例如,批量查询或者更新数据库,如果每一条tuple生成一条sql执行一次数据库操作,数据量大的时候,效率会比批量处理的低很多,影响系统吞吐量。

    当然,如果要使用Storm的可靠数据处理机制的话,应该使用容器将这些tuple的引用缓存到内存中,直到批量处理的时候,ack这些tuple。

    下面给出一个简单的代码示例:

    现在,假设我们已经有了一个DBManager数据库操作接口类,它至少有两个接口:

    (1)getConnection(): 返回一个java.sql.Connection对象;

    (2)getSQL(Tuple tuple): 根据tuple元组生成数据库操作语句。

    为了在Bolt中缓存一定数量的tuple,构造Bolt时传递int n参数赋给Bolt的成员变量int count,指定每个n条tuple批量处理一次。

    同时,为了在内存中缓存缓存Tuple,使用java concurrent中的ConcurrentLinkedQueue来存储tuple,每当攒够count条tuple,就触发批量处理。

    另外,考虑到数据量小(如很长时间内都没有攒够count条tuple)或者count条数设置过大时,因此,Bolt中加入了一个定时器,保证最多每个1秒钟进行一次批量处理tuple。

    下面是Bolt的完整代码(仅供参考):

    复制代码
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;
    
    public class BatchingBolt implements IRichBolt {
        private static final long serialVersionUID = 1L;
        private OutputCollector collector;
        private Queue<Tuple> tupleQueue = new ConcurrentLinkedQueue<Tuple>();
        private int count;
        private long lastTime;
        private Connection conn;
    
        public BatchingBolt(int n) {
            count = n; //批量处理的Tuple记录条数
            conn = DBManger.getConnection(); //通过DBManager获取数据库连接
            lastTime = System.currentTimeMillis(); //上次批量处理的时间戳
        }
    
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(Tuple tuple) {
            tupleQueue.add(tuple);
            long currentTime = System.currentTimeMillis();
            // 每count条tuple批量提交一次,或者每个1秒钟提交一次
            if (tupleQueue.size() >= count || currentTime >= lastTime + 1000) {
                Statement stmt = conn.createStatement();
                conn.setAutoCommit(false);
                for (int i = 0; i < count; i++) {
                    Tuple tup = (Tuple) tupleQueue.poll();
                    String sql = DBManager.getSQL(tup); //生成sql语句
                    stmt.addBatch(sql); //加入sql
                    collector.ack(tup); //进行ack
                }
                stmt.executeBatch(); //批量提交sql
                conn.commit();
                conn.setAutoCommit(true);
                System.out.println("batch insert data into database, total records: " + count);
                lastTime = currentTime;
            }
        }
    
        @Override
        public void cleanup() {
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }
    复制代码
  • 相关阅读:
    Dom 动态添加元素节点总结
    SQLserver 获取当前时间
    Var的用法解析
    JS 转换HTML转义符
    20210602---为了养老,全力以赴,珍惜每一秒钟。决心不够大,不够担心未来,现在虽然挣得少,但是有吃有喝,满足了。
    20210601——今天开始狠狠奖励自己,而且是必须玩的这种。做事投入你就会快乐。
    20210531兴趣
    20210527学习笔记--没成功的唯一原因是,想得和说的太多 做的太少。
    20210526--今年还有半年,抓紧一切时间学习
    20210524学习笔记---从记日记开始已经有3个月了,浪费时间的痕迹渐渐清醒
  • 原文地址:https://www.cnblogs.com/yepei/p/4801903.html
Copyright © 2011-2022 走看看