  • Spark Integrated with Kafka: A Real-Time Stream Computing Example in Java

    The example below is a stateful word count: every 10 seconds it pulls messages from the Kafka topic "test", splits each message on spaces, and keeps a running total per word with updateStateByKey.

    package com.test;
    
    import java.util.*;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.api.java.Optional;
    import scala.Tuple2;
    
    public class Test5 {
    
    	public static void main(String[] args) throws InterruptedException {
    
    		SparkConf conf = new SparkConf().setMaster("local").setAppName("streamingTest");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		sc.setLogLevel("ERROR");
    		// updateStateByKey below requires a checkpoint directory to persist state across batches
    		sc.setCheckpointDir("./checkpoint");
    		JavaStreamingContext ssc = new JavaStreamingContext(sc,
    				Durations.seconds(10));
    
    		// Kafka connection parameters; all of these are required, the job fails without them
    		Map<String, Object> kafkaParams = new HashMap<>();
    		kafkaParams.put("bootstrap.servers", "192.168.174.200:9092");
    		kafkaParams.put("key.deserializer", StringDeserializer.class);
    		kafkaParams.put("value.deserializer", StringDeserializer.class);
    		kafkaParams.put("group.id", "newgroup2");
    		kafkaParams.put("auto.offset.reset", "latest");
    		kafkaParams.put("enable.auto.commit", false);
    
    		Collection<String> topics = Arrays.asList("test");
    
    		JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
    				.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
    						ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    
    		// Note: each element of the stream is a ConsumerRecord, so take value() to get the message text
    		JavaPairDStream<String, Integer> counts = stream
    				.flatMap(
    						x -> Arrays.asList(x.value().split(" "))
    								.iterator())
    				.mapToPair(x -> new Tuple2<String, Integer>(x, 1))
    				.reduceByKey((x, y) -> x + y);
    		//counts.print();
    
    		JavaPairDStream<String, Integer> result = counts
    				.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Optional<Integer> call(List<Integer> values,
    							Optional<Integer> state) throws Exception {
    					/**
    					 * values: the new values for this key in the current batch, e.g. [1,1,1,1,1]
    					 * state: the accumulated state for this key from all previous batches
    					 */
    						Integer updateValue = 0;
    						if (state.isPresent()) {
    							updateValue = state.get();
    						}
    
    						for (Integer value : values) {
    							updateValue += value;
    						}
    						return Optional.of(updateValue);
    					}
    				});
    
    		result.print();
    
    		ssc.start();
    		ssc.awaitTermination();
    		ssc.close();
    	}
    }
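
    Because enable.auto.commit is set to false, the job above never commits its offsets back to Kafka, so on restart the newgroup2 group falls back to auto.offset.reset. The spark-streaming-kafka-0-10 integration supports committing offsets manually via HasOffsetRanges and CanCommitOffsets (both covered by the kafka010 wildcard import). A minimal sketch, which would go inside main right after createDirectStream:

    		stream.foreachRDD(rdd -> {
    			// RDDs produced by the direct stream carry the Kafka offset ranges they were read from
    			OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    			// ... process and output the batch here, then commit ...
    			((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    		});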
    
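    Note that updateStateByKey touches every key in the state on every batch. If that becomes a bottleneck, mapWithState (available since Spark 1.6) only visits keys present in the current batch. A sketch of a drop-in replacement for the updateStateByKey block, assuming the same counts stream; it additionally needs imports for org.apache.spark.streaming.State, org.apache.spark.streaming.StateSpec, org.apache.spark.streaming.api.java.JavaMapWithStateDStream and org.apache.spark.api.java.function.Function3:

    		// (word, newCount, runningState) -> emits the updated (word, total) pair
    		Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    				(word, one, state) -> {
    					int sum = (one.isPresent() ? one.get() : 0)
    							+ (state.exists() ? state.get() : 0);
    					state.update(sum); // persist the new running total for this word
    					return new Tuple2<>(word, sum);
    				};

    		JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> result =
    				counts.mapWithState(StateSpec.function(mappingFunc));
    		result.print();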

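    To feed the job, publish space-separated lines to the "test" topic. A minimal producer sketch, assuming the kafka-clients library is on the classpath and the same broker address as above; the class name and message text are just placeholders:

    	package com.test;

    	import java.util.Properties;
    	import org.apache.kafka.clients.producer.KafkaProducer;
    	import org.apache.kafka.clients.producer.ProducerRecord;
    	import org.apache.kafka.common.serialization.StringSerializer;

    	public class TestProducer {
    		public static void main(String[] args) {
    			Properties props = new Properties();
    			props.put("bootstrap.servers", "192.168.174.200:9092"); // same broker as the streaming job
    			props.put("key.serializer", StringSerializer.class.getName());
    			props.put("value.serializer", StringSerializer.class.getName());

    			// KafkaProducer is Closeable, so try-with-resources flushes and closes it
    			try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    				producer.send(new ProducerRecord<>("test", "hello world"));
    				producer.send(new ProducerRecord<>("test", "hello spark"));
    			}
    		}
    	}

    Within a batch interval or two, result.print() on the streaming side should then show cumulative pairs along the lines of (hello,2), (world,1), (spark,1).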
  • Original article: https://www.cnblogs.com/gmhappy/p/9472441.html