  • Big Data Learning: Kafka + Storm + HDFS Integration

    1 Requirements

    Kafka, Storm, and HDFS together form a framework combination commonly used for streaming data. The code below implements the following requirement.

    Requirement: using the technologies covered, have Kafka receive random sentences and pass them to Storm; use a Storm cluster to count how many times each word in the sentences occurs (word count), and store the results in HDFS.

    2 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>bigdata</groupId>
        <artifactId>homework</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <!--<scope>provided</scope>-->
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka-client</artifactId>
                <!--<scope>provided</scope>-->
                <version>1.2.2</version>
            </dependency>
            <!-- storm-kafka-client does not ship the Kafka client classes used by
                 PullWords/PushWords, so add kafka-clients explicitly; the version
                 here is illustrative and should match your Kafka cluster -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-hdfs</artifactId>
                <!-- note: ideally keep this aligned with the storm-core version (1.2.2) -->
                <version>1.0.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>io.confluent</groupId>
                        <artifactId>kafka-avro-serializer</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.1</version>
                    <configuration>
                        <createDependencyReducedPom>true</createDependencyReducedPom>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>storm.StormTopologyDriver</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <!-- the consumer code below uses lambdas, so compile for Java 8 -->
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
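
    Build the project with mvn clean package. The shade plugin produces a runnable fat jar (the ManifestResourceTransformer sets storm.StormTopologyDriver as the main class), which can then be submitted to the cluster with storm jar target/homework-1.0-SNAPSHOT.jar storm.StormTopologyDriver.
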
    3 PullWords.java
    package kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     * @Description
     * Kafka consumer
     */
    public class PullWords {
    
        private KafkaConsumer<String, String> consumer;
        private AtomicBoolean isAutoCommit;
    
        // kafka topic
        private final static String TOPIC = "wordCount";
    
        public PullWords() {
            isAutoCommit = new AtomicBoolean(false); // default: manual offset commits
            Properties props = new Properties();
            // Kafka brokers listen on 9092 by default; 2181 is the ZooKeeper port and
            // must not be used for bootstrap.servers. Note the producer below uses
            // different hostnames (stormNN); use your actual broker list.
            props.put("bootstrap.servers", "mini1:9092,mini2:9092,mini3:9092");
            props.put("group.id", "wordCount"); // consumers in the same group coordinate to consume the subscribed topic
            if (isAutoCommit.get()) {
                props.put("enable.auto.commit", "true"); // commit offsets automatically
                props.put("auto.commit.interval.ms", "1000"); // how often to auto-commit consumed offsets
            } else {
                props.put("enable.auto.commit", "false");
            }
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(TOPIC));
        }
    
        public void subscribe(String... topic) {
            consumer.subscribe(Arrays.asList(topic));
        }
    
        public ConsumerRecords<String, String> pull() {
            ConsumerRecords<String, String> records = consumer.poll(100);
            consumer.commitSync();
            return records;
        }
    
        public ConsumerRecords<String, String> pullOneOrMore() {
            // Block until at least one record arrives, then commit and return the batch.
            // poll() returns an empty (never null) ConsumerRecords when nothing is available.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10);
                if (!records.isEmpty()) {
                    consumer.commitSync();
                    return records;
                }
            }
        }
    
        public void close() {
            consumer.close();
        }
    
    }
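
    PullWords is not wired into the Storm topology (the KafkaSpout below consumes from Kafka directly); it reads as a standalone checker for verifying that sentences are arriving on the topic. A minimal usage sketch, with a hypothetical demo class name:

    package kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecords;

    // hypothetical driver: print whatever is currently on the topic, then exit
    public class PullWordsDemo {
        public static void main(String[] args) {
            PullWords pullWords = new PullWords();
            ConsumerRecords<String, String> records = pullWords.pullOneOrMore();
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
            pullWords.close();
        }
    }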

    4 PushWords.java

    package kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    import java.util.concurrent.Future;
    
    /**
     * @Description
     * Kafka producer
     * @Author hongzw@citycloud.com.cn
     * @Date 2019-02-16 7:08 PM
     */
    public class PushWords {
    
        private Producer<String, String> producer;
    
        // Kafka topic; must match the topic the consumer and the KafkaSpout subscribe to
        private final static String TOPIC = "wordCount";
    
    
        public PushWords() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "storm01:9092,storm02:9092,storm03:9092");
            props.put("acks", "all");
            props.put("retries", 0); // 请求失败不再重试
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }
    
        // send a sentence to the Kafka cluster
        public Future<RecordMetadata> push(String key, String words) {
            return producer.send(new ProducerRecord<>(TOPIC, key, words)); // send() is asynchronous
        }
    
        public void close() {
            producer.close();
        }
    
    }
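
    A minimal sketch of a driver that pushes random sentences, matching the requirement; the sentence list, class name, and message count are illustrative:

    package kafka;

    import java.util.Random;

    // hypothetical driver: push random sentences to the topic
    public class PushWordsDemo {
        private static final String[] SENTENCES = {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"
        };

        public static void main(String[] args) throws InterruptedException {
            PushWords pushWords = new PushWords();
            Random random = new Random();
            for (int i = 0; i < 100; i++) {
                String sentence = SENTENCES[random.nextInt(SENTENCES.length)];
                pushWords.push(String.valueOf(i), sentence);
                Thread.sleep(500); // throttle so the stream is easy to watch
            }
            pushWords.close();
        }
    }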

    5 WordCount.java

    package storm;
    
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    
    public class WordCount extends BaseBasicBolt {
    
        Map<String, Integer> wordCountMap = new HashMap<>();
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String word = tuple.getStringByField("word");
            Integer count = tuple.getIntegerByField("count");
            Integer current = wordCountMap.get(word);
            if (current == null) {
                wordCountMap.put(word, count);
            } else {
                wordCountMap.put(word, current + count); // add the incoming count, not a hard-coded 1
            }
            if (wordCountMap.size() > 20) { // once more than 20 distinct words have accumulated, flush to the hdfsBolt
                List<Object> list = new ArrayList<>();
                for (Map.Entry<String, Integer> entry : wordCountMap.entrySet()) {
                    list.add(entry.getKey() + ":" + entry.getValue());
                }
                wordCountMap.clear();
                // the list is emitted as a single "total" value; the HdfsBolt writes its toString()
                basicOutputCollector.emit(new Values(list));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("total"));
        }
    }
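
    The size-based flush above holds results until more than 20 distinct words have accumulated, so a quiet stream can leave counts buffered indefinitely. A time-based flush is a common alternative; below is a minimal sketch using Storm's tick tuples (the class name and the 10-second interval are illustrative, and it emits one word:count string per tuple instead of a whole list):

    package storm;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.Config;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.TupleUtils;

    public class TickFlushWordCount extends BaseBasicBolt {

        private final Map<String, Integer> counts = new HashMap<>();

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // ask Storm to deliver a tick tuple to this bolt every 10 seconds
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return conf;
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            if (TupleUtils.isTick(tuple)) {
                // on every tick, emit the accumulated counts and reset
                for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                    collector.emit(new Values(entry.getKey() + ":" + entry.getValue()));
                }
                counts.clear();
                return;
            }
            String word = tuple.getStringByField("word");
            Integer count = tuple.getIntegerByField("count");
            Integer current = counts.get(word);
            counts.put(word, current == null ? count : current + count);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("total"));
        }
    }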

    6 WordCountSplit.java

    package storm;
    
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    
    public class WordCountSplit extends BaseBasicBolt {
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // the KafkaSpout places the Kafka message body in the "value" field
            String[] words = tuple.getStringByField("value").split(" ");
            for (String word : words) {
                collector.emit(new Values(word, 1));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
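
    For reference: with storm-kafka-client's default record translator, the spout emits tuples with the fields "topic", "partition", "offset", "key", and "value", which is why the splitter reads the field named "value".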

    7 StormTopologyDriver.java

    package storm;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.format.RecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class StormTopologyDriver {
    
        public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // the KafkaSpoutConfig bootstrap list must point at the Kafka brokers (port 9092), not ZooKeeper (port 2181)
            KafkaSpoutConfig.Builder<String, String> builder = new KafkaSpoutConfig.Builder<>("mini1:9092", "wordCount");
            builder.setProp("group.id", "wordCount");
            builder.setProp("enable.auto.commit", "true");
            builder.setProp("auto.commit.interval.ms", "1000");
            builder.setProp("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            builder.setProp("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            topologyBuilder.setSpout("kafkaSpout", new KafkaSpout<>(builder.build()));
    
            topologyBuilder.setBolt("wordCountSplit", new WordCountSplit()).shuffleGrouping("kafkaSpout");
            topologyBuilder.setBolt("wordCount", new WordCount()).shuffleGrouping("wordCountSplit");
    
            // write the results to HDFS
            // field delimiter for output records
            RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
            // sync to HDFS once every 5 tuples
            SyncPolicy syncPolicy = new CountSyncPolicy(5);
            // rotate to a new file once the current one reaches 1 MB
            FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.MB);
            // output directory on HDFS
            FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm");
            HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://mini1:9000").withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);
            topologyBuilder.setBolt("hdfsBolt", hdfsBolt).shuffleGrouping("wordCount");
    
            Config config = new Config();
            config.setNumWorkers(2);
            // local mode
    //        LocalCluster localCluster = new LocalCluster();
    //        localCluster.submitTopology("countWords", config, topologyBuilder.createTopology());

            // cluster mode
            StormSubmitter.submitTopology("countWords", config, topologyBuilder.createTopology());
        }
    }
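
    For local testing, a common pattern is to keep the LocalCluster alive briefly and then shut it down. A minimal sketch, replacing the StormSubmitter call in main above (the 60-second lifetime is arbitrary):

            // local-mode runner sketch (requires: import org.apache.storm.utils.Utils;)
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("countWords", config, topologyBuilder.createTopology());
            Utils.sleep(60 * 1000); // let the topology process for 60 seconds
            localCluster.killTopology("countWords");
            localCluster.shutdown();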
    Original post: https://www.cnblogs.com/feifeicui/p/10441785.html