  • Connecting to MongoDB and Kafka: asynchronous processing code

    1. MongoDB asynchronous processing

    Dependencies:

            <dependencies>
                <dependency>
                    <groupId>org.mongodb</groupId>
                    <artifactId>mongodb-driver-async</artifactId>
                    <version>3.0.4</version>
                </dependency>
            </dependencies>    

    Code

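    import com.mongodb.ServerAddress;
    import com.mongodb.async.SingleResultCallback;
    import com.mongodb.async.client.MongoClient;
    import com.mongodb.async.client.MongoClientSettings;
    import com.mongodb.async.client.MongoClients;
    import com.mongodb.async.client.MongoCollection;
    import com.mongodb.async.client.MongoDatabase;
    import com.mongodb.connection.ClusterSettings;
    import org.bson.Document;

    import java.util.ArrayList;
    import java.util.List;

    public class MongoAsyncInsert {
        public static void main(String[] args) {
            // Seed list of servers to connect to.
            List<ServerAddress> addresses = new ArrayList<>();
            addresses.add(new ServerAddress("172.16.4.90", 3000));
            addresses.add(new ServerAddress("172.16.4.91", 3000));
            addresses.add(new ServerAddress("172.16.4.92", 3000));
            ClusterSettings clusterSettings = ClusterSettings.builder().hosts(addresses).build();
            MongoClientSettings settings = MongoClientSettings.builder().clusterSettings(clusterSettings).build();
            MongoClient mongoClient = MongoClients.create(settings);

            MongoDatabase database = mongoClient.getDatabase("shardb");
            MongoCollection<Document> collection = database.getCollection("shardtable");

            Document doc = new Document("name", "MongoDB")
                    .append("type", "database")
                    .append("count", 1)
                    .append("info", new Document("x", 203).append("y", 102));

            final long start = System.currentTimeMillis();
            // insertOne returns immediately; the callback is invoked later with the outcome.
            collection.insertOne(doc, new SingleResultCallback<Void>() {
                @Override
                public void onResult(final Void result, final Throwable t) {
                    if (t != null) {
                        t.printStackTrace(); // the insert failed
                        return;
                    }
                    System.out.println("Inserted, elapsed ms=" + (System.currentTimeMillis() - start));
                }
            });
            // Printed as soon as the request is handed off; main does not wait for the callback.
            System.out.println("Returned, elapsed ms=" + (System.currentTimeMillis() - start));
        }
    }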

    2. Kafka asynchronous processing

    Dependencies:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.0</version>
            </dependency>

    Code

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Properties;

    public class KafkaAsyncSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "172.16.4.93:9092,172.16.4.94:9092,172.16.4.95:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            final long start = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                // send() is asynchronous; it also returns a Future<RecordMetadata>, unused here.
                producer.send(new ProducerRecord<String, String>("davidwang456", Integer.toString(i), Integer.toString(i)),
                        new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception e) {
                                if (e != null) {
                                    // On failure, report the error and do not touch metadata.
                                    e.printStackTrace();
                                } else {
                                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                                }
                            }
                        });
            }
            // Time spent handing the 100 records to the producer, not time to delivery.
            System.out.println("Elapsed ms=" + (System.currentTimeMillis() - start));
            // close() blocks until previously sent records have completed.
            producer.close();
        }
    }
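    Besides accepting a callback, producer.send() returns a Future<RecordMetadata>. Calling isDone() on that future right after sending tends to report false, because the record has only been queued at that point; get() is the call that waits for the outcome. The sketch below illustrates this with JDK classes only. The names sendAsync and brokerAck are hypothetical stand-ins for the producer and the broker acknowledgement, not Kafka API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureVsCallbackSketch {

    /** Hypothetical stand-in for producer.send(record): yields the assigned offset once acknowledged. */
    static Future<Long> sendAsync(ExecutorService io, final long offset, final CountDownLatch brokerAck) {
        return io.submit(new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                brokerAck.await(); // simulated wait for the broker acknowledgement
                return offset;
            }
        });
    }

    public static String demo() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch brokerAck = new CountDownLatch(1);
        try {
            Future<Long> response = sendAsync(io, 42L, brokerAck);
            // The acknowledgement has not arrived yet, so the future cannot be done.
            boolean doneBeforeAck = response.isDone();
            brokerAck.countDown(); // the simulated broker acknowledges the record
            // get() blocks until the result is available (bounded by a timeout here).
            long offset = response.get(2, TimeUnit.SECONDS);
            return "doneBeforeAck=" + doneBeforeAck
                    + ", offset=" + offset
                    + ", doneAfterGet=" + response.isDone();
        } catch (Exception e) {
            return "failed: " + e;
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Blocking on get() for every record would turn the send loop into a synchronous one, which is presumably why the listing relies on the callback instead.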

    Results

    1. The output of the Kafka asynchronous callback is printed.

    2. The output of the MongoDB asynchronous callback is not printed.
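    One likely explanation for result 2, offered here as an assumption rather than something the original post verifies: the MongoDB main method returns without waiting for the callback, and if the driver's worker threads are daemon threads the JVM can exit before the callback runs. The Kafka code does not have this problem because producer.close() blocks until pending sends complete. A minimal JDK-only sketch of waiting for a callback with a CountDownLatch follows; ResultCallback and insertAsync are hypothetical stand-ins for the driver's callback interface and collection.insertOne.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AwaitCallbackSketch {

    /** Hypothetical stand-in for the driver's callback interface. */
    interface ResultCallback<T> {
        void onResult(T result, Throwable error);
    }

    /** Hypothetical stand-in for an async insert: completes on a daemon thread. */
    static void insertAsync(final String doc, final ResultCallback<String> callback) {
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(50); // simulated network round trip
                    callback.onResult("inserted:" + doc, null);
                } catch (InterruptedException e) {
                    callback.onResult(null, e);
                }
            }
        });
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive
        worker.start();
    }

    /** Starts the insert, then blocks until the callback fires or the timeout expires. */
    public static String insertAndWait(String doc, long timeoutMillis) {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<String> outcome = new AtomicReference<String>("timed out");
        insertAsync(doc, new ResultCallback<String>() {
            @Override
            public void onResult(String result, Throwable error) {
                outcome.set(error != null ? "failed: " + error : result);
                done.countDown(); // release the waiting caller
            }
        });
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(insertAndWait("MongoDB", 2000)); // prints "inserted:MongoDB"
    }
}
```

    Applied to the MongoDB listing, the same idea would mean counting the latch down inside onResult and awaiting it at the end of main before closing the client.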

  • Original article: https://www.cnblogs.com/davidwang456/p/5093430.html