zoukankan      html  css  js  c++  java
  • Storm 分发策略+与Kafka集成

    Storm的分发策略

    Storm当中的分组策略,一共有八种:

     

    所谓的grouping策略就是在Spout与Bolt、Bolt与Bolt之间传递Tuple的方式。总共有八种方式:

     

    1)shuffleGrouping(随机分组)随机分组;将tuple随机分配到bolt中,能够保证各task中处理的数据均衡;

     

     2)fieldsGrouping(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)

    按字段分组; 根据设定的字段相同值得tuple被分配到同一个bolt进行处理;

    举例:builder.setBolt("mybolt", new MyStoreBolt(),5).fieldsGrouping("checkBolt",new Fields("uid"));

    说明:该bolt由5个任务task执行,相同uid的元组tuple被分配到同一个task进行处理;该task接收的元祖字段是mybolt发射出的字段信息,不受uid分组的影响。

        该分组不仅方便统计而且还可以通过该方式保证相同uid的数据保存不重复(uid信息写入数据库中唯一);

     

     3)allGrouping(广播发送,即每一个Tuple,每一个Bolt都会收到)广播发送:所有bolt都可以收到该tuple

     

     4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)全局分组:tuple被发送给bolt的同一个并且最小task_id的任务处理,实现事务性的topology

     

     5)noneGrouping(随机分派)不分组:效果等同于shuffle Grouping.

     

     6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)

    直接分组:由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)。

     7)Local or shuffle Grouping本地或者随机分组,优先将数据发送到本机的处理器executor,如果本机没有对应的处理器,那么再发送给其他机器的executor,避免了网络资源的拷贝,减轻网络传输的压力

     

     8)customGrouping (自定义的Grouping)

     

    kafka与storm1.1.3集成

    第一步:导入jar包


    <!--  use new kafka spout code -->

            <dependency>

                <groupId>org.apache.storm</groupId>

                <artifactId>storm-kafka-client</artifactId>

                <version>1.1.3</version>

            </dependency>

            <dependency>

                <groupId>org.apache.kafka</groupId>

                <artifactId>kafka-clients</artifactId>

                <version>1.0.0</version>

            </dependency>

    <dependency>

                   <groupId>org.apache.storm</groupId>

                   <artifactId>storm-core</artifactId>

                   <version>1.1.3</version>

                   <scope>provided</scope>

               </dependency>


     

     

    第二步:编写我们的主函数入口程序


    public class KafkStormTopo {

          public static void main(String[] args) throws Exception {

               //获取到了一个内部类

    KafkaSpoutConfig.Builder<String, String> kafkaSpoutConfigBuilder = KafkaSpoutConfig.builder("hadoop-001:9092,hadoop-002:9092,hadoop-003:9092", "four");

     

    //所有关于kafka的设置都在kafkaSpoutConfigBuilder 里面配置

    //控制kafka的offset的消费策略

    kafkaSpoutConfigBuilder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST);

    kafkaSpoutConfigBuilder.setGroupId("kafkaToStorm");

    kafkaSpoutConfigBuilder.setOffsetCommitPeriodMs(1000L);

    KafkaSpoutConfig<String, String> kafkaSpoutConfig = kafkaSpoutConfigBuilder.build();

    //kafkaSpout需要KafkaSpoutConfig

    KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(kafkaSpoutConfig);

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("kafkaSpout",kafkaSpout);

    builder.setBolt("printlnBolt",new PrintlnBolt()).localOrShuffleGrouping("kafkaSpout");

          }

    }


     

     

     

    第三步:开发我们的PrintlnBolt作为消息处理


    public class PrintlnBolt extends BaseRichBolt {


    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

    }

    @Override
    public void execute(Tuple input) {
    System.out.println("printbolt tuple"+input.getValues());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
    }

     

    创建topic


    kafka-topics.sh --create --zookeeper hadoop-001:2181 --replication-factor 3 --partitions 1 --topic four


     

    创建生产者


    kafka-console-producer.sh --broker-list hadoop-001:9092,hadoop-002:9092,hadoop-003:9092 --topic four 


    发送消息与接收消息

    发送消息

    接收消息

     

    setFirstPollOffsetStrategy:允许你设置从哪里开始消费数据. 这在故障恢复和第一次启动spout的情况下会被使用

    EARLIEST :无论之前的消费情况如何,spout会从每个kafka partition能找到的最早的offset开始的读取

    LATEST :无论之前的消费情况如何,spout会从每个kafka partition当前最新的offset开始的读取

    UNCOMMITTED_EARLIEST (默认值) :spout 会从每个partition的最后一次提交的offset开始读取. 如果offset不存在或者过期, 则会依照 EARLIEST进行读取.

    UNCOMMITTED_LATEST :spout 会从每个partition的最后一次提交的offset开始读取, 如果offset不存在或者过期, 则会依照 LATEST进行读取

  • 相关阅读:
    rabbitmq在linux下单节点部署和基本使用
    使用kafka-python客户端进行kafka kerberos认证
    python confluent kafka客户端配置kerberos认证
    linux下rocksdb的编译安装
    linux下gflags的安装
    ASP.NET:/WebResource.axd
    公司-IT-信息安全:江南天安
    信息安全-证书:数字证书
    信息安全-证书-数字证书:SSL证书
    Error-DotNet:无法为目标平台“Microsoft.Data.Tools.Schema.Sql.Sql120DatabaseSechemaProvider”创建扩展管理器
  • 原文地址:https://www.cnblogs.com/Transkai/p/10903438.html
Copyright © 2011-2022 走看看