  • Spark 2.2 + ES 6.4.2 (Part 32): ES index APIs — create/update/delete/open/close (set index settings at creation time, then dynamically build the index mapping from an Avro template)

    To operate on ES through the ES API, you must first obtain a TransportClient object, then obtain an IndicesAdminClient from it. The methods exposed by IndicesAdminClient let you manage ES indices: create index, update index (update index settings, update index mapping), delete index, open index, and close index.

    Preparation (creating the TransportClient and IndicesAdminClient)

    Step 1: import the ES 6.4.2 dependencies:

        <dependencies>
            <!--Spark -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>com.twitter</groupId>
                <artifactId>bijection-avro_2.11</artifactId>
                <version>0.9.5</version>
            </dependency>
            <dependency>
                <groupId>com.databricks</groupId>
                <artifactId>spark-avro_2.11</artifactId>
                <version>3.2.0</version>
                <type>jar</type>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-spark-20_2.11</artifactId>
                <version>6.4.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.4.2</version>
            </dependency>
        </dependencies>

    Note: there are quite a few dependencies here; for the Elasticsearch API operations themselves, only the org.elasticsearch.client dependency is required.

    Step 2: obtain the TransportClient and IndicesAdminClient objects:

        /**
         * Obtain the ES client API object.
         * */
        public static TransportClient getClient() {
            Map<String, String> esOptionsMap = getSparkESCommonOptions();
    
            return getClient(esOptionsMap);
        }
    
        /**
         * Obtain the ES client API object.
         * */
        public static TransportClient getClient(Map<String, String> esOptionsMap) {
            Settings settings = Settings.builder()//
                    .put("cluster.name", esOptionsMap.get("cluster.name")) //
                    .put("client.transport.sniff", true)//
                    .build();
    
            PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);
            TransportClient client = preBuiltTransportClient;
    
            // 192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
            String esNodeStr = esOptionsMap.get("es.nodes");
            String[] esNodeArr = esNodeStr.split(",");
    
            try {
                for (String esNode : esNodeArr) {
                    client.addTransportAddress(new TransportAddress(InetAddress.getByName(esNode), 9300));
                }
            } catch (UnknownHostException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
    
            return client;
        }
    
        public static IndicesAdminClient getAdminClient() {
            Map<String, String> esOptionsMap = getSparkESCommonOptions();
    
            return getAdminClient(esOptionsMap);
        }
    
        public static IndicesAdminClient getAdminClient(Map<String, String> esOptionsMap) {
            TransportClient client = getClient(esOptionsMap);
            IndicesAdminClient adminClient = client.admin().indices();
            return adminClient;
        }

    Note: the configuration returned by getSparkESCommonOptions() contains:

    cluster.name=es-application
    es.nodes=192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
    es.port=9200
    es.index.auto.create=true
    pushdown=true
    es.nodes.wan.only=true
    es.mapping.date.rich=false # read ES date-typed fields as plain strings
    es.scroll.size=10000
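The post never shows getSparkESCommonOptions() itself. A minimal sketch of what it might look like is below; the class name is hypothetical, and the values are hard-coded from the list above purely for illustration (a real implementation would read them from a properties file):

```java
import java.util.HashMap;
import java.util.Map;

public class SparkESOptions {
    /**
     * Sketch of a getSparkESCommonOptions() helper. Values are the ones
     * listed in the post, hard-coded here only for illustration.
     */
    public static Map<String, String> getSparkESCommonOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("cluster.name", "es-application");
        options.put("es.nodes", "192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123");
        options.put("es.port", "9200");
        options.put("es.index.auto.create", "true");
        options.put("pushdown", "true");
        options.put("es.nodes.wan.only", "true");
        options.put("es.mapping.date.rich", "false"); // read date fields as plain strings
        options.put("es.scroll.size", "10000");
        return options;
    }

    public static void main(String[] args) {
        System.out.println(getSparkESCommonOptions().get("cluster.name"));
    }
}
```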

    ES API: Exists/Create Index:

    Before creating an index, check whether the index and its type already exist, using these methods:

        /**
         * Whether ES contains a given index type.
         * 
         * @param indexName
         *            the index
         * @param indexType
         *            the type under that index
         * */
        public static boolean typeExists(String indexName, String indexType) {
            TypesExistsResponse typeResponse = getAdminClient().prepareTypesExists(indexName).setTypes(indexType).execute().actionGet();
            if (typeResponse.isExists()) {
                return true;
            }
            return false;
        }
    
        /**
         * Whether a given index exists in ES.<br>
         * Whether this also covers types remains to be verified; judging by other callers, no type needs to be passed.
         * */
        public static boolean indexExists(String... indices) {
            IndicesExistsRequest request = new IndicesExistsRequest(indices);
            IndicesExistsResponse response = getAdminClient().exists(request).actionGet();
            if (response.isExists()) {
                return true;
            }
            return false;
        }

    There are two ways to create an index: without specifying mapping and settings, which creates an empty index; or with mapping and settings, which creates a fully configured index.

    Creating an empty index:

        /**
         * Create a simple index with no explicit mapping.<br>
         * When data is inserted, field names are read from the data and mapping fields are created automatically (the drawback is that data types cannot be fully controlled: a double may be mapped as float, and date formats are lost).
         * */
        public static boolean indexCreate(String indexName) {
            CreateIndexResponse response = getAdminClient().prepareCreate(indexName).get();
            return response.isAcknowledged();
        }

    Note: when data is inserted into such an index, field names are read from the data and the mapping is created automatically (the drawback is that data types cannot be fully controlled: a double may be mapped as float, and date formats are lost).

    Creating a fully configured index:

        /**
         * Create a complex index (type/mapping), specifying the index settings and mapping; mappingSource is a JSON string.
         * 
         * @param indexName
         *            index name
         * @param indexType
         *            index type name
         * @param builder
         *            index mapping
         */
        public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder) {
            Settings settings = Settings.builder() //
                    .put("index.mapping.ignore_malformed", true)//
                    .put("index.refresh_interval", "60s") //
                    .put("index.number_of_shards", 4)//
                    .put("index.number_of_replicas", 0)//
                    .put("index.max_result_window", 500000)//
    
                    .put("index.translog.durability", "async")//
                    .put("index.translog.sync_interval", "120s")//
                    .put("index.translog.flush_threshold_size", "2gb")//
    
                    .put("index.merge.scheduler.max_thread_count", 1)//
                    .build();
    
            return indexCreate(indexName, indexType, builder, settings);
        }
    
        /**
         * Create a complex index (type/mapping), specifying the index settings and mapping; mappingSource is a JSON string.
         * 
         * @param indexName
         *            index name
         * @param indexType
         *            index type name
         * @param builder
         *            index mapping
         * @param settings
         *            index settings<br>
         *            settings example: http://192.168.1.120:9200/twitter/_settings?pretty<br>
         *            "settings":<br>
         *            {<br>
         *            ----"index":<br>
         *            ----{<br>
         *            --------"mapping":<br>
         *            --------{<br>
         *            ------------"ignore_malformed":"true"<br>
         *            --------},<br>
         *            --------"refresh_interval":"60s",<br>
         *            --------"number_of_shards":"4",<br>
         *            --------"translog":<br>
         *            --------{<br>
         *            ------------"flush_threshold_size":"2048m",<br>
         *            ------------"sync_interval":"120s",<br>
         *            ------------"durability":"async"<br>
         *            --------},<br>
         *            --------"provided_name":"indexName",<br>
         *            --------"merge":{<br>
         *            ------------"scheduler":<br>
         *            ------------{<br>
         *            ----------------"max_thread_count":"1"<br>
         *            ------------}<br>
         *            --------},<br>
         *            --------"max_result_window":"500000",<br>
         *            --------"creation_date":"1540781909323",<br>
         *            --------"number_of_replicas":"0",<br>
         *            --------"uuid":"5c079b5tQrGdX0fF23xtQA",<br>
         *            --------"version":{"created":"6020499"}<br>
         *            ----}<br>
         *            }<br>
         */
        public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder, Settings settings) {
            if (indexExists(indexName)) {
                return false;
            }
    
            // Prepare the index creation; setSettings() applies the settings, otherwise defaults are used.
            CreateIndexResponse cIndexResponse = getAdminClient().prepareCreate(indexName)//
                    .setSettings(settings)// settings
                    .addMapping(indexType, builder)// type + mapping; verified that this approach works.
                    .get();
    
            return cIndexResponse.isAcknowledged();
        }

    How do we dynamically generate the mapping from an Avro schema?

        /**
         * Rebuild the index.
         * 
         * @throws IOException
         * */
        protected void createIndex(String indexName, String indexType) throws IOException {
            Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>> src = getTargetSchema(srcSchemaKey, true);
    
            Map<String, Map<String, String>> extFields = new HashMap<String, Map<String, String>>();
            Map<String, String> insertDateProperty = new HashMap<String, String>();
            insertDateProperty.put("type", "date");
            insertDateProperty.put("format", "yyyy-MM-dd");
            extFields.put("index_date", insertDateProperty);
            Map<String, String> typeProperty = new HashMap<String, String>();
            typeProperty.put("type", "keyword");
            extFields.put("type", typeProperty);
    
            XContentBuilder mappingSource = getMapping(indexType, src._2(), extFields);
    
            if (!indexCreate(indexName, indexType, mappingSource)) {
                throw new RuntimeException("Failed to set the mapping while recreating index " + indexName + "!");
            }
        }
        /**
         * 
         * @param indexType
         *            index type
         * @param schemaColVsTypeMap
         *            fields read from the *.avsc schema file, formatted as colName vs colType
         * @param extFields
         *            extra extension fields (fields not present in the *.avsc schema file)<br>
         * @return mapping:<br>
         *         {<br>
         *         ----"mrs_rsrp_d_2018.10.26":<br>
         *         ----{<br>
         *         --------"aliases":{},<br>
         *         --------"mappings":<br>
         *         --------{<br>
         *         -----------"_doc":{<br>
         *         -----------"properties":<br>
         *         -----------{<br>
         *         --------------"cgi":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},<br>
         *         --------------"timestamp":{"type":"long"}<br>
         *         -----------}<br>
         *         --------},<br>
         *         --------"settings":{}<br>
         *         ----}<br>
         *         }<br>
         * @throws IOException
         *             if building the XContentBuilder fails.
         */
        public static XContentBuilder getMapping(String indexType, Map<String, String> schemaColVsTypeMap, Map<String, Map<String, String>> extFields)
                throws IOException {
            XContentBuilder builder = XContentFactory.jsonBuilder()//
                    .startObject()//
                    .startObject(indexType)//
                    .startObject("_all").field("enabled", false).endObject()// whether to index all fields of a row as one big field, to support querying across all columns
                    // .startObject("_source").field("enabled", false).endObject()// must not be set to false, otherwise fields cannot be read back from ES (_source controls which fields are stored; by default all are, and include/exclude can select specific fields)
                    // .startObject("_field_names").field("enabled", false).endObject()//
                    .startObject("properties");
    
            for (Map.Entry<String, String> kv : schemaColVsTypeMap.entrySet()) {
                String colName = kv.getKey();
                String colType = kv.getValue();
    
                // "insert_time":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},
                // "scan_start_time":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},
                // "scan_stop_time":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},
                if (colName.equalsIgnoreCase("scan_start_time")//
                        || colName.equalsIgnoreCase("scan_stop_time")//
                        || colName.equalsIgnoreCase("insert_time")) {
                    builder.startObject(colName) //
                            .field("type", "date")//
                            .field("format", "yyyy-MM-dd HH:mm:ss")// yyyy/MM/dd||yyyy/MM/dd HH:mm:ss also works
                            .field("index", "true") // not_analyzed|analyzed
                            .endObject();
                }
                // "city_name":{"type":"text","fields":{"keyword":{"type":"keyword"}}},
                // "province_name":{"type":"text","fields":{"keyword":{"type":"keyword"}}},
                // "region_name":{"type":"text","fields":{"keyword":{"type":"keyword"}}},
                else if (colName.equalsIgnoreCase("city_name")//
                        || colName.equalsIgnoreCase("region_name")//
                        || colName.equalsIgnoreCase("province_name")) {
                    builder.startObject(colName).field("type", "keyword").endObject();
                } else {
                    if (colType.equalsIgnoreCase("long")) {
                        builder.startObject(colName).field("type", "long").endObject();
                    } else if (colType.equalsIgnoreCase("string")) {
                        builder.startObject(colName).field("type", "keyword").endObject();
                    } else if (colType.equalsIgnoreCase("double")) {
                        builder.startObject(colName).field("type", "double").endObject();
                    } else {
                        builder.startObject(colName).field("type", colType).endObject();
                    }
                }
            }
    
            // append the extension fields to the mapping
            for (Map.Entry<String, Map<String, String>> kv : extFields.entrySet()) {
                String colName = kv.getKey();
                builder.startObject(colName);
    
                for (Map.Entry<String, String> kvProperty : kv.getValue().entrySet()) {
                    builder.field(kvProperty.getKey(), kvProperty.getValue());
                }
                builder.endObject();
            }
    
            builder.endObject();// end of properties
            builder.endObject();// end of indexType
            builder.endObject();// end of start
    
            return builder;
        }
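To make the output of getMapping above concrete, here is a dependency-free sketch that assembles the same mapping JSON by hand for a two-column schema (plain string building instead of XContentBuilder, so it runs without the ES client jars); the class and column names are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MappingJsonSketch {
    /** Build the mapping JSON that getMapping would emit, using plain string assembly. */
    public static String buildMapping(String indexType, Map<String, String> cols) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"").append(indexType).append("\":{")
          .append("\"_all\":{\"enabled\":false},")
          .append("\"properties\":{");
        boolean first = true;
        for (Map.Entry<String, String> kv : cols.entrySet()) {
            if (!first) sb.append(",");
            first = false;
            // mirror the type switch in getMapping: Avro "string" becomes ES "keyword"
            String esType = kv.getValue().equalsIgnoreCase("string") ? "keyword" : kv.getValue().toLowerCase();
            sb.append("\"").append(kv.getKey()).append("\":{\"type\":\"").append(esType).append("\"}");
        }
        sb.append("}}}");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("cgi", "string");
        cols.put("timestamp", "long");
        System.out.println(buildMapping("_doc", cols));
    }
}
```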
        
        /**
         * Returns the target column list, a column-name vs column-type map, and an expression encoder.
         * */
        protected Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>> getTargetSchema(String schemaFilePath, boolean withTimestamp) {
            Broadcast<String> targetSchemaContent = null;
            try {
                String avroContent = getHdfsFileContent(schemaFilePath);
                targetSchemaContent = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()).broadcast(avroContent);
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
    
            Schema.Parser parser = new Schema.Parser();
            Schema targetSchema = parser.parse(targetSchemaContent.getValue());
            List<String> targetColumns = new ArrayList<String>();
            Map<String, String> targetKeyTypeItems = new LinkedHashMap<String, String>();
            for (Field field : targetSchema.getFields()) {
                targetColumns.add(field.name());
                List<Schema> types = targetSchema.getField(field.name()).schema().getTypes();
                String datatype = types.get(types.size() - 1).getName();
                targetKeyTypeItems.put(field.name(), datatype);
            }
    
            ExpressionEncoder<Row> encoder = SchemaHelper.createSchemaEncoder(targetSchema, withTimestamp);
    
            return new Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>>(targetColumns, targetKeyTypeItems, encoder);
        }
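getTargetSchema relies on the Avro convention that an optional field is a union of the form ["null", concreteType], so taking the last branch of the union yields the concrete type. A dependency-free sketch of that selection (plain type-name strings stand in for org.apache.avro Schema objects):

```java
import java.util.Arrays;
import java.util.List;

public class UnionTypeSketch {
    /** Pick the concrete type from a ["null", type] style union, as getTargetSchema does. */
    public static String concreteType(List<String> unionBranches) {
        return unionBranches.get(unionBranches.size() - 1);
    }

    public static void main(String[] args) {
        System.out.println(concreteType(Arrays.asList("null", "long")));   // long
        System.out.println(concreteType(Arrays.asList("null", "string"))); // string
    }
}
```

Note that this convention breaks if a schema writes the union as [type, "null"], which Avro also allows; the original code would then return "null" as the type.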
        
        /**
        * Convert a schema into an Encoder.
        */
        protected static ExpressionEncoder<Row> createSchemaEncoder(Schema schema, boolean withTimestamp) {
            StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();
    
            if (withTimestamp) {
                List<String> fields = java.util.Arrays.asList(type.fieldNames());
                if (!fields.contains("timestamp")) {
                    type = type.add("timestamp", DataTypes.TimestampType);
                } else {
                    int index = type.fieldIndex("timestamp");
                    StructField field = type.fields()[index];
                    type.fields()[index] = new StructField(field.name(), DataTypes.TimestampType, field.nullable(), field.metadata());
                }
            }
    
            ExpressionEncoder<Row> encoder = RowEncoder.apply(type);
    
            return encoder;
        }
    
        /**
        * Read the content of a file on HDFS.
        */
        protected static String getHdfsFileContent(String filePath) {
            StringBuilder content = new StringBuilder();
            BufferedReader reader = null;
            try {
                reader = getHDFSFileReader(filePath);
                String line = null;
                while ((line = reader.readLine()) != null) {
                    // skip comment lines and blank lines
                    if (!line.startsWith("#") && line.trim().length() > 0) {
                        content.append(line.trim());
                    }
                }
            } catch (FileNotFoundException e) {
                e.printStackTrace();
                throw new RuntimeException("file not found exception:" + filePath);
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException("an error was thrown while reading file:" + filePath);
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
            
            return content.toString();
        }
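The same read-and-filter logic can be exercised against a local file with java.nio instead of HDFS, which is handy for unit-testing the comment/blank-line filtering without a cluster. This is purely an illustrative analogue, not part of the original code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;

public class LocalFileContent {
    /** Concatenate non-comment, non-blank lines of a local file, as getHdfsFileContent does for HDFS. */
    public static String getFileContent(String filePath) throws IOException {
        Path path = Paths.get(filePath);
        return Files.readAllLines(path).stream()
                .filter(line -> !line.startsWith("#") && line.trim().length() > 0)
                .map(String::trim)
                .collect(Collectors.joining());
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("schema", ".avsc");
        Files.write(tmp, java.util.Arrays.asList("# comment", "", "{\"type\":", "\"record\"}"));
        System.out.println(getFileContent(tmp.toString())); // {"type":"record"}
        Files.deleteIfExists(tmp);
    }
}
```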
        
        protected static BufferedReader getHDFSFileReader(String hdfsFile) {
            try {
                System.out.println("hdfsfile: " + hdfsFile);
                Path configPath = new Path(hdfsFile);
    
                FileSystem fs = FileSystem.get(new Configuration());
    
                if (fs.exists(configPath)) {
                    return new BufferedReader(new InputStreamReader(fs.open(configPath)));
                } else {
                    throw new FileNotFoundException("file(" + configPath + ") not found.");
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

    That is all of the code; the details need no further explanation.

    ES API: Update Index:

    Updating an index means updating its settings and its mapping:

        /**
         * Update the mapping of an ES index.
         * */
        public static boolean indexUpdateMapping(String indexName, String indexType, XContentBuilder builder) {
            org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest mapping = Requests.putMappingRequest(indexName).type(indexType)
                    .source(builder);
            PutMappingResponse pMappingResource = getAdminClient().putMapping(mapping).actionGet();
    
            return pMappingResource.isAcknowledged();
        }
    
        /**
         * Update the settings of an ES index.<br>
         * This changes settings on an already-created index; some settings cannot be changed once the index is created, so choose according to your needs.
         * */
        public static boolean indexUpdateSettings(String indexName, Map<String, String> settingsMap) {
            Builder settings = Settings.builder();//
            for (Map.Entry<String, String> kv : settingsMap.entrySet()) {
                settings.put(kv.getKey(), kv.getValue());
            }
    
            return indexUpdateSettings(indexName, settings);
        }
    
        /**
         * Update the settings of an ES index.<br>
         * This changes settings on an already-created index; some settings cannot be changed once the index is created, so choose according to your needs.
         * */
        public static boolean indexUpdateSettings(String indexName, Builder settings) {
            UpdateSettingsResponse uIndexResponse = getAdminClient().prepareUpdateSettings(indexName)//
                    .setSettings(settings)//
                    .execute().actionGet();
            return uIndexResponse.isAcknowledged();
        }

        /**
         * Update an index's settings.
         * 
         * @param indexName
         *            index name<br>
         *            If you do not need real-time, exact query results, set index.refresh_interval to 30s per index; when importing large amounts of data, set it to -1 first and restore it after the import completes.<br>
         *            When bulk-importing large amounts of data, consider dropping replicas by setting index.number_of_replicas: 0. With replicas present, imported data must be synchronized to the replicas, which also perform analysis, indexing and segment merging, hurting import performance. You can import without replicas and restore them afterwards.<br>
         *            <b>Note</b>:<br>
         *            some settings cannot be changed after creation, e.g. index.number_of_shards; changing them throws an exception.
         */
        public static boolean indexUpdateSettings(String indexName) {
            Settings settings = Settings.builder() //
                    // .put("index.mapping.ignore_malformed", false)//
                    .put("index.refresh_interval", "30s") //
                    // .put("index.number_of_shards", 4)//
                    .put("index.number_of_replicas", 1)//
                    // .put("index.max_result_window", 500000)//
                    //
                    // .put("index.translog.durability", "async")//
                    // .put("index.translog.sync_interval", "120s")//
                    // .put("index.translog.flush_threshold_size", "2gb")//
                    //
                    .put("index.merge.scheduler.max_thread_count", 1)//
                    .build();
            return indexUpdateSettings(indexName, settings);
        }
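Since some settings (e.g. index.number_of_shards) cannot be changed after the index is created, a small guard in front of the update call can reject them early instead of letting ES throw. The class is hypothetical and the list below is a partial, illustrative set of create-time-only settings, not an exhaustive one:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SettingsGuard {
    // Partial, illustrative list of settings that are fixed at index-creation time.
    private static final Set<String> STATIC_SETTINGS = new HashSet<>(Arrays.asList(
            "index.number_of_shards",
            "index.codec",
            "index.routing_partition_size"));

    /** Throw if the map contains a setting that cannot be updated on a live index. */
    public static void checkUpdatable(Map<String, String> settingsMap) {
        for (String key : settingsMap.keySet()) {
            if (STATIC_SETTINGS.contains(key)) {
                throw new IllegalArgumentException(key + " cannot be changed after index creation");
            }
        }
    }
}
```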

    ES API: Delete/Open/Close Index:

        /**
         * Delete one or more indices from ES.
         * */
        public static boolean indexDelete(String... indices) {
            DeleteIndexResponse dIndexResponse = getAdminClient().prepareDelete(indices).execute().actionGet();
            if (dIndexResponse.isAcknowledged()) {
                System.out.println("index deleted successfully");
                return true;
            } else {
                System.out.println("failed to delete index");
                return false;
            }
        }
    
        /**
         * Close one or more indices in ES.<br>
         * curl -XPOST "http://127.0.0.1:9200/indexname/_close"
         * */
        public static boolean indexClose(String... indices) {
            CloseIndexResponse cIndexResponse = getAdminClient().prepareClose(indices).execute().actionGet();
            if (cIndexResponse.isAcknowledged()) {
                System.out.println("index closed successfully");
                return true;
            }
            return false;
        }
    
        /**
         * Open one or more indices in ES.<br>
         * curl -XPOST "http://127.0.0.1:9200/indexname/_open"
         * */
        public static boolean indexOpen(String... indices) {
            OpenIndexResponse oIndexResponse = getAdminClient().prepareOpen(indices).execute().actionGet();
            if (oIndexResponse.isAcknowledged()) {
                System.out.println("index opened successfully");
                return true;
            }
            return false;
        }
  • Original post: https://www.cnblogs.com/yy3b2007com/p/9900042.html