zoukankan      html  css  js  c++  java
  • 亿级别记录从 MongoDB 批量导入 Elasticsearch 的 Java 代码完整实现

    针对 MongoDB 亿级别甚至十亿级别数据的模糊查询，效率不高；解决方式是改用 Elasticsearch（ES）查询，这就需要先把数据导入到 ES 中。

    完整的代码实现如下所示（仅供参考）：

    import java.io.IOException;
    import java.net.UnknownHostException;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    
    import org.apache.commons.codec.binary.Base64;
    import org.apache.http.HttpHost;
    import org.bson.types.ObjectId;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;
    
    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBCursor;
    import com.mongodb.DBObject;
    import com.mongodb.MongoClient;
    import com.mongodb.MongoException;
    
    
    public class Test {
    
        /**
         * Copies every document of the MongoDB collection {@code www.person} (localhost:27017)
         * into the Elasticsearch index {@code person}, type {@code doc} (localhost:9200).
         *
         * <p>Documents are copied one page of {@code pageSize} documents at a time, in ascending
         * {@code _id} order. The query time and insert time of each page are printed, followed by
         * the total elapsed time in seconds. Both clients and every cursor are always closed, even
         * when an exception escapes.
         *
         * @param args unused
         * @throws IOException if an Elasticsearch bulk request or closing the ES client fails
         */
        public static void main(String[] args) throws IOException {
            int pageSize = 10000;

            MongoClient mongo = null;
            RestHighLevelClient client = null;
            try {
                mongo = new MongoClient("localhost", 27017);

                // Get the database; MongoDB creates it lazily if it does not exist.
                DB db = mongo.getDB("www");

                // Get the "person" collection; also created lazily if missing.
                DBCollection table = db.getCollection("person");

                client = new RestHighLevelClient(
                        RestClient.builder(
                                new HttpHost("localhost", 9200, "http")));
                long cnt = table.count();
                System.out.println(table.getStats().toString());
                long page = getPageSize(cnt, pageSize);
                ObjectId lastIdObject = null;
                long ss = System.currentTimeMillis();
                for (long i = 0; i < page; i++) {
                    long start = System.currentTimeMillis();
                    DBCursor dbObjects = getCursorForCollection(table, lastIdObject, pageSize);
                    List<DBObject> objs;
                    try {
                        // toArray() is what actually executes the query, so it belongs in the timed section.
                        objs = dbObjects.toArray();
                    } finally {
                        dbObjects.close();
                    }
                    System.out.println("第"+(i+1)+"次查询,耗时:"+(System.currentTimeMillis()-start)+" 毫秒");
                    if (objs.isEmpty()) {
                        // Fewer documents than counted up front (e.g. deleted during the copy): nothing left.
                        break;
                    }
                    start = System.currentTimeMillis();
                    batchInsertToEsSync(client, objs, "person", "doc");
                    // The next page resumes strictly after the last _id of this page.
                    lastIdObject = (ObjectId) objs.get(objs.size() - 1).get("_id");
                    System.out.println("第"+(i+1)+"次插入,耗时:"+(System.currentTimeMillis()-start)+" 毫秒");
                }
                System.out.println("耗时:"+(System.currentTimeMillis()-ss)/1000+"秒");
            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (MongoException e) {
                e.printStackTrace();
            } finally {
                // Nested so that a failure closing the ES client still closes the Mongo client.
                try {
                    if (client != null) {
                        client.close();
                    }
                } finally {
                    if (mongo != null) {
                        mongo.close();
                    }
                }
            }
        }
        
        /**
         * Indexes a page of MongoDB documents into Elasticsearch with one synchronous bulk request.
         *
         * <p>For each document:
         * <ul>
         *   <li>The {@code _id} value becomes the Elasticsearch document id (its {@code toString()}).
         *       It is also stored in the source under the field name {@code id}.</li>
         *   <li>Every other field value is converted with {@code toString()}, encoded as UTF-8 bytes
         *       and stored as a Base64 string.</li>
         *   <li>{@code null} values are stored as the empty string.</li>
         * </ul>
         * A document field that is itself named {@code id} is replaced by the {@code _id} value.
         * Per-item failures are printed to stdout rather than thrown.
         *
         * @param client    Elasticsearch client used to send the bulk request
         * @param objs      documents to index; a {@code null} or empty list is a no-op
         * @param tableName target index name
         * @param type      target mapping type
         * @throws IOException          if the bulk request itself fails
         * @throws NullPointerException if a document has no {@code _id} field
         */
        public static void batchInsertToEsSync(RestHighLevelClient client,List<DBObject> objs,String tableName,String type) throws IOException {
            if (objs == null || objs.isEmpty()) {
                // Elasticsearch rejects a bulk request that contains no actions.
                return;
            }
            BulkRequest bulkRequest = new BulkRequest();
            for (DBObject obj : objs) {
                Object mongoId = null;
                Map<String, Object> map = new HashMap<>();
                for (String key : obj.keySet()) {
                    Object val = obj.get(key);
                    if ("_id".equalsIgnoreCase(key)) {
                        mongoId = val;
                    } else {
                        // Explicit UTF-8 so the encoded bytes do not depend on the platform charset.
                        String valStr = val == null
                                ? ""
                                : Base64.encodeBase64String(val.toString().getBytes(StandardCharsets.UTF_8));
                        map.put(key, valStr);
                    }
                }
                Objects.requireNonNull(mongoId, "document has no _id field");
                // Put "id" after the field loop so a document field named "id" cannot override it.
                map.put("id", mongoId);
                IndexRequest req = new IndexRequest(tableName, type);
                req.id(mongoId.toString());
                req.source(map, XContentType.JSON);
                bulkRequest.add(req);
            }
            BulkResponse bulkResponse = client.bulk(bulkRequest);
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    System.out.println(bulkItemResponse.getId()+","+bulkItemResponse.getFailureMessage());
                }
            }
        }
        
        /**
         * Builds a cursor over the next page of documents, in ascending {@code _id} order.
         *
         * @param collection   the MongoDB collection to read from
         * @param lastIdObject the {@code _id} of the last document of the previous page, or
         *                     {@code null} to start from the first document of the collection
         * @param pageSize     maximum number of documents the cursor returns
         * @return a cursor over at most {@code pageSize} documents, sorted by {@code _id}. When
         *         {@code lastIdObject} is non-null, only documents whose {@code _id} is greater
         *         than it are included.
         */
        public static DBCursor getCursorForCollection(DBCollection collection,ObjectId lastIdObject,int pageSize) {
            BasicDBObject query = new BasicDBObject();
            if (lastIdObject != null) {
                // Keyset pagination: resume strictly after the last _id already processed.
                query.append("_id", new BasicDBObject("$gt", lastIdObject));
            }
            // The first page applies no _id filter. Filtering with $gt on some sampled _id would
            // skip that document and any with a smaller _id, and fails on an empty collection.
            BasicDBObject sort = new BasicDBObject("_id", 1);
            return collection.find(query).sort(sort).limit(pageSize);
        }
        
        /**
         * Returns how many pages of {@code pageSize} items are needed to hold {@code cnt} items.
         * For non-negative {@code cnt} this is the ceiling of {@code cnt / pageSize}; a trailing
         * partial page counts as a whole page.
         *
         * @param cnt      total number of items (must not be {@code null})
         * @param pageSize number of items per page (must not be zero)
         * @return the number of pages
         */
        public static Long getPageSize(Long cnt,int pageSize) {
            long fullPages = cnt / pageSize;
            boolean hasRemainder = cnt % pageSize != 0;
            if (hasRemainder) {
                fullPages++;
            }
            return fullPages;
        }
  • 相关阅读:
    JS运动基础
    用setTimeout模拟QQ延时提示框
    jQuery面向对象的写法
    AngularJS学习笔记
    Scrollbar的样式
    postfix/dovecot邮件服务器
    Git 命令及git服务器
    一个分页功能的实现
    SSE(Server-Sent Events)
    qq上网正常浏览器上不了网
  • 原文地址:https://www.cnblogs.com/davidwang456/p/9909422.html
Copyright © 2011-2022 走看看