1 Requirement
The combination of Kafka, Storm, and HDFS is a framework stack commonly used for streaming data. The code below implements the following requirement on top of that stack.

Requirement: using the techniques covered so far, Kafka receives randomly generated sentences and hands them off to Storm; the Storm cluster counts how many times each word in those sentences occurs (word count), and the results are stored in HDFS.
1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>bigdata</groupId>
    <artifactId>homework</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!--<scope>provided</scope>-->
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <!--<scope>provided</scope>-->
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.0.2</version>
            <exclusions>
                <exclusion>
                    <groupId>io.confluent</groupId>
                    <artifactId>kafka-avro-serializer</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>storm.StormTopologyDriver</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- the consumer code uses Java 8 lambdas, so target at least 1.8 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
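A note on the commented-out provided scopes: storm-core has to be on the regular compile/runtime classpath to run the topology in local mode from the IDE, which is why the scope is left at the default here. When the shaded jar is submitted to a real Storm cluster, storm-core is normally switched back to provided, since the workers already supply it; storm-kafka-client and storm-hdfs are not on the worker classpath and still need to be packaged into the jar.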
2 PullWords.java
package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Kafka consumer: pulls messages from the cluster for verification.
 */
public class PullWords {

    private KafkaConsumer<String, String> consumer;
    private AtomicBoolean isAutoCommit;
    // Kafka topic
    private final static String TOPIC = "wordCount";

    public PullWords() {
        isAutoCommit = new AtomicBoolean(false); // manual offset commits by default
        Properties props = new Properties();
        // bootstrap.servers must point at the Kafka brokers (default port 9092),
        // not at ZooKeeper (port 2181)
        props.put("bootstrap.servers", "mini1:9092,mini2:9092,mini3:9092");
        // consumer group: all consumers in the group cooperate to consume the subscribed topic
        props.put("group.id", "wordCount");
        if (isAutoCommit.get()) {
            props.put("enable.auto.commit", "true");      // commit offsets automatically
            props.put("auto.commit.interval.ms", "1000"); // auto-commit interval
        } else {
            props.put("enable.auto.commit", "false");
        }
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
    }

    public void subscribe(String... topic) {
        consumer.subscribe(Arrays.asList(topic));
    }

    // Poll once and commit the offsets synchronously
    public ConsumerRecords<String, String> pull() {
        ConsumerRecords<String, String> records = consumer.poll(100);
        consumer.commitSync();
        return records;
    }

    // Block until at least one record has been received, then commit and return
    public ConsumerRecords<String, String> pullOneOrMore() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            if (!records.isEmpty()) {
                consumer.commitSync();
                return records;
            }
        }
    }

    public void close() {
        consumer.close();
    }
}
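As a quick way to check that messages are actually arriving on the topic, a small verification driver along the following lines can be used. This is a minimal sketch, not part of the original code; the class name PullWordsDemo and the loop bound are made up for illustration.

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical verification driver: prints whatever is currently readable
// from the subscribed topic.
public class PullWordsDemo {

    public static void main(String[] args) {
        PullWords pullWords = new PullWords();
        try {
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = pullWords.pull();
                records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
            }
        } finally {
            pullWords.close();
        }
    }
}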
3 PushWords.java
package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Kafka producer: sends random sentences to the cluster.
 *
 * @Author hongzw@citycloud.com.cn
 * @Date 2019-02-16 19:08
 */
public class PushWords {

    private Producer<String, String> producer;
    // Kafka topic; it has to match the topic the Storm spout subscribes to
    private final static String TOPIC = "wordCount";

    public PushWords() {
        Properties props = new Properties();
        // must point at the same Kafka cluster that the Storm spout reads from
        props.put("bootstrap.servers", "storm01:9092,storm02:9092,storm03:9092");
        props.put("acks", "all");
        props.put("retries", 0);              // do not retry failed requests
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    // Send a sentence to the Kafka cluster; send() is asynchronous
    public Future<RecordMetadata> push(String key, String words) {
        return producer.send(new ProducerRecord<>(TOPIC, key, words));
    }

    public void close() {
        producer.close();
    }
}
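The requirement calls for random sentences, but the original code does not include a driver that actually sends them. Below is a minimal sketch of such a driver; the class name PushWordsDemo, the sample sentences, and the send rate are all assumptions made for illustration.

package kafka;

import java.util.Random;

// Hypothetical driver: pushes random sentences to Kafka so the Storm
// topology has something to count.
public class PushWordsDemo {

    private static final String[] SENTENCES = {
            "the quick brown fox jumps over the lazy dog",
            "hello storm hello kafka",
            "streaming word count with storm and hdfs"
    };

    public static void main(String[] args) throws InterruptedException {
        PushWords pushWords = new PushWords();
        Random random = new Random();
        try {
            for (int i = 0; i < 1000; i++) {
                String sentence = SENTENCES[random.nextInt(SENTENCES.length)];
                pushWords.push(String.valueOf(i), sentence); // key = sequence number, value = sentence
                Thread.sleep(200);                           // throttle so the stream keeps flowing
            }
        } finally {
            pushWords.close();
        }
    }
}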
4 WordCount.java
package storm;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCount extends BaseBasicBolt {

    Map<String, Integer> wordCountMap = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String word = tuple.getValueByField("word").toString();
        Integer count = Integer.valueOf(tuple.getValueByField("count").toString());
        Integer current = wordCountMap.get(word);
        if (current == null) {
            wordCountMap.put(word, count);
        } else {
            wordCountMap.put(word, current + count);
        }
        // once the map holds more than 20 distinct words, flush the counts to the hdfsBolt
        if (wordCountMap.size() > 20) {
            List<Object> list = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : wordCountMap.entrySet()) {
                list.add(entry.getKey() + ":" + entry.getValue());
            }
            wordCountMap.clear();
            if (list.size() > 0) {
                basicOutputCollector.emit(new Values(list));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("total"));
    }
}
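One thing to be aware of: the bolt emits the whole list of "word:count" strings as a single field ("total"). The DelimitedRecordFormat used in the driver below writes field values via their toString(), so each record in HDFS ends up as one line looking roughly like [word1:3, word2:5, ...]. If one word:count pair per line is preferred, the bolt could emit each map entry as its own tuple instead.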
5 WordCountSplit.java
package storm;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountSplit extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // the storm-kafka-client spout puts the Kafka record payload in the "value" field
        String[] words = tuple.getStringByField("value").split(" ");
        for (String word : words) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
6 StormTopologyDriver.java
package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class StormTopologyDriver {

    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // Kafka spout: the first argument is the broker list (bootstrap servers on port 9092,
        // not the ZooKeeper port 2181), the second is the topic to read
        KafkaSpoutConfig.Builder builder = new KafkaSpoutConfig.Builder("mini1:9092", "wordCount");
        builder.setProp("group.id", "wordCount");
        builder.setProp("enable.auto.commit", "true");
        builder.setProp("auto.commit.interval.ms", "1000");
        builder.setProp("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        builder.setProp("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        topologyBuilder.setSpout("kafkaSpout", new KafkaSpout<>(builder.build()));
        topologyBuilder.setBolt("wordCountSplit", new WordCountSplit()).shuffleGrouping("kafkaSpout");
        topologyBuilder.setBolt("wordCount", new WordCount()).shuffleGrouping("wordCountSplit");

        // write the results to HDFS
        // field delimiter for the output records
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
        // sync to HDFS every 5 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(5);
        // rotate to a new file once the current one reaches 1 MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.MB);
        // output directory
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm");
        HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://mini1:9000").withFileNameFormat(fileNameFormat)
                .withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);
        topologyBuilder.setBolt("hdfsBolt", hdfsBolt).shuffleGrouping("wordCount");

        Config config = new Config();
        config.setNumWorkers(2);

        // local mode
        // LocalCluster localCluster = new LocalCluster();
        // localCluster.submitTopology("countWords", config, topologyBuilder.createTopology());

        // cluster mode
        StormSubmitter.submitTopology("countWords", config, topologyBuilder.createTopology());
    }
}
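With the shade plugin configured in the pom, the topology can be packaged with mvn package and submitted to the cluster with storm jar homework-1.0-SNAPSHOT.jar storm.StormTopologyDriver (assuming the default jar name this pom produces). Once the producer is pushing sentences, the aggregated counts should appear as files under /storm on HDFS and can be inspected with hdfs dfs -cat.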