zoukankan      html  css  js  c++  java
  • 浅谈storm

    storm
    分布式,可容错的实时计算框架,低延迟能做到毫秒级的响应,storm进程是常驻内存,Hadoop是不断启停的,storm中的数据不经过磁盘,都在内存中,处理完成后就没有了,但是可以写到数据库中,数据的交换经过网络,避免了磁盘io的开销
    storm的集群需要设置多大还有计算能力如何,一般是看数据吞吐量(每秒处理多少数据)
    Hadoop和storm的区别
        Hadoop是批处理而且每次处理前需要进行资源分配,storm是实时处理一致运行在内存中,都可以进行分布式计算。
        storm适用于的场景:数据一直源源不断的供storm处理
        Hadoop适用场景:一定量的比较大的数据直接交给Hadoop进行处理
        Hadoop的数据来源一般来自于hdfs中,storm的数据来源一般是流式的数据
    storm可以用来做监控或者实时业务的计算
    架构
        nimbus主节点,进程,进行集群管理,nimbus定时通过zookeeper获取supervisor和worker的心跳信息,从而知道集群中有多少节点处于什么状态,运行着什么任务,如果发现worker挂了会重新启动一个;调度topology(也可以说是任务),因为client提交任务是通过nimbus进行提交的,然后将topology名字、启动时间、状态、几个worker执行每个worker中需要几个executor都发送给zk,supervisor然后从zk中获得任务启动worker并执行;处理一些接口请求,比如任务的sumbit、kill、rebalance等;提供查询集群状态的thrift接口,可通过这个接口返回一些信息,stormUI就是从该结构获取数据的;提供文件的上传下载的功能
        supervisor从节点,进程(一个节点上可以启动多个supervisor,但是也可以看作是节点级别的,因为启动多个的话使用的槽数和一个是一样的),当接到任务后用于启停worker,监控worker,会定期将自己的信息发送给zk
        worker处理数据的节点,一个jvm进程,如果不kill掉就不会停止,会一直运行下去,用来启动executor(一个或多个);会定期去zk上查阅nimbus写的调度信息,然后看看有没有它要做的事,如果有就启动(先下载任务的jar包并解压,然后创建相应目录,后面就可以用这个jar包了);worker还负责worker和worker之间的数据传输,分布式肯定会有数据传输;将自己的信息定期往本地系统和zk中各写一份
        executor实际干活的线程,它的个数就是并行度;它会创建spout和bolt对象并启动执行线程(执行nextTuple和execute)和传输线程(用来进行一个worker中executor之间的数据传输)
            如果是A-worker中executor往B-worker中传输数据过程:A-worker中executor把数据先放到线程队列中然后传给A-worker的worker_transfer_thread消息队列,然后把数据传给B-worker的worker_receive_thread消息队列,然后B-worker将消息发送给B-worker中executor的消息队列,最后executor从这个消息队列中获取使用
        zookeeper,主从之间用zookeeper进行连接,但是需要3.4.5以上的版本,因为支持磁盘快照和namenode的定期删除,避免磁盘被打满,进行消息共享作用,里面存储了状态信息,调度信息,心跳
        zookeeper目录中关心storm信息的存放位置
            /storm/supervisor/supervisor-id        supervisor心跳
            /storm/storms/storm-id        topology基本信息
            /storm/assignments/storm-id        topology调度信息
            /storm/workerbeats/storm-id/node-port    worker心跳
            /storm/errors/storm-id/component-id        spout/bolt错误信息
    编程模型
        DAG有向无环图,描述的是分布式框架计算任务的流程(有方向,没有环状流程)
        spout数据来源,循环调度,每次调用都会从某个地方获取数据
        bolt处理业务逻辑的单元
        stream就是一个流,每个流会有一个id,如果没有指定id就是default流,每个spout/bolt都有一个默认的流,在代码emit中的第一个参数就是流ID(在ack消息保证机制中会用到)
    数据传输基于netty网络框架,效率比较高
    高可靠性
        异常处理
        消息可靠性的保证机制ack
    可维护性
        可通过webUI界面对程序进行监控
    storm两种模式
        实时请求,同步应答服务的请求,实时发消息实时收到结果,往storm中发送消息可以秒级之内得到结果。需要用DRPCServer(也是storm的一个组件)。
        client将数据发送给DRPCServer,DRPCServer再把数据发送给spout,然后数据再经过一系列的bolt的逻辑处理,最后得到结果,这时会将结果传回给DRPCServer,DRPCServer再将结果返回给client。
        DRPCServer的功能就是接收和返回请求给client,还有就是把数据交给spout,获取storm计算的结果。
        异步流式处理,不需要实时得到结果
            client发送一条数据交给消息队列,消息队列再把消息发送给spout,经过一系列的bolt后得到结果,将结果可以存储在数据库中。这种情况client不需要得到结果(比如client产生的日志发送给storm,只需要发送就可以了如果要做报表,client可以从数据库中获取数据展现在页面上)
    分组策略
        shuffleGrouping随机分
        fieldsGrouping按照指定的词取hash,然后取模,这样就保证相同的field会分到相同的线程中,如果某个词的出现量非常大就会产生数据倾斜问题,那么可以使用shuffleGrouping让每个线程中都有这个词,然后后面再用filedGrouping这样filedGrouping只是统计一下各线程中词的个数没有其他计算逻辑
        AllGrouping广播发送,所有的bolts都会收到一份完整的数据
        globalGrouping全局分组,整个stream被分配到storm中的一个bolt的id最低的task(一个spout/bolt的对象,每个executor中都有多个对象,这个对象就是task)
        NoneGrouping不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把使用nonegrouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(如果可能的话)。
        DirectGrouping指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 DirectStream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
        Localorshufflegrouping本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
    并发模型
        cluster集群
        supervisor节点,虽然是一个进程但是启动多个占用的槽位和一个是一样的,所以可以看作是一个节点
        worker是jvm进程,启动一个topology就会启动几个worker
        executor是一个线程,可以构建一个或多个task
        task对象
    单点性能问题
        当supervisor个数比较多的时候,nimbus还是会有瓶颈的,网卡很快会打满,遇到这种问题可以将文件分发变成p2p的.
        如果nimbus挂掉其实也是没有关系的,只要任务已经提交给supervisor中的worker进行执行了,那么就和nimbus无关了,发现之后再启动起来就可以了,因为storm是松耦合的建构设计nimbus和supervisor没有直接联系,是通过zk进行信息转递的
    ack机制
        在storm中一个任务,spout会通过emit向下游发送一个数据,而这条数据往往需要经过多个bolt处理,如果跑到某个bolt失败或者超时(默认30s,具体参看defaults.yaml中的 topology.message.timeout.secs: 30也可以在定义 topology时,通过conf.setMessageTimeoutSecs设置,代码的优先级高一些)了,
        那么可以通过ack机制进而保证每个tuple都能被toplology处理,或者说保证能跑完一次完整的toplology。
        只要检测到数据没有在所有touple上都跑完就重新跑一次就可以了,ack会先记录一下跑过哪几个bolt,如果没跑完所有的bolt就让spout重新发一次,它是用了一个很小的数据作为了ack的值,
        只需要20字节进行追踪数据有没有把所有的bolt都跑一次
        acker跟踪算法的原理:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理,如果为0则认为已完全处理。
        Storm的Bolt有BasicBolt和RichBolt,在BasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack(有一定的条件,需要stormID(不是默认的)才会自动执行ack)。
        在使用RichBolt时要实现ack,则需要在emit数据的时候,显示指定该数据的源tuple,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用源tuple的ack进行ack。
        ack会消耗资源,速度会慢一些,如果可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据, 那么你可以通过不跟踪这些tuple树来获取更好的性能
        有两种方法可以去掉可靠性:
        第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下, storm会在spout发射一个tuple之后马上调用spout的ack方法。也就是说这个tuple树不会被跟踪。
        第二个方法是在tuple层面去掉可靠性。 你可以在发射tuple的时候不指定messageid来达到不跟踪某个特定的spout tuple的目的。
    搭建(需要依赖zookeeper)
    vi conf/storm.yaml
    修改storm.zookeeper.servers的值,指定storm节点
    修改nimbus.seeds的值,指定nimbus主节点
    mkdir logs
    分发节点
    ./bin/storm nimbus >> logs/nimbus.out 2>&1 &     后台启动numbus,并将所有日志写到nimbus.out文件中,启动后后台有nimbus进程
    ./bin/storm ui >> logs/ui.out 2>&1 &        后台启动StormUI(依赖于nimbus),默认端口8080,并将所有日志写到ui.out文件中,启动后后台有core进程
    ./bin/storm supervisor >> logs/supervisor.out 2>&1 &       后台启动supervisor,并将所有日志写到supervisor.out文件中,启动后后台有supervisor、worker、logworker进程
    如果需要drpc还需要配置
    修改drpc.servers的值,指定drpc节点(也可以指定多个drpc节点)
    ./bin/storm drpc >> logs/drpc.out 2>&1 &        后台启动drpc,并将所有日志写到drpc.out文件中
    集群模式启动
    ./bin/storm jar jar包 类路径 名字(可加,代码中设置了可不加)

    java操作storm
    实例一:异步模式wordcount(一个spout发送到一个bolt再发送到一个bolt中)
    public class TestStorm {
        public static  class SetenceSpout extends BaseRichSpout {
            SpoutOutputCollector collector;
            @Override//定义输出数据的名字,下游可以通过这个名字get到数据
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                outputFieldsDeclarer.declare(new Fields("sentence"));
            }
            @Override//初始化spout,构造spout对象
            public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
                collector = spoutOutputCollector;
            }
            @Override//该方法启动后不停的调用,不停的推送数据
            public void nextTuple() {
                Utils.sleep(100);//如果下游没处理完,会休眠100毫秒,如果执行的足够快,那么这句是不用执行的
                String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
                String sentence = sentences[new Random().nextInt(sentences.length)];
                collector.emit(new Values(sentence));//将数据发送给下游,declare和emit方法中的对象要一一对应,declare里面定义名字,emit里面定义值
            }
        }
        public static class SplitBolt extends BaseBasicBolt {
            @Override//该方法只要上游有数据发送过来就会被调用
            public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
                String sentence = tuple.getStringByField("sentence");//或tuple.getString(0)方式
                String[] split = sentence.split(" ");
                for(String word:split){
                    basicOutputCollector.emit(new Values(word));
                }
            }
            @Override//定义输出数据的名字,下游可以通过这个名字get到数据,要和emit里面的数据一一对应
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                outputFieldsDeclarer.declare(new Fields("word"));
            }
        }
        public static  class WordCount extends BaseBasicBolt{
            Map<String, Integer> counts = new HashMap<String, Integer>();
            @Override
            public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
                String word = tuple.getStringByField("word");
                Integer count = counts.get(word);
                if(count == null){
                    count=0;
                }
                count++;
                counts.put(word,count);
                System.out.println(word+":"+count);
                //这句代码该逻辑中没有用了,如果后续还需要下游bolt接收,方便一些,建议写上
                basicOutputCollector.emit(new Values(word,count));
            }
            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                //这句代码该逻辑中没有用了,如果后续还需要下游bolt接收,方便一些,建议写上
                outputFieldsDeclarer.declare(new Fields("word","count"));
            }
        }
        public static void main(String[] args){
            TopologyBuilder builder = new TopologyBuilder();//定义一个拓扑对象
            builder.setSpout("spout",new SetenceSpout(),5);//设置spout,并设置有几个并行度(executor,每个executor默认启动1个task)
            builder.setBolt("split",new SplitBolt(),8).shuffleGrouping("spout");
            builder.setBolt("count",new WordCount(),12).setNumTasks(4).fieldsGrouping("split",new Fields("word"));

            Config conf = new Config();
            conf.setDebug(true);//true记录每个组件所发送的消息,线上会影响性能
            conf.setMaxTaskParallelism(3);//设置task最大的并行度
            conf.setNumWorkers(3);//配置3个worker运行程序
            
            //集群模式下运行,需要打jar包放到nimbus节点上取执行
            try {
                StormSubmitter.submitTopologyWithProgressBar("WordCount",conf,builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
            //本地多线程模拟
    //        LocalCluster cluster = new LocalCluster();
    //        cluster.submitTopology("WordCount", conf, builder.createTopology());//提交拓扑图开始运行,第一个参数是名字
        }
    }

    实例二:和实例一一样,只是这个是一个spout发送到两个bolt中,然后这两个bolt再合并到一个bolt中(只需要改main中部分代码)
        builder.setSpout("spout",new SetenceSpout(),5);//设置spout,并设置有几个并行度(executor,每个executor默认启动1个task)
        builder.setBolt("split1",new SplitBolt(),8).shuffleGrouping("spout");
        builder.setBolt("split2",new SplitBolt(),8).shuffleGrouping("spout");
        builder.setBolt("count",new WordCount(),12).fieldsGrouping("split1",new Fields("word")).fieldsGrouping("split2",new Fields("word"));

    实例三:DRPC同步的方式返回数据,本地多线程模式
    public class TestStorm {
        public static class ExclamationBolt extends BaseBasicBolt {
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("result", "return-info"));
            }
            @Override //获取结果
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String string = tuple.getString(0);//数据aaa
                Object retInfo = tuple.getValue(1);//主机,端口,流id的json数据
                collector.emit(new Values(string+"======="+retInfo, retInfo));//最少两个值,前面是返回值
            }
        }
        public static void main(String[] args){
            LocalDRPC drpc = new LocalDRPC();//为了本地测试用,构建了LocalDRPC对象
            TopologyBuilder builder = new TopologyBuilder();//定义一个拓扑对象
            //设置spout名字和drpc,接收的数据从DRPC上来的,DRPC上数据是从客户端上来拿数据
            DRPCSpout spout = new DRPCSpout("exclamation",drpc);
            builder.setSpout("drpcSpout", spout);
            builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpcSpout");
            //最后一个bolt一定是ReturnResults,这样才会返回结果
            builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("exclaim", conf, builder.createTopology());
            //exclamation是spout的名字,aaa是数据
            System.out.println("++++++"+drpc.execute("exclamation", "aaa"));
            //返回结果
            //++++++aaa======={"port":0,"host":"e0089355-3b92-414c-a6e2-3ed91415bb8b","id":"1"}
        }
    }
    实例四:drpc同步模式下本地测试LinearDRPCTopologyBuilder方式,LinearDRPCTopologyBuilder的好处:不用写DRPCSpout和ReturnResults,里面已经自己封装了
    public class TestStorm {
        public static class ExclamationBolt extends BaseBasicBolt {
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("return-info","result"));
            }
            @Override //获取结果
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                Object retInfo = tuple.getValue(0);//流id
                String string = tuple.getString(1);//数据aaa
                collector.emit(new Values(retInfo,string+"======="+retInfo));//最少两个值,后面是返回值
            }
        }
        public static void main(String[] args){
            LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
            builder.addBolt(new ExclamationBolt(), 3).shuffleGrouping();
            Config conf = new Config();
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
            System.out.println("+++++++++++++++++++++"+drpc.execute("reach", "hello"));//reach拓扑名字,hallo数据
        }
    }
    实例五:drpc同步模式下在本地向集群中提交storm程序的jar包,同上只改了main方法,client将jar包上传到nimbus上,supervisor再将jar包下载到自己本地再跑
            LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
            builder.addBolt(new ExclamationBolt(), 3).shuffleGrouping();
            Config conf = new Config();
            conf.put(Config.NIMBUS_HOST, "node1");
            conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"node1","node2","node3"}));
            System.setProperty("storm.jar","C:\Users\wanghao\Desktop\a.jar");
            conf.setNumWorkers(6);
            try {
                StormSubmitter.submitTopologyWithProgressBar("reach", conf, builder.createRemoteTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
    实例六:drpc同步模式下本地构建DRPCClient提交到上面再集群中运行的jar包程序
    public class Test {
        public static void main(String[] args){
            Map conf = Utils.readDefaultConfig();
            try {
                DRPCClient client = new DRPCClient(conf, "node1", 3772);
                System.out.println(client.execute("reach", "aaa"));
            } catch (TException e) {
                e.printStackTrace();
            }
        }
    }

  • 相关阅读:
    电信网络拓扑图自动布局之总线
    长短链接区别-2
    TCP长连接与短连接的区别
    memset(&a, 0, sizeof(struct customer))函数
    linker command failed with exit code 1
    iOS “[App] if we're in the real pre-commit handler we can't actually add any new fences due
    iOS 获取屏幕某个区域的截图-b
    iOS-集成支付宝支付、微信支付简单总结
    邓白氏码的申请-iOS公司开发者账号准备
    iOS 自定义导航栏 和状态栏
  • 原文地址:https://www.cnblogs.com/timeTraveler/p/10743448.html
Copyright © 2011-2022 走看看