zoukankan      html  css  js  c++  java
  • 大数据-storm理论

    导读:

    第一节:基础架构

      1:编程模型

      2:架构

      3:数据传输

      4:高可靠性

      5:高维护性

      6:数据处理方式

      7:对比MR,SPARK

    第二节:计算模型

      1:spout

      2:bolt

      3:stream grouping

      4:构建拓扑与提交

    第三节:架构

    第四节:部署

    第五节:数据处理

      1:同步计算

      2:并发机制

    第六节:flume+kafka+strom

      1:kafka

      2:strom兼容kafka

      3:flume兼容kafka

    第七节:容错机制

      1:ACK

      2:事务

     

    第一节:基础架构

    1、编程模型

      DAG (Topology)

        Spout(tuple)----->bolt(tuple)--->bolt(tuple)

        Spout(数据推送源)

        Bolt(数据处理源)

     

    2、架构

      Nimbus //主节点上的守护进程

        资源管理

        任务分配

        Jar包任务的上传

      Supervisor //从节点上的监督进程

        接收nimbus分配的任务

        启动、停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)

      Worker //从节点上的工作进程

        运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)

        worker任务类型,即spout任务、bolt任务两种

        启动executor

            (executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务)

     

    3、数据传输

      ZMQ(twitter早期产品)

      Netty(现用的)

     

    4、高可靠性

      异常处理

      消息可靠性保障机制(ACK):

        不管在哪个bolt中出现问题,都会回到spout再次走流程,但是有问题,这个情况可以忽略。

     

    5、可维护性

      StormUI 图形化监控接口

     

    6、数据处理方式

      (1)、异步:无应答

        client--->消息队列(kafka等)--->strom--->结果源(mysql,hbase,redis等)

      (2)、同步:有应答,需要调用内置的drpc服务器才会是同步的

        Client--->DRPC--->strom--->DRPC--->client

     

    7、比较:

       

     

    第二节:计算模型

      Java写,所以不管是sport还是bolt都是类,业务的处理也就是对类中方法的实现

    (1)Spout:继承BaseRichSpout或实现IRichSpout

      open: //对象创建

        This.conf = conf;

        This.context= context;

        This.collector= collector;

     

      declare方法:声明定义,即定义要向后发送的是什么数据,declare英文注释:声明

        可先通过DeclarerOutputFields中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去

        例子:

          DeclarerOutputFields

          Declarer.declare(new Fields(“num”));

     

      nextTuple方法:数据拉取,数据推送

        Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算

        例子:

          nextTuple

            i++;

            List tuple = new Values(i);

            This.collector.emit(tuple);

     

    (2)Bolt:继承BaseRichBolt或实现IRichBolt

      prepare: //同上面的open一样创建

      declare方法:声明定义,即定义要向后发送的是什么数据

        可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去

     

      execute方法:业务处理

        Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑

        例子:

          Excute

            Input.get...byField(“num”);

     

    (3)Stream Grouping – 数据流分组(即数据分发策略)将tuple发送给哪个bolt task来处理

      1. Shuffle Grouping 随机分发(常用)

        随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。

        轮询,平均分配 

        例子:.shuffleGrouping(“sport1”);

     

      2. Fields Grouping按字段分组(常用)

        按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。 

        例子:.fieldsGrouping(“sport1”,new Fields(“w”));

     

      3. All Grouping所有的节点都获取到全量的(不用)

        广播发送,对于每一个tuple,所有的bolts都会收到 

     

      4. Global Grouping(不用)

        全局分组,把tuple分配给task id最低的task 。

      

      5. None Grouping(同shuffle grouping,不用)

        不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt

        放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。 

     

      6. Direct Grouping(指定哪个接收,首先声明Direct Stream指向流,emitDirect用这种的方式发送)

        指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream

        的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task

        的id (OutputCollector.emit方法也会返回task的id)  

     

      7. Local or shuffle grouping(限制在一个工作进程中的随机分发,不用)

        本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致

     

      8. customGrouping(不用)

        自定义,相当于mapreduce那里自己去实现一个partition一样。

     

    (4)构建拓扑及提交

      Main

        topologyBuilder tb = new topologyBuilder();

        Tb.setSport(“sport1”,new Sport1());

        Tb.setBolt(“bolt1”,new Bolt1(),2).shuffleGrouping(“sport1”); //这里的2是并行度,可以不设;shuffleGrouping是分发策略。

     

        If(args.length > 0){ //集群形式运行

          StormSubmitTopology.submitTopology(args[0],new conf(), tb.creatTopology());

        }else{ //本地形式运行

          localCluster lc = new localCluster();

          Lc.submitTopology(“wc”, new conf(), tb.creatTopology());

        }

     

    第三节:架构设计

       

        对比:

     

     

    Hadoop

    Storm

    主节点

    ResourceManager

    Nimbus

    从节点

    NodeManager

    Supervisor

    应用程序

    Job

    Topology

    工作进程

    Child

    Worker

    计算模型

    Map/Reduce(split,map,shuffle,reduce)

    Spout/Bolt

     

        任务提交流程:

           

      

    第四节:环境部署

    1、在storm目录中创建logs目录,出现问题时候存放日志

      mkdir logs

    2、解压

      通讯端口是8080

    3、修改配置文件 https://blog.csdn.net/l1028386804/article/details/51924272

      storm.yaml

        storm.zookeeper.servers:

          - "node1"

          - "node2"

          - "node3"

        storm.local.dir: "/tmp/storm"

        nimbus.host: “node1"

        supervisor.slots.ports: //supervisor上运行workers的端口列表.每个worker占用一个端口,且每个端口只运行一个worker.通过这项配 - 6700 置可以调整每台机器上运行的worker数.(调整slot数/每机)

            - 6701

            - 6702

            - 6703

     4、查看帮助信息

      ./storm help

     5、启动Zookeeper

      ./bin/storm dev-zookeeper >> ./logs/zk.out 2>&1 &

     6、在node1上启动Nimbus,UI

      ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &

      ./bin/storm ui >> ./logs/ui.out 2>&1 &

     7、在node2、node3上启动Supervisor(按照配置每个Supervisor上启动4个slots)

      ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &

     8、启动Logviewer(可以在浏览器上查看日志)

      ./bin/storm logviewer &

     9、提交jar包到集群:

      strom help jar(查看如何提交jar到storm集群)

     

    第五节:数据处理方式

      1、同步计算(DRPC):实现一个demo

        集群:

          修改配置文件conf/storm.yaml

          drpc.servers:

             - "node1”

        启动:

          启动 drpc

          storm drpc >> ./logs/drpc.out 2>&1 &

         代码:

          1、自动方式:不建议使用,比较死板 

            LinearDRPCTopologyBuilder

          2、手动方式:

             

       2、并发机制:

        Worker – 进程

          一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)

          这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成

     

        Executor – 线程

          Executor是由Worker进程中生成的一个线程

          每个Worker进程中会运行拓扑当中的一个或多个Executor线程

          一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些Task任务都是对应着同一个组件(Spout、Bolt)。

     

        Task

          实际执行数据处理的最小单元

          每个task即为一个Spout或者一个Bolt

     

        注意:

          Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整

          (默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务)

     

        代码中设置数量:

          设置Worker进程数

          Config.setNumWorkers(int workers)

     

        设置Executor线程数

          TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)

          TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)

     

        设置Task数量

          ComponentConfigurationDeclarer.setNumTasks(Number val)

         

        例:

          Config conf = new Config() ;

          conf.setNumWorkers(2);

          TopologyBuilder topologyBuilder = new TopologyBuilder();

          topologyBuilder.setSpout("spout", new MySpout(), 1);

          topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)

                            .setNumTasks(4)

                            .shuffleGrouping("blue-spout);

     

        调整worker与excutor的数量:

          Rebalance:命令行动态调整,可以多设置task,以后再发生扩充的时候,就可以直接调整worker与excutor数量来实现迁移

      

    第六节:Flume+kafka+strom

      (1)Kafka(Kafka的优势

        简介

        1、角色:

          Producers ---topic----consumers

          在一个分区中的生产与消费是有序的

     

        2、搭建集群的节点:

          Broker //编号

     

        3、如何保证消费的有序性:

          消费的偏移量

     

        4、优势

          高吞吐量:零拷贝,Netty的网络传输。

          数据默认保存7天。

          offset会更新到一个kafka自带的topic【__consumer_offsets】

     

        安装:

          解压:

            Tar gz -C 目录

     

          配置:

            config/server.properties:

              Broker.id

              Log.dirs=真实数据存储路径

              Zookeeper.connect=node01:2181,node02:2181

     

          启动:

            bin/kafka-server-start.sh config/server.properties //启动kafka,且指定要读取哪个配置文件

     

        测试

          查看帮助手册:

            bin/kafka-console-consumer.sh help

          查看topic列表:

            bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --list

           查看“test”topic描述:

            bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --describe --topic test

           Topic创建

            bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 3 --topic test

            (参数说明:

              --replication-factor:副本个数,默认1个

              --partitions:指定当前创建的topic分区数量,默认1个(向几个分区中分发数据,此时一个分区有序,整体无序)

              --topic:指定新建topic的名称

            )

           Producer创建(往主题中生产数据)

            bin/kafka-console-producer.sh --broker-list node06:9092,node07:9092,node08:9092 --topic test

          Consumer创建(消费某主题的数据)

            bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic test

            (参数说明:

              --from-beginning 从头消费

            )

     

      (2)strom兼容kafka

        kafkaSpout 消费kafka数据

         

         kafkaBolt 生产kafka数据

         

        

      (3)Flume兼容kafka

        1、flume安装

          (1)解压

          (2)配置flume-env.sh

            配置Java的环境变量位置

          (3)在conf/下 创建配置文件fk.conf

            #取别名

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro //客户端连接source源的协议
    a1.sources.r1.bind = node06 //flume所在节点(如果是集群呢???????????????)
    a1.sources.r1.port = 41414 //与flume连接的端口
    
    # Describe the sink 对输出源的规定
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink //往哪里推送
    a1.sinks.k1.topic = testflume //主题
    a1.sinks.k1.brokerList = node06:9092,node07:9092,node08:9092
    a1.sinks.k1.requiredAcks = 1 //数据完整保障机制,1:主节点告诉消息已传递即可,0:只管推送,-1:所有的都反馈
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1
    
    # Use a channel which buffers events in memory 内存区设定
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 10000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

          (4)启动flume

            bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

            //agent是代理

            //Dflume.root.logger=DEBUG,console 日志输出格式,控制台打印debug日志

     

    第七节:容错机制

      1、容错机制(消息的完整性:速度降低了,完整度提高):

        (1) 怎么判断消息是否完整传递:

          传递数据的同时,带有16位二进制的ID,数据传递到下一个点时候,将新的ID进行xor运算,如果是全0,则说明数据正常传递。

          如果有问题,则通知spout重新发送。

        (2) Ack

          保证数据至少一次发送,但是不保证会重复。

     

      2、事务:

        要解决的问题:

          事务解决让数据有且仅有一次被处理。

        Transcation:

          tuple传递时候带着transcationID,且这个ID是线性增长,数据的处理也是强有序性的。

          将tuple存入库时,那么可根据ID判断,如果有这个ID的数据了,那么就是代表数据被处理过了,就不做处理了。

          弊端:每次的tuple频繁的与数据库交互。所以衍生出:

        Design2:

          现在是多个tuple组成batch,这个批里面带着一个transcationID,以批处理形式,进行数据传递与数据库交互。

          弊端:一个batch在运行时候,其他的是在阻塞状态,不仅影响速度,还有资源浪费。

        Design3:

        在前面的bolt阶段,所有的batch是自由竞争阶段,但是在要入库时候还是要保持强有序性。

     

     

     

  • 相关阅读:
    阿里云安装Kafka并配置外网访问
    Golang之select中time.After()为什么不生效
    macOS配置GitHub和GitLab共存
    Golang获取HTTP请求IP
    macOS下安装Python3.8并设置为默认版本
    mysql之力扣数据库题目620有趣的电影优化记录
    Pgsql之查询一个月份的天数
    Pgsql之查询一段时间内的所有年月yyyy-mm
    Pgsql之查询一段时间内的所有日期
    swiper下echarts点击事件失效
  • 原文地址:https://www.cnblogs.com/dblog/p/12164892.html
Copyright © 2011-2022 走看看