import org.apache.spark.{SparkContext, SparkConf}
import scala.math.random

/**
 * Estimates Pi with a Monte Carlo simulation executed on a remote Spark standalone cluster.
 *
 * Random points are drawn in the square [-1, 1) x [-1, 1); the fraction of points that
 * land inside the unit circle approximates Pi / 4.
 */
object test {

  /**
   * Program entry point.
   *
   * @param args optional; `args(0)` is the number of slices (partitions), defaulting to 2
   */
  def main(args: Array[String]): Unit = {
    // Remote-cluster test configuration: connect to the standalone master and ship the application jar.
    // Backslashes in the Windows path are escaped: "\I", "\w", "\o" and "\a" are not valid
    // escapes in a Scala string literal, and "\t" would otherwise be read as a TAB character.
    val conf = new SparkConf()
      .setAppName("SparkPai")
      .setMaster("spark://192.168.1.116:7077")
      .setJars(List("D:\\IntelliJ IDEA 15.0.2\\workplace\\test\\out\\artifacts\\test_jar\\test.jar"))
    val sc = new SparkContext(conf)
    try {
      // Number of slices (partitions).
      val slices = if (args.length > 0) args(0).toInt else 2
      // Cap n at Int.MaxValue to avoid overflow.
      val n = math.min(10000L * slices, Int.MaxValue).toInt
      // Count how many random points fall inside the unit circle.
      val count = sc.parallelize(1 until n, slices).map { _ =>
        // Random coordinates in [-1, 1).
        val x = random * 2 - 1
        val y = random * 2 - 1
        // Squared distance to the origin below 1 means the point is inside the circle.
        if (x * x + y * y < 1) 1 else 0
      }.reduce(_ + _)
      // `1 until n` contains n - 1 samples, so the hit ratio is count / (n - 1).
      println("Pai is roughly " + 4.0 * count / (n - 1))
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}
2.本地测试。(一般基本没有多大用)
import java.io.File
import org.apache.spark.{SparkContext, SparkConf}
import scala.math.random

/**
 * Estimates Pi with a Monte Carlo simulation using a local Spark master (`local[4]`).
 *
 * Created by 汪本成 on 2016/6/10.
 */
object T1 {

  /**
   * Program entry point.
   *
   * @param args optional; `args(0)` is the number of slices (partitions), defaulting to 2
   */
  def main(args: Array[String]): Unit = {
    // Workaround: point hadoop.home.dir at the current directory and create an empty
    // ./bin/winutils.exe placeholder there.
    // NOTE(review): presumably this avoids the missing-winutils failure on Windows - confirm.
    val path = new File(".").getCanonicalPath()
    System.getProperties().put("hadoop.home.dir", path)
    new File("./bin").mkdirs()
    new File("./bin/winutils.exe").createNewFile()

    val conf = new SparkConf().setAppName("SparkPai").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      // Number of slices (partitions).
      val slices = if (args.length > 0) args(0).toInt else 2
      // Cap n at Int.MaxValue to avoid overflow.
      val n = math.min(10000L * slices, Int.MaxValue).toInt
      // Count how many random points fall inside the unit circle.
      val count = sc.parallelize(1 until n, slices).map { _ =>
        // Random coordinates in [-1, 1).
        val x = random * 2 - 1
        val y = random * 2 - 1
        // Squared distance to the origin below 1 means the point is inside the circle.
        if (x * x + y * y < 1) 1 else 0
      }.reduce(_ + _)
      // `1 until n` contains n - 1 samples, so the hit ratio is count / (n - 1).
      println("Pai is roughly " + 4.0 * count / (n - 1))
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}
-------------------------------------------------------------------------------------以下是kafka的生产者和消费者----------------------------------------------------------------------------------
生产者
import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ProducerKafKa { private KafkaProducer<String, String> producer; private Properties properties; public ProducerKafKa() { properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.118:9092,168.1.119:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(properties); } public void sendRecorder(String key, String value) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value); producer.send(record); } public void assignPartitionSend(String key,String value){ ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value); producer.send(record); } public void sendRecordWithCallback(String key, String value) { final Logger logger = LoggerFactory.getLogger(ProducerKafKa.class); ProducerRecord<String, String> record = new ProducerRecord<String, String>("world", key, value); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { logger.info("存储位置:partition:" + metadata.partition() + ",offset:" + metadata.offset() + ",ts:"); ; } else { exception.printStackTrace(); } } }); } public void close() { producer.flush(); producer.close(); } public void getMetrics(){ Logger logger = LoggerFactory.getLogger(ProducerKafKa.class); Map<MetricName, Metric> metrics = (Map<MetricName, Metric>) 
producer.metrics(); for (MetricName name : metrics.keySet()) { logger.info(name.name()+":"+metrics.get(name).value()); } } public static void main(String[] args) { ProducerKafKa client = new ProducerKafKa(); for (int i = 0; i < 100; i++) { client.sendRecorder("key" + i, "value" + i); } client.close(); } }
消费者(类型Ⅰ)
package test;
/**
* Created by guest2 on 2018/3/10.
* 创建消费者代码
*/
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
 * Consumer that controls offset commits itself.
 *
 * Often a message should only count as consumed after it has been received and some
 * processing has been applied to it; committing offsets manually makes that possible.
 *
 * @author Joker
 */
public class ManualOffsetConsumer {
    private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);

    /**
     * Subscribes to the "test" topic, prints every record and commits offsets once at
     * least {@code minBatchSize} records have been buffered.
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker addresses.
        props.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.117:9092,192.168.1.119:9092");
        // Consumer group name.
        props.put("group.id", "mygroup11");
        // Offsets are committed explicitly below.
        props.put("enable.auto.commit", "false");
        // Start from the earliest offset for this group.id. If unset, the default is latest,
        // and the consumer would only receive messages produced after it started.
        props.put("auto.offset.reset", "earliest");
        // Session timeout in milliseconds.
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            // Topic to consume.
            consumer.subscribe(Arrays.asList("test"));
            // Number of buffered records that triggers a commit.
            final int minBatchSize = 5;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(30000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("consumer message values is----: " + record.value() + " and the offset is " + record.offset());
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    System.out.println("---now commit offset" + buffer.size());
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        } finally {
            // The loop only ends through an exception; close the consumer in that case.
            consumer.close();
        }
    }
}
linux和idea消费的信息如下
消费者(类型Ⅱ)
package com.you.bd17;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Demonstrates manual offset handling with {@link KafkaConsumer} on the "from-java" topic:
 * reading a committed offset, subscribing, assigning a partition with seek, and
 * committing an explicit offset.
 */
public class MenulConsumer {
    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    /** Configures a String/String consumer in group "java_group" with auto-commit disabled. */
    public MenulConsumer() {
        properties.setProperty("bootstrap.servers", "master:9092,slave1:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "java_group");
        // Disable automatic offset commits.
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.offset.reset", "none");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    /** Prints the committed offset of partition 1 of "from-java". */
    public void getOffsets() {
        TopicPartition partition = new TopicPartition("from-java", 1);
        OffsetAndMetadata offsets = consumer.committed(partition);
        if (offsets == null) {
            // committed() yields null when the group has not committed anything for the
            // partition; previously this case caused a NullPointerException.
            System.out.println("no committed offset for " + partition);
            return;
        }
        System.out.println(offsets + ":" + offsets.offset());
    }

    /** Subscribes to "from-java", prints every record and commits after each poll. */
    public void subscribeTopic() {
        List<String> topics = new ArrayList<String>();
        topics.add("from-java");
        consumer.subscribe(topics);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition:" + record.partition() + ",offset:" + record.offset()
                            + ",key:" + record.key() + ",value:" + record.value());
                }
                consumer.commitSync();
            }
        } finally {
            // The loop only ends through an exception; close the consumer in that case.
            consumer.close();
        }
    }

    /**
     * Consumes partition 0 of "from-java" starting at offset 20.
     *
     * A topic can be consumed in two ways: consumer.subscribe(topics) or
     * consumer.assign(topicPartitions). The two are mutually exclusive; this method
     * uses assign.
     */
    public void consumerAssignerd() {
        // Assign the partition explicitly.
        TopicPartition partition = new TopicPartition("from-java", 0);
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        topicPartitions.add(partition);
        consumer.assign(topicPartitions);
        // Start consuming the assigned partition at the given offset.
        consumer.seek(partition, 20);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition:" + record.partition() + ",offset:" + record.offset()
                            + ",key:" + record.key() + ",value:" + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Commits offset 20 for partition 0 of "from-java", then subscribes to the topic and
     * prints only the records that come from partition 0.
     */
    public void setCommitOffset() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("from-java", 0), new OffsetAndMetadata(20));
        // Commit an explicit offset for one partition; it takes effect before the next fetch.
        consumer.commitSync(offsets);
        List<String> topics = new ArrayList<String>();
        topics.add("from-java");
        consumer.subscribe(topics);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    if (record.partition() == 0) {
                        System.out.println("partition:" + record.partition() + ",offset:" + record.offset()
                                + ",key:" + record.key() + ",value:" + record.value());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Outline of an exactly-once consumer; only step 1 is implemented.
     *
     * NOTE(review): the property is set after the consumer was already constructed in the
     * constructor, so it presumably has no effect on the existing consumer - confirm.
     */
    public void exactlyOnceConsumer() {
        // 1. Configure the parameter.
        properties.setProperty("enable.auto.commit", "false");
        // 2. Subscribe to the topic or assign partitions:
        //      consumer.subscribe(topics);
        //    Then reset the offset (the offset value has to be read from MySQL).
        // 3. Read the stored offset of every partition of the topic from MySQL, then either
        // 4.1 consumer.commitSync(offset); to commit it to the Kafka server, or
        // 4.2 consumer.seek(new TopicPartition("from-java", 0), 20);
        //     to set the position from which data is consumed from Kafka.
        // 5. Poll data:
        //      records = consumer.poll(1000);
        // 6. Iterate over the records and run the computation.
        // 7. When the computation is done, call
        //      consumer.committed(new TopicPartition("from-java", 1));
        //    to obtain the offset consumed so far.
        // 8. Store the result and the offset in MySQL atomically (in one transaction).
        // 9. Go back to step 5 for the next poll and the next computation.
    }

    /** Runs the explicit-commit demo; the other demos are left commented out. */
    public static void main(String[] args) {
        MenulConsumer menulConsumer = new MenulConsumer();
        //menulConsumer.subscribeTopic();
        //menulConsumer.getOffsets();
        //menulConsumer.consumerAssignerd();
        menulConsumer.setCommitOffset();
    }
}
maven依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhiyou.bd17</groupId> <artifactId>KafkaTest</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <!-- com.101tec:zkclient. NOTE(review): the link originally placed here was https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients, but this POM declares no explicit kafka-clients dependency; presumably it is resolved transitively (confirm) -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-sinks/flume-ng-kafka-sink --> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-kafka-sink</artifactId> <version>1.8.0</version> </dependency> <!-- SLF4J logging API --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <!-- SLF4J binding for log4j 1.2 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> </dependencies> </project>
消费者(类型Ⅲ)
package com.zhiyou.bd17;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Minimal Kafka consumer that subscribes to the "kafkademo" topic and prints every
 * record it receives.
 */
public class ProducerConsumer {
    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    /** Initializes the configuration and creates a String/String consumer in group "java_group". */
    public ProducerConsumer() {
        // The third broker used to read "168.1.119:9092" (first octet missing).
        properties.setProperty("bootstrap.servers", "192.168.1.116:9092,192.168.1.118:9092,192.168.1.119:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "java_group");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    /** Subscribes to "kafkademo" and prints partition, offset, key and value of each record. */
    public void subscribeTopic() {
        List<String> topics = new ArrayList<String>();
        topics.add("kafkademo");
        consumer.subscribe(topics);
        try {
            // Keep pulling data from Kafka.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // The "key" label now carries the same ':' separator as the other fields.
                    System.out.println("接收到的消息:partition:" + record.partition() + ",offset:" + record.offset()
                            + ",key:" + record.key() + ",value:" + record.value());
                }
            }
        } finally {
            // The loop only ends through an exception; close the consumer in that case.
            consumer.close();
        }
    }

    /** Creates the consumer and starts the polling loop. */
    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        producerConsumer.subscribeTopic();
    }
}
-------------------------------以上内容比较乱。下面创建一个生产者和一个消费者,它俩是配套的------------------------------
消费者
/** * Created by guest2 on 2018/3/10. * 创建消费者代码 */ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; /** * @author Joker * 自己控制偏移量提交 * 很多时候,我们是希望在获得消息并经过一些逻辑处理后,才认为该消息已被消费,这可以通过自己控制偏移量提交来实现。 */ public class ManualOffsetConsumer { private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class); public static void main(String[] args) { // TODO Auto-generated method stub Properties props = new Properties(); //设置brokerServer(kafka)ip地址 props.put("bootstrap.servers", "192.168.1.116:9092,192.168.1.117:9092,192.168.1.119:9092"); //设置consumer group name props.put("group.id","mygroup11"); props.put("enable.auto.commit", "false"); //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset //如果采用latest,消费者只能得道其启动后,生产者生产的消息 props.put("auto.offset.reset", "earliest"); //设置心跳时间 props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props); consumer.subscribe(Arrays.asList("test"));//主题? final int minBatchSize = 5; //批量提交数量 List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(30000); for (ConsumerRecord<String, String> record : records) { System.out.println("consumer message values is----: "+record.value()+" and the offset is "+ record.offset()); buffer.add(record); } if (buffer.size() >= minBatchSize) { System.out.println("---now commit offset"+buffer.size()); consumer.commitSync(); buffer.clear(); } } } }
生产者
import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; import kafka.javaapi.producer.Producer; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * Created by zhanghuayan on 2017/1/22. */ public class ProducerTest extends Thread { private String topic; public ProducerTest(String topic) { super(); this.topic = topic; } public void run() { Producer producer = createProducer(); int i=0; while(true){ producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } private Producer<Integer, String> createProducer() { Properties properties = new Properties(); properties.put("zookeeper.connect", "192.168.1.116:2181");//声明zk properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "192.168.1.116:9092");// 声明kafka broker return new Producer<Integer, String>(new ProducerConfig(properties)); } public static void main(String[] args){ new ProducerTest("test").start(); } }