  • Kafka: ZK + Kafka + Spark Streaming Cluster Environment Setup (Part 16): Using ForeachSink in Structured Streaming

    The sink types Structured Streaming supports out of the box are the File sink, Foreach sink, Console sink, and Memory sink. The Foreach sink is the extension point: you supply a ForeachWriter that pushes each output row to an arbitrary external store, such as Redis in this post.
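
    Each of these sinks is selected on writeStream(). Below is a minimal sketch (not part of the original post; the paths and the table name are placeholders) of how the four built-in sinks are started, assuming df is an already-built streaming Dataset<Row> and TestForeachWriter is the Redis writer defined below:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class SinkExamples {
        // "df" is assumed to be a streaming Dataset<Row>, e.g. the Kafka stream built later in this post.
        static void startSinks(Dataset<Row> df) throws Exception {
            // Console sink: print every micro-batch to stdout (debugging only).
            StreamingQuery console = df.writeStream().format("console").start();

            // Memory sink: keep the output in an in-memory table queryable as "debug_table".
            StreamingQuery memory = df.writeStream().format("memory").queryName("debug_table").start();

            // File sink: append files (parquet/json/csv/...) under a directory; a checkpoint location is required.
            StreamingQuery file = df.writeStream().format("parquet")
                    .option("path", "/tmp/out")
                    .option("checkpointLocation", "/tmp/ckp")
                    .start();

            // Foreach sink: hand every output row to a user-supplied ForeachWriter (the Redis writer below).
            StreamingQuery foreach = df.writeStream().foreach(new TestForeachWriter()).start();
        }
    }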

    The ForeachWriter implementation:

    Taking writes to Redis as the example:

    package com.dx.streaming.producer;
    
    import org.apache.spark.sql.ForeachWriter;
    import org.apache.spark.sql.Row;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class TestForeachWriter extends ForeachWriter<Row> {
        private static final long serialVersionUID = 1801843595306161029L;

        public static JedisPool jedisPool;
        public Jedis jedis;

        // One connection pool per executor JVM; each writer instance borrows a connection from it.
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(20);
            config.setMaxIdle(5);
            config.setMaxWaitMillis(1000);
            config.setMinIdle(2);
            config.setTestOnBorrow(false);
            jedisPool = new JedisPool(config, "127.0.0.1", 6379);
        }

        public static synchronized Jedis getJedis() {
            return jedisPool.getResource();
        }

        // Called once per partition for every trigger; returning true means process() is invoked for that partition's rows.
        @Override
        public boolean open(long partitionId, long version) {
            jedis = getJedis();
            return true;
        }

        // Called for every output row. The original post writes literal strings here as a placeholder;
        // in practice the key and value would come from the Row, e.g.
        // jedis.set(row.getAs("key").toString(), row.getAs("value").toString());
        @Override
        public void process(Row row) {
            jedis.set("row.key", "row.value");
        }

        // Called when the partition is done (or on error): return the connection to the pool.
        @Override
        public void close(Throwable arg0) {
            jedis.close();
        }
    }
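
    A note on the ForeachWriter lifecycle: the writer is created on the driver, serialized, and shipped to the executors, which is why the Jedis pool is a static field (one pool per executor JVM) rather than serialized state. open(partitionId, version) is called once per partition for every trigger; returning false tells Spark to skip process() for that partition's rows, and close() is called afterwards with the exception that aborted the partition, or null on success.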

    Using the ForeachWriter from a Structured Streaming query:

    package com.dx.streaming.producer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.avro.Schema;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    import com.databricks.spark.avro.SchemaConverters;
    
    public class TestConsumer {
        private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc";
        //private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
        private static final String topic = "t-my";
    
        public static void main(String[] args) throws Exception {
            String appName = "Test Avro";
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(appName);
            SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
    
            Map<String, String> kafkaOptions = new HashMap<String, String>();
            kafkaOptions.put("kafka.bootstrap.servers", "192.168.0.121:9092");
            
            // SchemaUtil and AvroParserUDF are the author's helper classes (not shown in this post):
            // they load the Avro schema and provide a UDF that deserializes the Kafka value bytes into a Row.
            Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
            AvroParserUDF udf = new AvroParserUDF(avroFilePath);
            StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();
            sparkSession.udf().register("deserialize", udf, DataTypes.createStructType(type.fields()));
    
            Dataset<Row> stream = sparkSession.readStream()
                    .format("kafka")
                    .options(kafkaOptions)
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load()
                    .select("value").as(Encoders.BINARY())
                    .selectExpr("deserialize(value) as row")
                    .select("row.*");
    
            stream.printSchema();
    
            // Write each micro-batch to Redis through the ForeachWriter defined above
            StreamingQuery query = stream.writeStream().foreach(new TestForeachWriter()).outputMode("update").start();
            
            try {
                query.awaitTermination();
                sparkSession.streams().awaitAnyTermination(); // redundant here: awaitTermination() above already blocks until this single query ends
            } catch (StreamingQueryException e) {
                e.printStackTrace();
            }
        }
    }
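
    Running this example also needs the Kafka source and Avro support on the classpath. For the Spark 2.x API used here that would be (exact coordinates and versions are assumptions, check your Spark version) org.apache.spark:spark-sql-kafka-0-10 for the "kafka" format, com.databricks:spark-avro for SchemaConverters, and redis.clients:jedis for the Redis writer.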

    Reference: "Spark的那些事(二)" (Those Spark things, part 2: using the Foreach sink in Structured Streaming)

  • Original post: https://www.cnblogs.com/yy3b2007com/p/9307701.html