1. MongoDB asynchronous processing
Dependency:
<dependencies>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-async</artifactId>
        <version>3.0.4</version>
    </dependency>
</dependencies>
Code:
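import java.util.ArrayList;
import java.util.List;

import org.bson.Document;

import com.mongodb.ServerAddress;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.connection.ClusterSettings;

// The class name is illustrative; the original post shows only main().
public class MongoAsyncDemo {
    public static void main(String[] args) {
        // Addresses of the cluster nodes to connect to.
        List<ServerAddress> address = new ArrayList<>();
        address.add(new ServerAddress("172.16.4.90", 3000));
        address.add(new ServerAddress("172.16.4.91", 3000));
        address.add(new ServerAddress("172.16.4.92", 3000));
        ClusterSettings clusterSettings = ClusterSettings.builder().hosts(address).build();
        MongoClientSettings settings = MongoClientSettings.builder().clusterSettings(clusterSettings).build();
        MongoClient mongoClient = MongoClients.create(settings);
        MongoDatabase database = mongoClient.getDatabase("shardb");
        MongoCollection<Document> collection = database.getCollection("shardtable");

        Document doc = new Document("name", "MongoDB")
                .append("type", "database")
                .append("count", 1)
                .append("info", new Document("x", 203).append("y", 102));

        final long start = System.currentTimeMillis();
        // insertOne returns immediately; the callback runs later on a driver thread.
        collection.insertOne(doc, new SingleResultCallback<Void>() {
            @Override
            public void onResult(final Void result, final Throwable t) {
                System.out.println("Inserted consumed=" + (System.currentTimeMillis() - start));
            }
        });
        // main() does not wait for the callback before returning.
        System.out.println("response consumed=" + (System.currentTimeMillis() - start));
    }
}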
2. Kafka asynchronous processing
Dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
Code:
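import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// The class name is illustrative; the original post shows only main().
public class KafkaAsyncDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.4.93:9092,172.16.4.94:9092,172.16.4.95:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            // send() also returns a Future<RecordMetadata>; it is not used here.
            // Future<RecordMetadata> response =
            producer.send(new ProducerRecord<String, String>("davidwang456", Integer.toString(i), Integer.toString(i)),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                // The send failed; do not read the offset from metadata.
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    });
            /*
            if (response.isDone()) {
                System.out.println("send message to david1 message key=" + i + ",value=" + i);
            }
            */
        }
        System.out.println(System.currentTimeMillis() - start);
        // close() blocks until previously sent records have completed.
        producer.close();
    }
}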
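Results:
1. The Kafka asynchronous result is printed. A likely reason is that producer.close() blocks until the previously sent records have completed, so the callbacks run before main() returns.
2. The MongoDB asynchronous result is not printed. The example never waits for the insertOne callback, so the program has probably exited before onResult is invoked.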
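One common way to see the output of an asynchronous callback is to make the calling thread wait for it, for example with a CountDownLatch. The sketch below is only an illustration of that idea and does not use the MongoDB driver: asyncInsert and ResultCallback are hypothetical stand-ins for collection.insertOne(doc, callback) and SingleResultCallback, and the daemon worker thread is an assumption about how the driver's I/O threads behave.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncCallbackWait {

    /** Hypothetical stand-in for the driver's callback interface. */
    interface ResultCallback {
        void onResult(Throwable t);
    }

    // A daemon worker thread does not keep the JVM alive once main() returns
    // (assumed here to resemble the driver's I/O threads).
    private static final ExecutorService IO = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-io");
        t.setDaemon(true);
        return t;
    });

    /** Hypothetical stand-in for collection.insertOne(doc, callback). */
    static void asyncInsert(long delayMillis, ResultCallback callback) {
        IO.submit(() -> {
            try {
                Thread.sleep(delayMillis); // simulate network latency
                callback.onResult(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                callback.onResult(e);
            }
        });
    }

    /** Starts the fake insert and waits; returns true if the callback ran before the timeout. */
    public static boolean insertAndWait(long delayMillis, long timeoutMillis) {
        final CountDownLatch done = new CountDownLatch(1);
        final long start = System.currentTimeMillis();
        asyncInsert(delayMillis, t -> {
            System.out.println("Inserted consumed=" + (System.currentTimeMillis() - start));
            done.countDown(); // signal the waiting thread
        });
        System.out.println("response consumed=" + (System.currentTimeMillis() - start));
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran: " + insertAndWait(100, 5000));
    }
}
```

In the MongoDB example the same pattern would mean creating the latch before insertOne, calling countDown() inside onResult, and calling await() at the end of main(). Without the await() call, main() in this sketch could return before the daemon thread prints anything.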