  • Spark logging + HiveSQL

    To read Hive from a local Windows environment, download the cluster's hive-site.xml and place it in the project's resources directory.

    <?xml version="1.0" encoding="UTF-8"?>
    
    <!--Autogenerated by Cloudera Manager-->
    <configuration>
      <property>
        <name>hive.metastore.local</name>
        <value>false</value>
      </property>
      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bn00:9083</value>
      </property>
      <property>
        <name>hive.metastore.client.socket.timeout</name>
        <value>300</value>
      </property>
      <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
      </property>
      <property>
        <name>hive.warehouse.subdir.inherit.perms</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.auto.convert.join</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.auto.convert.join.noconditionaltask.size</name>
        <value>20971520</value>
      </property>
      <property>
        <name>hive.optimize.bucketmapjoin.sortedmerge</name>
        <value>false</value>
      </property>
      <property>
        <name>hive.smbjoin.cache.rows</name>
        <value>10000</value>
      </property>
      <property>
        <name>hive.server2.logging.operation.enabled</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.server2.logging.operation.log.location</name>
        <value>/var/log/hive/operation_logs</value>
      </property>
      <property>
        <name>mapred.reduce.tasks</name>
        <value>-1</value>
      </property>
      <property>
        <name>hive.exec.reducers.bytes.per.reducer</name>
        <value>67108864</value>
      </property>
      <property>
        <name>hive.exec.copyfile.maxsize</name>
        <value>33554432</value>
      </property>
      <property>
        <name>hive.exec.reducers.max</name>
        <value>1099</value>
      </property>
      <property>
        <name>hive.vectorized.groupby.checkinterval</name>
        <value>4096</value>
      </property>
      <property>
        <name>hive.vectorized.groupby.flush.percent</name>
        <value>0.1</value>
      </property>
      <property>
        <name>hive.compute.query.using.stats</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.vectorized.execution.enabled</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.vectorized.execution.reduce.enabled</name>
        <value>false</value>
      </property>
      <property>
        <name>hive.merge.mapfiles</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.merge.mapredfiles</name>
        <value>false</value>
      </property>
      <property>
        <name>hive.cbo.enable</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.fetch.task.conversion</name>
        <value>minimal</value>
      </property>
      <property>
        <name>hive.fetch.task.conversion.threshold</name>
        <value>268435456</value>
      </property>
      <property>
        <name>hive.limit.pushdown.memory.usage</name>
        <value>0.1</value>
      </property>
      <property>
        <name>hive.merge.sparkfiles</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.merge.smallfiles.avgsize</name>
        <value>16777216</value>
      </property>
      <property>
        <name>hive.merge.size.per.task</name>
        <value>268435456</value>
      </property>
      <property>
        <name>hive.optimize.reducededuplication</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.optimize.reducededuplication.min.reducer</name>
        <value>4</value>
      </property>
      <property>
        <name>hive.map.aggr</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.map.aggr.hash.percentmemory</name>
        <value>0.5</value>
      </property>
      <property>
        <name>hive.optimize.sort.dynamic.partition</name>
        <value>false</value>
      </property>
      <property>
        <name>hive.execution.engine</name>
        <value>mr</value>
      </property>
      <property>
        <name>spark.executor.memory</name>
        <value>1277794713</value>
      </property>
      <property>
        <name>spark.driver.memory</name>
        <value>966367641</value>
      </property>
      <property>
        <name>spark.executor.cores</name>
        <value>6</value>
      </property>
      <property>
        <name>spark.yarn.driver.memoryOverhead</name>
        <value>102</value>
      </property>
      <property>
        <name>spark.yarn.executor.memoryOverhead</name>
        <value>135</value>
      </property>
      <property>
        <name>spark.dynamicAllocation.enabled</name>
        <value>true</value>
      </property>
      <property>
        <name>spark.dynamicAllocation.initialExecutors</name>
        <value>1</value>
      </property>
      <property>
        <name>spark.dynamicAllocation.minExecutors</name>
        <value>1</value>
      </property>
      <property>
        <name>spark.dynamicAllocation.maxExecutors</name>
        <value>2147483647</value>
      </property>
      <property>
        <name>hive.metastore.execute.setugi</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.support.concurrency</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.zookeeper.quorum</name>
        <value>bn00,bn01,bn02</value>
      </property>
      <property>
        <name>hive.zookeeper.client.port</name>
        <value>2181</value>
      </property>
      <property>
        <name>hive.zookeeper.namespace</name>
        <value>hive_zookeeper_namespace_hive</value>
      </property>
      <property>
        <name>hbase.zookeeper.quorum</name>
        <value>bn00,bn01,bn02</value>
      </property>
      <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
      </property>
      <property>
        <name>hive.cluster.delegation.token.store.class</name>
        <value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
      </property>
      <property>
        <name>hive.server2.enable.doAs</name>
        <value>true</value>
      </property>
      <property>
        <name>hive.server2.use.SSL</name>
        <value>false</value>
      </property>
      <property>
        <name>spark.shuffle.service.enabled</name>
        <value>true</value>
      </property>
    </configuration>
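
    With the cluster's hive-site.xml on the classpath, a HiveContext started with a local master can reach the remote metastore (hive.metastore.uris above) directly. A minimal connectivity check might look like the sketch below; the class name is illustrative, and it assumes the same Spark 1.x API and metastore address used in this post.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.hive.HiveContext;

    public class HiveSmokeTest {
    	public static void main(String[] args) {
    		// Local debug master; hive-site.xml must be on the classpath (e.g. src/main/resources)
    		SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("hive-smoke-test");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		// Picks up hive.metastore.uris (thrift://bn00:9083) from hive-site.xml
    		HiveContext hiveContext = new HiveContext(sc.sc());
    		// If the metastore is reachable, this lists the databases it knows about
    		hiveContext.sql("show databases").show();
    		sc.stop();
    	}
    }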
    

    The code is as follows:

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.log4j.Level;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.hive.HiveContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import scala.Tuple2;
    
    import com.google.common.collect.Lists;
    
    public class HiveAndSparkSQLApp {
    
    	private static final Logger logger = LogManager.getLogger(HiveAndSparkSQLApp.class);
    	static {
    		// Quiet down noisy framework logging
    		org.apache.log4j.Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    		org.apache.log4j.Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF);
    	}
    
    	@SuppressWarnings("serial")
    	public static void main(String[] args) {
    		// Local debug setup; Spark UI: http://localhost:4040/executors/
    		SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("test")
    				.set("spark.testing.memory", "1147480000");
    		// spark streaming context
    		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    		// spark hive context
    		final HiveContext hiveContext = new HiveContext(jssc.sparkContext());
    		// spark SQL context
    //		final SQLContext sqlContext = SQLContext.getOrCreate(jssc.sparkContext().sc());
    
    		/**
    		 * Remote socket source.
    		 * On the node, run: nc -lk 9998
    		 * If nc is not installed, run: yum install nc.x86_64
    		 * Then simply type messages into the nc session.
    		 */
    		JavaDStream<String> lines = jssc.socketTextStream("node0", 9998);
    
    		lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    
    			@Override
    			public void call(JavaRDD<String> rdd) throws Exception {
    
    				// SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
    
    				JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
    
    					@Override
    					public Row call(String t) throws Exception {
    						// Dummy values derived from the current timestamp; the input line t is ignored here
    						String[] splited = new String[] { System.currentTimeMillis() + "",
    								System.currentTimeMillis() + "", System.currentTimeMillis() + "" };

    						// 1. Build the Row (id, name, age)
    						return RowFactory.create(Long.valueOf(splited[0]), splited[1], Long.valueOf(splited[2]));
    					}
    
    				});
    				// 2. Structure describing the DataFrame metadata
    				// Specifies the schema for each Row.
    				List<StructField> structFields = new ArrayList<StructField>();
    				// Column name, column type (e.g. Long or String), nullable (usually true);
    				// in real projects the fields are typically built in a loop rather than added by hand
    				structFields.add(DataTypes.createStructField("id", DataTypes.LongType, true));
    				structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
    				structFields.add(DataTypes.createStructField("age", DataTypes.LongType, true));
    				// Build the StructType that describes the DataFrame's schema
    				StructType structType = DataTypes.createStructType(structFields);
    				// 3. Build the DataFrame
    				DataFrame personsDF = hiveContext.createDataFrame(rowRDD, structType);
    				// 4. Register it as a temporary table
    				personsDF.registerTempTable("test");
    				
    				
    				DataFrame result = hiveContext.sql("select * from test");
    				/**
    				 * Process the result: convert the DataFrame back to an RDD<Row> and persist the output.
    				 */
    				List<Row> listRow = result.javaRDD().collect();
    				for (Row row : listRow) {
    					logger.error("row:" + row);
    				}
    				hiveContext.sql("insert into recommendation_system.t111  select id from test");
    			}
    		});
    
    		// Word-count test stream, kept just so there is an output operation to watch
    		lines.flatMap(new FlatMapFunction<String, String>() {

    			@Override
    			public Iterable<String> call(String msg) {
    				System.err.println(msg);
    				logger.error(msg);
    				// Split the incoming line into words on spaces
    				return Lists.newArrayList(msg.split(" "));
    			}
    		}).mapToPair(new PairFunction<String, String, Integer>() {
    
    			@Override
    			public Tuple2<String, Integer> call(String t) throws Exception {
    				return new Tuple2<String, Integer>(t, 1);
    			}
    
    		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    
    			@Override
    			public Integer call(Integer v1, Integer v2) throws Exception {
    				return v1 + v2;
    			}
    
    		}).print();
    		jssc.start();
    		jssc.awaitTermination();
    
    	}
    }
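
    The final insert statement assumes the database recommendation_system and the table t111 already exist on the cluster. If they do not, something along these lines can create them first; this is only a sketch, and the single bigint column is an assumption based on the LongType id field selected from the temp table, not the actual table definition.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.hive.HiveContext;

    public class PrepareTargetTable {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("prepare-target-table")
    				.set("spark.testing.memory", "1147480000");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		HiveContext hiveContext = new HiveContext(sc.sc());
    		// Hypothetical schema: a single bigint column matching the id written by the streaming job
    		hiveContext.sql("CREATE DATABASE IF NOT EXISTS recommendation_system");
    		hiveContext.sql("CREATE TABLE IF NOT EXISTS recommendation_system.t111 (id BIGINT)");
    		sc.stop();
    	}
    }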
    
  • Original post: https://www.cnblogs.com/hexu105/p/8085769.html