  • Spark getting-started notes --- 1

    1. Remote test: submit the job to the cluster directly from the development machine.

    import org.apache.spark.{SparkContext, SparkConf}
    import scala.math.random
    /**
      * Estimate Pi with Spark (Monte Carlo method).
      */
    object test {
      def main(args: Array[String]) {
        // Remote test: connect to the cluster master from the development machine
        val conf = new SparkConf()
          .setAppName("SparkPai")
          .setMaster("spark://192.168.1.116:7077")
          .setJars(List("D:\\IntelliJ IDEA 15.0.2\\workplace\\test\\out\\artifacts\\test_jar\\test.jar"))
        val sc = new SparkContext(conf)

        // number of partitions (slices)
        val slices = if (args.length > 0) args(0).toInt else 2
        // cap n at Int.MaxValue to avoid overflow
        val n = math.min(10000L*slices, Int.MaxValue).toInt
        // count how many random points fall inside the unit circle
        val count = sc.parallelize(1 until n, slices).map{
          lines =>
            // random number in (-1, 1)
            val x = random*2 - 1
            // random number in (-1, 1)
            val y = random*2 - 1
            // count the point if it lies inside the unit circle, otherwise ignore it
            if (x*x + y*y < 1) 1 else 0
        }.reduce(_+_)    // total number of points that fell inside the circle

        // count / n approximates the hit probability: count is the number of points inside the circle, n the total
        println("Pi is roughly " + 4.0 * count / n)
        sc.stop()
      }
    }

    2. Local test (generally not of much use).

    import java.io.File
    
    import org.apache.spark.{SparkContext, SparkConf}
    import scala.math.random
    /**
      * Estimate Pi with Spark (Monte Carlo method).
      * Created by 汪本成 on 2016/6/10.
      */
    object T1 {
      def main(args: Array[String]) {
        // --- Workaround for running Spark locally on Windows:
        // point hadoop.home.dir at the working directory and create an empty
        // bin/winutils.exe so Hadoop's environment check does not fail.
        val path = new File(".").getCanonicalPath()
        System.getProperties().put("hadoop.home.dir", path)
        new File("./bin").mkdirs()
        new File("./bin/winutils.exe").createNewFile()
        // ---
    
        val conf = new SparkConf().setAppName("SparkPai").setMaster("local[4]")
        val sc = new SparkContext(conf)
    
        // number of partitions (slices)
        val slices = if (args.length > 0) args(0).toInt else 2
        // cap n at Int.MaxValue to avoid overflow
        val n = math.min(10000L*slices, Int.MaxValue).toInt
        // count how many random points fall inside the unit circle
        val count = sc.parallelize(1 until n, slices).map{
          lines =>
            // random number in (-1, 1)
            val x = random*2 - 1
            // random number in (-1, 1)
            val y = random*2 - 1
            // count the point if it lies inside the unit circle, otherwise ignore it
            if (x*x + y*y < 1) 1 else 0
        }.reduce(_+_)    // total number of points that fell inside the circle

        // count / n approximates the hit probability: count is the number of points inside the circle, n the total
        println("Pi is roughly " + 4.0 * count / n)
        sc.stop()
      }
    }
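
    Why the factor 4 in the final line: the points (x, y) are drawn uniformly from the square [-1, 1] × [-1, 1], so the fraction of points that land inside the unit circle approaches the ratio of the circle's area to the square's area:

    \frac{count}{n} \approx \frac{\pi \cdot 1^2}{2 \times 2} = \frac{\pi}{4}
    \qquad\Longrightarrow\qquad
    \pi \approx 4 \cdot \frac{count}{n}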

    ------------------------------------------------------------------------------------- Kafka producer and consumer examples below -------------------------------------------------------------------------------------

    Producer

    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ProducerKafKa {
        private KafkaProducer<String, String> producer;
        private Properties properties;
    
        public ProducerKafKa() {
            properties = new Properties();
            properties.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.118:9092,168.1.119:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            producer = new KafkaProducer<String, String>(properties);
        }
    
        
        // Fire-and-forget send to topic "world": the record is handed to the
        // producer's buffer and transmitted asynchronously.
        public void sendRecorder(String key, String value) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value);
            producer.send(record);
        }


        // Send to an explicitly chosen partition of topic "world"
        // (partition 0 is used here; the original left the partition unspecified).
        public void assignPartitionSend(String key, String value) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", 0, key, value);
            producer.send(record);
        }
        
        // Send with a callback that logs where the record was stored.
        public void sendRecordWithCallback(String key, String value) {
            final Logger logger = LoggerFactory.getLogger(ProducerKafKa.class);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        logger.info("stored at partition:" + metadata.partition() + ",offset:" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
    
        public void close() {
            producer.flush();
            producer.close();
        }
        
        public void getMetrics(){
            Logger logger = LoggerFactory.getLogger(ProducerKafKa.class);
            
            Map<MetricName, Metric> metrics = (Map<MetricName, Metric>) producer.metrics();
            for (MetricName name : metrics.keySet()) {
                logger.info(name.name()+":"+metrics.get(name).value());
            }
        }
     
        public static void main(String[] args) {
            ProducerKafKa client = new ProducerKafKa();
            for (int i = 0; i < 100; i++) {
                client.sendRecorder("key" + i, "value" + i);
            }
            client.close();
        }
    
    }
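
    The sends above are all asynchronous: send() only hands the record to the producer's buffer. If the caller needs to know that the broker has actually acknowledged a record before moving on, one option is to block on the Future that send() returns. The method below is a minimal sketch of that, not part of the original class; it reuses the "world" topic and assumes a producer configured like the one above.

    // Sketch: make a send effectively synchronous by waiting on the Future returned by send().
    public static void sendRecordSync(KafkaProducer<String, String> producer, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value);
        try {
            // get() blocks until the broker acknowledges the record (or the send fails)
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("acknowledged: partition=" + metadata.partition() + ", offset=" + metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }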

    Consumer (type I)

    package test;

    /**
     * Created by guest2 on 2018/3/10.
     * Consumer code.
     */
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    /**
     * @author Joker
     * Manual offset commit.
     * Often a message should only be considered consumed after it has been received and
     * processed by our own logic; this is achieved by committing the offset manually.
     */
    public class ManualOffsetConsumer {
        private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);

        public static void main(String[] args) {
            Properties props = new Properties();
            // Kafka broker addresses
            props.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.117:9092,192.168.1.119:9092");
            // consumer group name
            props.put("group.id", "mygroup11");
            props.put("enable.auto.commit", "false");
            // Start from the earliest offset for this group.id. Without this setting the default
            // is "latest", i.e. the consumer only sees messages produced after it starts.
            props.put("auto.offset.reset", "earliest");
            // session (heartbeat) timeout
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test"));   // topic to consume
            final int minBatchSize = 5;   // commit after this many records have been buffered
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(30000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("consumer message values is----: " + record.value() + " and the offset is " + record.offset());
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    System.out.println("---now commit offset" + buffer.size());
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        }
    }

    The messages consumed, as seen from the Linux console and from IDEA (original screenshot not reproduced here).

    Consumer (type II)

    package com.you.bd17;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    public class MenulConsumer {
        private Properties properties = new Properties();
        private KafkaConsumer<String, String> consumer;
        
        public MenulConsumer(){
            properties.setProperty("bootstrap.servers", "master:9092,slave1:9092");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("group.id", "java_group");
            // disable automatic offset commit
            properties.setProperty("enable.auto.commit", "false");
            // "none": throw an exception if the group has no committed offset for an assigned partition
            properties.setProperty("auto.offset.reset", "none");
            consumer = new KafkaConsumer<String, String>(properties);
        }
        
        
        // read the last committed offset for partition 1 of topic "from-java"
        public void getOffsets(){
            // note: committed() returns null if the group has never committed an offset for this partition
            OffsetAndMetadata offsets = consumer.committed(new TopicPartition("from-java", 1));
            System.out.println(offsets + ":" + offsets.offset());
        }
        
        
        // subscribe to the topic and poll in a loop, committing the offsets after each batch
        public void subscribeTopic(){
            List<String> topics = new ArrayList<String>();
            topics.add("from-java");
            consumer.subscribe(topics);
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition:" + record.partition() + ",offset:" + record.offset() +",key:" + record.key() + ",value:" + record.value());
                }
                consumer.commitSync();
            }
        }
    
        // Consume from an explicitly assigned partition, starting from a given offset.
        // There are two ways to consume a topic:  first:  consumer.subscribe(topics);
        //                                         second: consumer.assign(topicPartitions);
        // The two are mutually exclusive - only one of them can be used.
        public void consumerAssignerd(){
            /*List<String> topics = new  ArrayList<String>();
            topics.add("from-java");
            consumer.subscribe(topics);*/
            // assign the partitions explicitly
            List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
            topicPartitions.add(new TopicPartition("from-java", 0));
            consumer.assign(topicPartitions);
            // seek to the offset in the assigned partition from which to start consuming
            consumer.seek(new TopicPartition("from-java", 0), 20);
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition:" + record.partition() + ",offset:" + record.offset() +",key:" + record.key() + ",value:" + record.value());
                }
            }
        }
        
        // explicitly commit a chosen offset
        public void setCommitOffset() {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
            offsets.put(new TopicPartition("from-java", 0), new OffsetAndMetadata(20));
            // commit a specific offset for this partition; it takes effect before the next poll
            consumer.commitSync(offsets);
            List<String> topics = new ArrayList<String>();
            topics.add("from-java");
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    if (record.partition() == 0) {
                        System.out.println("partition:" + record.partition() + ",offset:" + record.offset() + ",key:"
                                + record.key() + ",value:" + record.value());
                    }
                }
            }
        }
        
        // Exactly-once style consumption (outline; a sketch of this flow follows after this class):
        public void exactlyOnceConsumer(){
            // 1. configure the consumer
            properties.setProperty("enable.auto.commit", "false");

            // 2. subscribe to topics or assign partitions
            // consumer.subscribe(topics);
            // reset the offset (the offset value is read from MySQL)

            // 3. read the stored offset of each partition of the topic from MySQL, then either:
            // 4.1 consumer.commitSync(offset);  commit it back to the Kafka brokers, or
            // 4.2 consumer.seek(new TopicPartition("from-java", 0), 20);
            //     seek to the position in Kafka from which consumption should start

            // 5. poll data
            // records = consumer.poll(1000);

            // 6. iterate over the records and do the processing / computation

            // 7. after processing, use consumer.committed(new TopicPartition("from-java", 1))
            //    to obtain the offset that has been consumed so far

            // 8. save the computed result and the offset to MySQL as one atomic operation (a transaction)

            // 9. go back to step 5 and repeat: next poll, next round of processing
        }
        
        
        
        public static void main(String[] args) {
            MenulConsumer menulConsumer = new MenulConsumer();
            //menulConsumer.subscribeTopic();
            //menulConsumer.getOffsets();
            //menulConsumer.consumerAssignerd();
            menulConsumer.setCommitOffset();
        }
    }
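
    The outline in exactlyOnceConsumer can be fleshed out roughly as below, as extra methods on the class above. This is only a sketch under assumptions that are not in the original code: loadOffsetFromDb, process and saveResultAndOffsetInTransaction are hypothetical helpers standing in for the MySQL reads and writes (the last one must write the result and the offset in a single transaction), and the assigned partition is partition 0 of the "from-java" topic used in the examples above.

    // Sketch of the exactly-once flow described in exactlyOnceConsumer.
    public void exactlyOnceSketch(KafkaConsumer<String, String> consumer) {
        TopicPartition partition = new TopicPartition("from-java", 0);
        consumer.assign(java.util.Arrays.asList(partition));

        // restore the reading position from the external store (MySQL), not from Kafka
        long storedOffset = loadOffsetFromDb(partition);               // hypothetical helper
        consumer.seek(partition, storedOffset);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                String result = process(record.value());               // hypothetical processing step
                // save the result and the next offset to read in ONE transaction,
                // so the computed result and the consumed position can never diverge
                saveResultAndOffsetInTransaction(result, partition, record.offset() + 1);
            }
        }
    }

    // Hypothetical helpers; a real implementation would query/update MySQL inside a transaction.
    private long loadOffsetFromDb(TopicPartition partition) { return 0L; }
    private String process(String value) { return value; }
    private void saveResultAndOffsetInTransaction(String result, TopicPartition partition, long nextOffset) { }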

    Maven dependencies

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.zhiyou.bd17</groupId>
        <artifactId>KafkaTest</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <!-- kafka-clients is required by the producer/consumer code above; the version here is an
                 assumption - use the client version that matches your broker (0.10.x or later) -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.2.1</version>
            </dependency>
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.10</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-sinks/flume-ng-kafka-sink -->
            <dependency>
                <groupId>org.apache.flume.flume-ng-sinks</groupId>
                <artifactId>flume-ng-kafka-sink</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.21</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.21</version>
            </dependency>
        </dependencies>
    </project>

    Consumer (type III)

    package com.zhiyou.bd17;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    public class ProducerConsumer {
    
        private Properties properties = new Properties();
        private KafkaConsumer<String, String> consumer;
    
        // initialise the configuration
        public ProducerConsumer(){
            properties.setProperty("bootstrap.servers", "192.168.1.116:9092,192.168.1.118:9092,192.168.1.119:9092");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("group.id", "java_group");
            consumer = new KafkaConsumer<String, String>(properties);
        }
        
        // subscribe to the topic
        public void subscribeTopic(){
            List<String> topics = new ArrayList<String>();
            topics.add("kafkademo");
            consumer.subscribe(topics);
            // poll Kafka in a loop
            while(true){
                // fetch a batch of records from Kafka
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("received message: partition:" + record.partition() + ",offset:" + record.offset()
                            + ",key:" + record.key() + ",value:" + record.value());
                }
            }
        }
        
    
        
        public static void main(String[] args) {
            ProducerConsumer producerConsumer = new ProducerConsumer();
            producerConsumer.subscribeTopic();
        }
    
    }

     ------------------------------- The examples above are a bit scattered. Below is one producer and one consumer that are meant to work together ------------------------------

    Consumer

    /**
     * Created by guest2 on 2018/3/10.
     * Consumer code.
     */
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @author Joker
     * Manual offset commit.
     * Often a message should only be considered consumed after it has been received and
     * processed by our own logic; this is achieved by committing the offset manually.
     */
    public class ManualOffsetConsumer {
        private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);
    
        public static void main(String[] args) {
            Properties props = new Properties();
            // Kafka broker addresses
            props.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.117:9092,192.168.1.119:9092");
            // consumer group name
            props.put("group.id","mygroup11");
            props.put("enable.auto.commit", "false");
            // Start from the earliest offset for this group.id. Without this setting the default
            // is "latest", i.e. the consumer only sees messages produced after it starts.
            props.put("auto.offset.reset", "earliest");
            // session (heartbeat) timeout
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test"));   // topic to consume
            final int minBatchSize = 5;   // commit after this many records have been buffered
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(30000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("consumer message values is----: "+record.value()+" and the offset is "+ record.offset());
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    System.out.println("---now commit offset"+buffer.size());
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        }
    }

    Producer

    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
    import kafka.javaapi.producer.Producer;
    
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by zhanghuayan on 2017/1/22.
     */
    public class ProducerTest extends Thread {
        private String topic;
    
        public ProducerTest(String topic) {
            super();
            this.topic = topic;
        }
    
        public void run() {
            Producer producer = createProducer();
            int i=0;
            while(true){
                producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private Producer<Integer, String> createProducer() {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "192.168.1.116:2181");  // ZooKeeper address
            properties.put("serializer.class", StringEncoder.class.getName());
            properties.put("metadata.broker.list", "192.168.1.116:9092");  // Kafka broker list
            return new Producer<Integer, String>(new ProducerConfig(properties));
        }
    
        public static void main(String[] args){
            new ProducerTest("test").start();
        }
    }
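
    Note that ProducerTest uses the legacy Scala-based producer API (kafka.javaapi.producer.Producer), which newer Kafka releases no longer ship, while the consumers above all use the new clients API. For reference, a roughly equivalent producer written against org.apache.kafka.clients.producer (the API already used in ProducerKafKa) might look like the sketch below; the class name NewApiProducerTest is made up here, and the broker address and topic "test" are carried over from the example above.

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: the same "send one message per second to topic test" loop,
    // using the new producer API instead of the legacy kafka.javaapi.producer.Producer.
    public class NewApiProducerTest {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.116:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            int i = 0;
            while (true) {
                producer.send(new ProducerRecord<String, String>("test", "message: " + i++));
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }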
  • Original article: https://www.cnblogs.com/kaiwen1/p/8574845.html