zoukankan      html  css  js  c++  java
  • Spark2.0基于广播变量broadcast实现实时数据按天统计

    package com.gm.hive.SparkHive;
    
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.spark.Partition;
    
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    
    import scala.Tuple2;
    import scala.reflect.ClassManifestFactory;
    
    
    /**
     * Spark Streaming word count over a Kafka topic whose running totals restart
     * every calendar day.
     *
     * <p>A broadcast map of {@code day -> Boolean} tells the state-update function
     * whether the batch being processed is the first one of that day
     * ({@code true}: ignore the previous state and start counting again) or a later
     * one ({@code false}: keep accumulating). The per-batch totals are appended to
     * the PostgreSQL table {@code kcssqktj}.
     */
    public class App {

    	/** Format of the day key used in the broadcast map and appended to every persisted word. */
    	private static final String DAY_PATTERN = "yyyy-MM-dd";

    	/**
    	 * Most recent broadcast of the day flags. It is re-assigned from the
    	 * per-batch {@code foreachRDD} callback below and read by the state-update
    	 * function through this static field.
    	 *
    	 * NOTE(review): reading a static field from task code presumably only works
    	 * while driver and tasks share one JVM (the job is configured with
    	 * {@code local[2]}) - confirm before running this on a cluster.
    	 */
    	private static volatile Broadcast<Map<String,Boolean>> bcMap = null;

    	/**
    	 * Builds the streaming pipeline, starts it and blocks until it terminates.
    	 *
    	 * @param args command-line arguments (unused)
    	 */
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
    				"SparkStreaming");

    		JavaSparkContext sc = new JavaSparkContext(conf);
    		sc.setLogLevel("ERROR");
    		sc.setCheckpointDir("./checkpoint");

    		JavaStreamingContext ssc = new JavaStreamingContext(sc,
    				Durations.seconds(10));

    		// Initial broadcast: today is marked as "first batch still pending".
    		Map<String,Boolean> initialFlags = new HashMap<String,Boolean>();
    		initialFlags.put(today(), true);
    		bcMap = sc.broadcast(initialFlags);

    		// Kafka consumer parameters; the original author notes they are all
    		// required and that leaving one out causes an error.
    		Map<String, Object> kafkaParams = new HashMap<>();
    		kafkaParams.put("bootstrap.servers", "192.168.174.200:9092");
    		kafkaParams.put("key.deserializer", StringDeserializer.class);
    		kafkaParams.put("value.deserializer", StringDeserializer.class);
    		kafkaParams.put("group.id", "newgroup2");
    		kafkaParams.put("auto.offset.reset", "latest");
    		kafkaParams.put("enable.auto.commit", false);

    		Collection<String> topics = Arrays.asList("test");

    		JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
    				.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
    						ConsumerStrategies.<String, String> Subscribe(topics,
    								kafkaParams));

    		// Each stream element is a ConsumerRecord; only its value is split into words.
    		JavaPairDStream<String, Integer> counts = stream
    				.flatMap(record -> Arrays.asList(record.value().split(" ")).iterator())
    				.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
    				.reduceByKey((left, right) -> left + right);

    		// Maintains the day flags once per batch:
    		//   day unknown -> add it as true  (the next state update restarts the totals)
    		//   day is true -> flip it to false (later state updates accumulate)
    		//   day is false -> nothing to do
    		stream.foreachRDD(rdd -> {
    			Broadcast<Map<String,Boolean>> previous = bcMap;
    			String day = today();
    			Boolean firstBatchPending = previous.value().get(day);
    			if (firstBatchPending != null && !firstBatchPending) {
    				return;
    			}
    			// Work on a copy: the map held by a broadcast must not be modified
    			// after it has been broadcast.
    			Map<String,Boolean> flags = new HashMap<String,Boolean>(previous.value());
    			flags.put(day, firstBatchPending == null);
    			bcMap = rdd.context().broadcast(flags, ClassManifestFactory.classType(Map.class));
    			// Release the superseded broadcast only after the new one is in place.
    			previous.unpersist();
    		});

    		JavaPairDStream<String, Integer> result = counts
    				.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

    					private static final long serialVersionUID = 1L;

    					/**
    					 * @param values counts of this key in the current batch, e.g. [1,1,1,1,1]
    					 * @param state  total of this key before the current batch
    					 * @return the new total for this key
    					 */
    					@Override
    					public Optional<Integer> call(List<Integer> values,
    							Optional<Integer> state) throws Exception {
    						Boolean firstBatchOfDay = bcMap.value().get(today());
    						if (firstBatchOfDay == null) {
    							// NOTE(review): today has no flag yet, so the total is
    							// reported as 0 and this batch's values are dropped
    							// (behaviour kept from the original) - confirm intended.
    							return Optional.of(0);
    						}
    						int total = 0;
    						// A new day starts from zero; otherwise continue from the stored total.
    						if (!firstBatchOfDay && state.isPresent()) {
    							total = state.get();
    						}
    						for (Integer value : values) {
    							total += value;
    						}
    						return Optional.of(total);
    					}
    				});

    		// JDBC target for the per-batch totals.
    		String url = "jdbc:postgresql://192.168.174.200:5432/postgres?charSet=utf-8";
    		Properties connectionProperties = new Properties();
    		connectionProperties.put("user","postgres");
    		connectionProperties.put("password","postgres");
    		connectionProperties.put("driver","org.postgresql.Driver");

    		result.print();

    		result.foreachRDD(rdd -> {
    			// Resolve the day once per batch so every row of the batch carries the same suffix.
    			String day = today();
    			JavaRDD<ResultRow> rowRDD = rdd.map(wordTotal -> {
    				ResultRow row = new ResultRow();
    				row.setTypeid(wordTotal._1 + "_" + day);
    				row.setKczs(wordTotal._2);
    				return row;
    			});
    			SparkSession spark = SparkSession.builder().config(rdd.context().getConf()).getOrCreate();
    			Dataset<Row> data = spark.createDataFrame(rowRDD, ResultRow.class);
    			// Rows are appended to the table on every batch (SaveMode.Append), not overwritten.
    			data.write().mode(SaveMode.Append).jdbc(url, "kcssqktj", connectionProperties);
    		});

    		ssc.start();
    		try {
    			ssc.awaitTermination();
    		} catch (InterruptedException e) {
    			// Keep the interrupt visible to whoever runs this thread.
    			Thread.currentThread().interrupt();
    		} finally {
    			ssc.close();
    		}
    	}

    	/**
    	 * Formats the current date as {@code yyyy-MM-dd}.
    	 * A new formatter is created per call because SimpleDateFormat is not thread-safe.
    	 */
    	private static String today() {
    		return new SimpleDateFormat(DAY_PATTERN).format(new Date());
    	}

    }
    
    package com.gm.hive.SparkHive;
    
    import java.io.Serializable;
    
    /**
     * Bean for one row written to the {@code kcssqktj} table.
     *
     * <p>{@code App} turns every (word, total) pair into one instance and passes
     * the class to {@code SparkSession.createDataFrame}, so the getter/setter
     * pairs below must stay in place.
     */
    public class ResultRow implements Serializable {
    	private static final long serialVersionUID = 6681372116317508248L;

    	/** Row identifier; {@code App} fills it with {@code word + "_" + yyyy-MM-dd}. */
    	private String typeid;

    	/** Total stored for {@link #typeid}. */
    	private int kczs;

    	/** @return the row identifier, or {@code null} if it has not been set */
    	public String getTypeid() {
    		return typeid;
    	}

    	/** @param typeid the row identifier */
    	public void setTypeid(String typeid) {
    		this.typeid = typeid;
    	}

    	/** @return the stored total ({@code 0} if it has not been set) */
    	public int getKczs() {
    		return kczs;
    	}

    	/** @param kczs the total to store */
    	public void setKczs(int kczs) {
    		this.kczs = kczs;
    	}

    }
    
    <!-- Maven build for the Spark Streaming job: Spark 2.0.0 (Scala 2.11), the
    	Kafka 0.10 connector, Hive, the PostgreSQL JDBC driver, and a shaded
    	"allinone" jar. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.test</groupId>
    	<artifactId>kcssqktj_spark</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	</properties>

    	<dependencies>
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>3.8.1</version>
    			<scope>test</scope>
    		</dependency>

    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>1.7.22</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-client</artifactId>
    			<version>2.8.0</version>
    			<exclusions>
    				<exclusion>
    					<groupId>javax.servlet</groupId>
    					<artifactId>*</artifactId>
    				</exclusion>
    			</exclusions>
    		</dependency>

    		<!-- Spark modules: keep all of them on the same version and Scala suffix. -->
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-sql_2.11</artifactId>
    			<version>2.0.0</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-hive_2.11</artifactId>
    			<version>2.0.0</version>
    		</dependency>

    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-streaming_2.11</artifactId>
    			<version>2.0.0</version>
    			<exclusions>
    				<exclusion>
    					<artifactId>slf4j-log4j12</artifactId>
    					<groupId>org.slf4j</groupId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-core_2.11</artifactId>
    			<version>2.0.0</version>
    		</dependency>

    		<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
    		<dependency>
    			<groupId>org.apache.hive</groupId>
    			<artifactId>hive-jdbc</artifactId>
    			<version>2.1.1</version>
    		</dependency>

    		<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
    		<dependency>
    			<groupId>org.apache.hive</groupId>
    			<artifactId>hive-exec</artifactId>
    			<version>2.1.1</version>
    		</dependency>

    		<!-- JDBC driver named in the job's connection properties (org.postgresql.Driver). -->
    		<dependency>
    			<groupId>org.postgresql</groupId>
    			<artifactId>postgresql</artifactId>
    			<version>9.4-1201-jdbc4</version>
    		</dependency>

    		<!-- Kafka 0.10 direct-stream connector (org.apache.spark.streaming.kafka010). -->
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    			<version>2.0.0</version>
    		</dependency>
    	</dependencies>
    	<build>

    		<plugins>
    			<!-- The job uses lambdas, so compile for Java 8. -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    				</configuration>
    			</plugin>
    			<!-- Builds an additional self-contained jar with the classifier "allinone". -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-shade-plugin</artifactId>
    				<executions>
    					<execution>
    						<phase>package</phase>
    						<goals>
    							<goal>shade</goal>
    						</goals>
    						<configuration>
    							<shadedArtifactAttached>true</shadedArtifactAttached>
    							<shadedClassifierName>allinone</shadedClassifierName>
    							<artifactSet>
    								<includes>
    									<include>*:*</include>
    								</includes>
    							</artifactSet>
    							<filters>
    								<!-- Drop signature files of signed dependencies; they do not
    									match the contents of the merged jar. -->
    								<filter>
    									<artifact>*:*</artifact>
    									<excludes>
    										<exclude>META-INF/*.SF</exclude>
    										<exclude>META-INF/*.DSA</exclude>
    										<exclude>META-INF/*.RSA</exclude>
    									</excludes>
    								</filter>
    							</filters>
    							<transformers>
    								<transformer
    									implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    									<resource>reference.conf</resource>
    								</transformer>
    								<transformer
    									implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    									<resource>META-INF/spring.handlers</resource>
    								</transformer>
    								<transformer
    									implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    									<resource>META-INF/spring.schemas</resource>
    								</transformer>
    								<transformer
    									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    									<manifestEntries>
    										<!-- Entry point of the shaded jar. NOTE(review): assumes this
    											pom builds the com.gm.hive.SparkHive.App job - confirm. -->
    										<Main-Class>com.gm.hive.SparkHive.App</Main-Class>
    									</manifestEntries>
    								</transformer>
    							</transformers>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    		</plugins>
    	</build>
    </project>
  • 相关阅读:
    mysql常用指令
    mysql数据库文件简介和应用
    redis配置参数简介
    shell输入输出重定向
    memcached添加日志输出
    java 随机数种子
    统计学习方法——第四章朴素贝叶斯及c++实现
    统计学习方法——第二章的c++实现
    python函数带不带括号的问题
    numpy中的range()
  • 原文地址:https://www.cnblogs.com/gmhappy/p/9472433.html
Copyright © 2011-2022 走看看