  • Kafka: ZK+Kafka+Spark Streaming Cluster Environment Setup (16): Using ForeachSink in Structured Streaming

    Out of the box, Structured Streaming supports the following sink types: File sink, Foreach sink, Console sink, and Memory sink.
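    Each built-in sink is selected with format(...) on the writeStream side, while the Foreach sink takes a writer object instead. A minimal sketch of the four options, where df stands for any streaming Dataset<Row> and the paths and query name are placeholders:

    // Console sink: print each micro-batch to stdout (handy for debugging).
    df.writeStream().format("console").start();

    // Memory sink: keep results in an in-memory table queryable as "t".
    df.writeStream().format("memory").queryName("t").start();

    // File sink: append output files to a directory; needs a checkpoint location.
    df.writeStream().format("parquet")
            .option("path", "/tmp/out")
            .option("checkpointLocation", "/tmp/ckpt")
            .start();

    // Foreach sink: hand every row to a user-supplied ForeachWriter,
    // such as the one implemented below.
    df.writeStream().foreach(new TestForeachWriter()).start();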

    Implementing a ForeachWriter:

    Take writing to Redis as an example:

    package com.dx.streaming.producer;
    
    import org.apache.spark.sql.ForeachWriter;
    import org.apache.spark.sql.Row;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class TestForeachWriter extends ForeachWriter<Row> {
        private static final long serialVersionUID = 1801843595306161029L;

        public static JedisPool jedisPool;
        public Jedis jedis;

        // The pool is created once per executor JVM; individual connections
        // are borrowed in open() and returned in close().
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(20);
            config.setMaxIdle(5);
            config.setMaxWaitMillis(1000);
            config.setMinIdle(2);
            config.setTestOnBorrow(false);
            jedisPool = new JedisPool(config, "127.0.0.1", 6379);
        }

        public static synchronized Jedis getJedis() {
            return jedisPool.getResource();
        }

        // Called once per partition and epoch; returning true means the rows
        // of this (partitionId, version) pair should be processed.
        @Override
        public boolean open(long partitionId, long version) {
            jedis = getJedis();
            return true;
        }

        // Called once per row. Here the first column is used as the key and
        // the row's string form as the value; adapt this to your schema.
        @Override
        public void process(Row row) {
            jedis.set(String.valueOf(row.get(0)), row.toString());
        }

        // Called when the partition is done (or on error); return the
        // connection to the pool.
        @Override
        public void close(Throwable errorOrNull) {
            jedis.close();
        }
    }
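    After a failure Spark can replay a partition, so the same (partitionId, version) pair may be offered to open() again; returning false from open() tells Spark to skip process() for that batch. A minimal idempotence sketch using a hypothetical "commit:<partitionId>" Redis key of our own invention (it also assumes two extra long fields, partitionId and version, on the writer):

        @Override
        public boolean open(long partitionId, long version) {
            jedis = getJedis();
            this.partitionId = partitionId;
            this.version = version;
            // Skip (partition, epoch) pairs that were already committed.
            String lastSeen = jedis.get("commit:" + partitionId);
            return lastSeen == null || Long.parseLong(lastSeen) < version;
        }

        @Override
        public void close(Throwable errorOrNull) {
            // Record the epoch as committed only if this batch succeeded.
            if (errorOrNull == null) {
                jedis.set("commit:" + partitionId, String.valueOf(version));
            }
            jedis.close();
        }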

    Example of using the ForeachWriter in a Structured Streaming query:

    package com.dx.streaming.producer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.avro.Schema;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    import com.databricks.spark.avro.SchemaConverters;
    
    public class TestConsumer {
        private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc"; // local path for Windows testing
        //private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc"; // HDFS path for cluster runs
        private static final String topic = "t-my";
    
        public static void main(String[] args) throws Exception {
            String appName = "Test Avro";
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(appName);
            SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
    
            Map<String, String> kafkaOptions = new HashMap<String, String>();
            kafkaOptions.put("kafka.bootstrap.servers", "192.168.0.121:9092");
            
            // SchemaUtil and AvroParserUDF are helper classes defined elsewhere
            // in this series; they load the Avro schema and deserialize the
            // Kafka message payload.
            Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
            AvroParserUDF udf = new AvroParserUDF(avroFilePath);
            StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();        
            sparkSession.udf().register("deserialize", udf, DataTypes.createStructType(type.fields()));
    
            Dataset<Row> stream = sparkSession.readStream()
                    .format("kafka")
                    .options(kafkaOptions)
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load()
                    .select("value").as(Encoders.BINARY())
                    .selectExpr("deserialize(value) as row")
                    .select("row.*");
    
            stream.printSchema();
    
            // Write every incoming row to Redis via the custom ForeachWriter.
            StreamingQuery query = stream.writeStream().foreach(new TestForeachWriter()).outputMode("update").start();
            
            try {
                query.awaitTermination();
            } catch (StreamingQueryException e) {
                e.printStackTrace();
            }
        }
    }
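    For anything beyond a local test, the query should also be given a checkpoint location so that source offsets and the (partitionId, version) bookkeeping survive a restart. A minimal variant of the start-up line above, with a made-up HDFS path:

            StreamingQuery query = stream.writeStream()
                    .foreach(new TestForeachWriter())
                    .outputMode("update")
                    .option("checkpointLocation", "/user/dx/checkpoint/t-my") // placeholder path
                    .start();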

    Reference: Spark的那些事(二): Usage of the Foreach sink in Structured Streaming
