  • Kafka: ZK + Kafka + Spark Streaming cluster environment setup (20): ES 6.2.2 Client API

    Scala version: 2.11

    Java version: 1.8

    Spark version: 2.2.1

    ES version: 6.2.2

    Hadoop version: 2.9.0

    Elasticsearch nodes:

    192.168.0.120
    192.168.0.121
    192.168.0.122

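    Contents:

    1) First, how to use the Elasticsearch client API to create (and delete, update, query) an index, type, and mapping, and how to add, delete, update, and query data.

    2) Then, how to write to Elasticsearch from Spark.

    3) Finally, how to read data from Kafka and write that stream to ES.

    Using the Elasticsearch client API

    Client

    Client is the class through which you perform operations against an ES cluster: index/get/delete/search operations, as well as cluster administration tasks.

    A Client is obtained by constructing a TransportClient.

    TransportClient

    TransportClient connects remotely to an ES cluster through the transport module. It does not join the cluster; it only obtains one or more initial transport addresses and communicates with them on each request.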
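
    The official transport client documentation describes the client as talking to its configured addresses in round-robin fashion on each action. The idea can be sketched with plain JDK classes. RoundRobinSketch is a hypothetical name used only for illustration; it is not Elasticsearch code and does not open any connection.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for round-robin address selection; not Elasticsearch code. */
final class RoundRobinSketch {
    private final List<String> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        this.addresses = addresses;
    }

    /** Returns the address to use for the next request, cycling through the list. */
    String next() {
        // floorMod keeps the index valid even after the counter overflows.
        int i = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(i);
    }

    public static void main(String[] args) {
        RoundRobinSketch nodes = new RoundRobinSketch(
                Arrays.asList("192.168.0.120:9300", "192.168.0.121:9300", "192.168.0.122:9300"));
        for (int request = 1; request <= 4; request++) {
            System.out.println("request " + request + " -> " + nodes.next());
        }
    }
}
```

    The fourth request wraps around to the first address again, which is why listing more than one node spreads requests across the cluster.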

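    Settings

    The Settings class configures properties before the Client is started. The main one is the cluster name (cluster.name); other parameters include:

    client.transport.sniff: whether to enable sniffing for the transport client;

    client.transport.ignore_cluster_name: set to true to skip cluster name validation of connected nodes;

    client.transport.ping_timeout: how long to wait for a ping response from a node, 5s by default;

    client.transport.nodes_sampler_interval: how often to sample/ping the listed nodes, 5s by default.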

    An example of initializing the client:

    1) ClientTools.java (provides the TransportClient object as a singleton; for how to create a client, see https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)

    package com.dx.es;
    
    import java.net.InetAddress;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    
    public class ClientTools {
        private static ClientTools instance=null;
        private TransportClient client=null;
        
        private ClientTools(){
            this.client=null;
            init();
        }
            
        public static synchronized ClientTools getInstance(){
            if(instance==null){
                instance=new ClientTools();
            }
            return instance;
        }
        
        public TransportClient get(){
            return client;
        }
        
        public void close(){
            if(null != client){
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        private void init() {
            if(null != this.client){
                return;
            }
            
            try {
                Settings settings = Settings.builder()
                        .put("cluster.name",Config.getInstance().get("cluster.name"))
                        .put("client.transport.sniff", Boolean.valueOf(Config.getInstance().get("client.transport.sniff")))
                        .build();
                
                @SuppressWarnings("unchecked")
                PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);
                
                this.client = preBuiltTransportClient;
                this.client.addTransportAddress(new TransportAddress(InetAddress.getByName(Config.getInstance().get("host1")), 9300));
                this.client.addTransportAddress(new TransportAddress(InetAddress.getByName(Config.getInstance().get("host2")), 9300));
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    2) Config.java (manages the ES configuration)

    package com.dx.es;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Properties;
    
    public class Config {
        private static Config instance=null;
        private Map<String, String> confItems=null;
        
        private Config(){
            this.confItems=new HashMap<String, String>();
        
            init();
        }
        
        public static synchronized Config getInstance(){
            if(instance==null){
                instance=new Config();
            }
            return instance;
        }
    
        public String get(String key){
            if(!this.confItems.containsKey(key))
                return null;
            
            return this.confItems.get(key);
        }
        
        private void init() {
            Properties prop = new Properties();
            // Read conf.properties. Backslashes in a Windows path must be escaped in a Java string literal.
            try (InputStream in = new BufferedInputStream(new FileInputStream(
                    "E:\\spark_hadoop_cdh\\workspace\\ES_Client_API\\src\\main\\resources\\conf.properties"))) {
                // Load the property list.
                prop.load(in);
                Iterator<String> it = prop.stringPropertyNames().iterator();
                while (it.hasNext()) {
                    String key = it.next();
                    System.out.println(key + ":" + prop.getProperty(key));
                    this.confItems.put(key, prop.getProperty(key));
                }
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

    The contents of conf.properties:

    cluster.name=es-application
    client.transport.sniff=true
    es_ip=192.168.0.120
    host1=slave1
    host2=slave2
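
    Config reads the file from an absolute Windows path, which ties the code to one machine. A minimal sketch of an alternative, assuming the settings can be supplied through any java.io.Reader (for example one wrapping a classpath resource): ConfLoaderSketch and its load method are hypothetical names, and the in-memory string below only mirrors the conf.properties contents shown above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: reads key=value settings from any Reader. */
final class ConfLoaderSketch {
    private ConfLoaderSketch() {
    }

    /** Loads all properties from the reader into a plain map and closes the reader. */
    static Map<String, String> load(Reader source) {
        Properties prop = new Properties();
        try (Reader in = source) {
            prop.load(in);
        } catch (IOException e) {
            // Fail loudly instead of continuing with an empty configuration.
            throw new UncheckedIOException(e);
        }
        Map<String, String> items = new HashMap<String, String>();
        for (String key : prop.stringPropertyNames()) {
            items.put(key, prop.getProperty(key));
        }
        return items;
    }

    public static void main(String[] args) {
        // Assumption: in a real project the Reader would wrap a classpath resource,
        // e.g. an InputStreamReader over getResourceAsStream("/conf.properties").
        String text = "cluster.name=es-application\n"
                + "client.transport.sniff=true\n"
                + "host1=slave1\n"
                + "host2=slave2\n";
        Map<String, String> conf = load(new StringReader(text));
        System.out.println(conf.get("cluster.name"));
        System.out.println(Boolean.parseBoolean(conf.get("client.transport.sniff")));
    }
}
```

    Throwing on a read failure, rather than printing the exception and continuing, makes a missing file show up at startup instead of as a null cluster name later.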

    Index API

    Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-index.html#java-docs-index

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.Date;
    
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.rest.RestStatus;
    
    public class ClientAPITest {    
        public static void main(String[] args) {
            TransportClient client =    ClientTools.getInstance().get();
            
            XContentBuilder jsonBuilder=null;
            try {
                jsonBuilder = XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "trying out Elasticsearch")
                        .endObject();
            } catch (IOException e) {
                e.printStackTrace();
            }
            
            IndexResponse response = client.prepareIndex("twitter","tweet","1")
                .setSource(jsonBuilder)
                .get();
            
            // Index name
            String _index = response.getIndex();
            // Type name
            String _type = response.getType();
            // Document ID (generated or not)
            String _id = response.getId();
            // Version (if it's the first time you index this document, you will get: 1)
            long _version = response.getVersion();
            // REST status of the operation; CREATED means a new document was indexed.
            RestStatus status = response.status();
            if(status==RestStatus.CREATED){
                System.out.println("success !!!");
            }
            
            client.close();
        }
    }

    After running it, the index, the type, and one document are created.

    Get API

    Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-get.html

    package com.dx.es;
    
    import java.util.Map;
    
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.client.transport.TransportClient;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
            
            GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
            
            // getSource() returns null when the document does not exist, so check first.
            if (response.isExists()) {
                Map<String, Object> fields = response.getSource();
                for (Map.Entry<String, Object> kvEntry : fields.entrySet()) {
                    System.out.println(kvEntry.getKey() + ":" + kvEntry.getValue());
                }
            }
            
            client.close();
        }
    }

    Output:

    postDate:2018-08-05T06:48:18.334Z
    message:trying out Elasticsearch
    user:kimchy

    Delete API

    Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-delete.html

    package com.dx.es;
    
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.rest.RestStatus;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
            
            DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
            
            if (RestStatus.OK == response.status()) {
                System.out.println("Success ...");
            }
            
            client.close();
        }
    }

    Checking with the es-head plugin shows that the index and type still exist; only the data is gone.

    Delete By Query API

    Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-delete-by-query.html#java-docs-delete-by-query

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.Date;
    
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.index.reindex.DeleteByQueryAction;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
            create(client);
            
            BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                    .filter(QueryBuilders.matchQuery("gender", "male")) 
                    .source("twitter")                                  
                    .get();                                             
            long deleted = response.getDeleted();   
            System.out.println(deleted);
            
            client.close();
        }
    
        private static void create(TransportClient client) {
            XContentBuilder jsonBuilder = null;
            for (int i = 1; i <= 10; i++) {
                try {
                    jsonBuilder = XContentFactory.jsonBuilder()
                            .startObject()
                            .field("user", "kimchy"+i)
                            .field("gender", ((i%2==0) ? "male" : "famale"))
                            .field("postDate", new Date())
                            .field("message", "trying out Elasticsearch")
                            .endObject();
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
                IndexResponse response = client.prepareIndex("twitter", "tweet", String.valueOf(i)).setSource(jsonBuilder).get();
            }
        }
    }

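    [Screenshot: records after indexing]

    [Screenshot: data after the delete]

    For a long-running delete, you can run it asynchronously: use the execute method instead of get and supply a listener.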

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.Date;
    
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.index.reindex.DeleteByQueryAction;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
            // create(client);
            
            DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("gender", "male"))
                .source("twitter")
                .execute(new ActionListener<BulkByScrollResponse>() {
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println(deleted);
                    }
    
                    public void onFailure(Exception e) {
                        // Handle the exception
                        e.printStackTrace();
                    }
                });
    
            try {
                Thread.sleep(60000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            
            client.close();
        }
    
        private static void create(TransportClient client) {
            XContentBuilder jsonBuilder = null;
            for (int i = 1; i <= 10; i++) {
                try {
                    jsonBuilder = XContentFactory.jsonBuilder()
                            .startObject()
                            .field("user", "kimchy"+i)
                            .field("gender", ((i%2==0) ? "male" : "famale"))
                            .field("postDate", new Date())
                            .field("message", "trying out Elasticsearch")
                            .endObject();
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
                IndexResponse response = client.prepareIndex("twitter", "tweet", String.valueOf(i)).setSource(jsonBuilder).get();
            }
        }
    }
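
    The listing above keeps the JVM alive with Thread.sleep(60000) so the callback has time to run. A possible alternative is to block on a CountDownLatch that the listener releases, so the program continues as soon as the response arrives. The sketch below uses only JDK classes: AsyncWaitSketch, its Listener interface, and deleteAsync are hypothetical stand-ins for the asynchronous call, not Elasticsearch APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in that mimics a callback-style asynchronous call. */
final class AsyncWaitSketch {
    /** Local callback interface with a response method and a failure method. */
    interface Listener {
        void onResponse(long deleted);

        void onFailure(Exception e);
    }

    /** Simulates an asynchronous delete that reports its result on another thread. */
    static void deleteAsync(ExecutorService pool, final long simulatedDeleted, final Listener listener) {
        pool.execute(() -> listener.onResponse(simulatedDeleted));
    }

    /** Starts the simulated call and blocks until the callback fires or the timeout elapses. */
    static long runAndWait(long simulatedDeleted) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicLong result = new AtomicLong(-1);
        try {
            deleteAsync(pool, simulatedDeleted, new Listener() {
                public void onResponse(long deleted) {
                    result.set(deleted);
                    done.countDown();
                }

                public void onFailure(Exception e) {
                    e.printStackTrace();
                    done.countDown();
                }
            });
            // Wait at most 60 seconds, but return as soon as the callback has run.
            if (!done.await(60, TimeUnit.SECONDS)) {
                System.out.println("timed out waiting for the callback");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(5L)); // prints 5
    }
}
```

    Counting down in both onResponse and onFailure matters: if only the success path released the latch, a failed request would leave the caller waiting for the full timeout.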

    Update API

    Create an UpdateRequest and send it to the client:

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentFactory;
    
    public class ClientAPITest {
        public static void main(String[] args)  {
            TransportClient client = ClientTools.getInstance().get();
    
            UpdateRequest updateRequest = new UpdateRequest();
            updateRequest.index("twitter");
            updateRequest.type("tweet");
            updateRequest.id("1");
            try {
                updateRequest.doc(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("gender", "male")
                            .endObject());
            } catch (IOException e) {
                e.printStackTrace();
            }
            
            try {
                client.update(updateRequest).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }        
            
            client.close();
        }
    }

    Or use the prepareUpdate() method:

    Option 1:

    package com.dx.es;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.elasticsearch.action.update.UpdateRequestBuilder;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.script.Script;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
    
            UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate("twitter", "tweet", "1");
            Map<String, Object> params = new HashMap<String, Object>();
            updateRequestBuilder.setScript(new Script(Script.DEFAULT_SCRIPT_TYPE, Script.DEFAULT_SCRIPT_LANG, "ctx._source.gender = \"female\"", params));
            updateRequestBuilder.get();
    
            client.close();
        }
    }

    Option 2:

    package com.dx.es;
    
    import java.io.IOException;
    
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentFactory;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
    
            try {
                client.prepareUpdate("twitter", "tweet", "1")
                        .setDoc(XContentFactory.jsonBuilder().startObject().field("gender", "male").endObject()).get();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            client.close();
        }
    }

    Update by script

    package com.dx.es;
    
    import java.util.concurrent.ExecutionException;
    
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.script.Script;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
    
            UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
                    .script(new Script("ctx._source.gender = \"female\""));
            try {
                client.update(updateRequest).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            
            client.close();
        }
    }

    Update by merging documents

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentFactory;
    
    public class ClientAPITest {
        public static void main(String[] args) {
            TransportClient client = ClientTools.getInstance().get();
    
            UpdateRequest updateRequest = null;
            try {
                updateRequest = new UpdateRequest("twitter", "tweet", "1")
                        .doc(
                                XContentFactory.jsonBuilder().startObject()
                                    .field("gender", "male")
                                    .endObject()
                        );
            } catch (IOException e) {
                e.printStackTrace();
            }
            
            try {
                client.update(updateRequest).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
            client.close();
        }
    }

    Upsert

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentFactory;
    
    public class ClientAPITest {
        public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
            TransportClient client = ClientTools.getInstance().get();
    
            IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "11")
                    .source(XContentFactory.jsonBuilder()                
                            .startObject()
                            .field("user", "Joe Smith")
                            .field("gender", "male")
                            .endObject()
                    );
            
            UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "11")
                    .doc(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("user", "Joe Dalton")
                            .field("gender", "male")
                            .endObject()
                    )
                    .upsert(indexRequest);
            
            client.update(updateRequest).get();
            
            client.close();
        }
    }

    Note: if a document with that id already exists, the update is applied; otherwise the document given to upsert is indexed.
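
    The two branches of an upsert can be modelled with an in-memory map. This is only a sketch of the semantics under simplifying assumptions: UpsertSketch is a hypothetical class, the store is a HashMap rather than an index, and the merge is shallow (top-level fields only).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of upsert semantics; not Elasticsearch code. */
final class UpsertSketch {
    private UpsertSketch() {
    }

    /**
     * If the id is absent, stores a copy of the upsert document as-is.
     * If the id is present, merges the partial document into the stored one.
     */
    static void upsert(Map<String, Map<String, Object>> store, String id,
            Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            store.put(id, new HashMap<String, Object>(upsertDoc));
        } else {
            existing.putAll(partialDoc);
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> store = new HashMap<String, Map<String, Object>>();

        Map<String, Object> upsertDoc = new HashMap<String, Object>();
        upsertDoc.put("user", "Joe Smith");
        upsertDoc.put("gender", "male");

        Map<String, Object> partialDoc = new HashMap<String, Object>();
        partialDoc.put("user", "Joe Dalton");
        partialDoc.put("gender", "male");

        // First call: id 11 does not exist, so the upsert document is stored.
        upsert(store, "11", partialDoc, upsertDoc);
        System.out.println(store.get("11").get("user")); // Joe Smith

        // Second call: id 11 exists, so the partial document is merged in.
        upsert(store, "11", partialDoc, upsertDoc);
        System.out.println(store.get("11").get("user")); // Joe Dalton
    }
}
```

    In this model, the first run against a missing id stores "Joe Smith", and only a later run changes the user to "Joe Dalton".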

    Multi Get API

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.get.MultiGetItemResponse;
    import org.elasticsearch.action.get.MultiGetResponse;
    import org.elasticsearch.client.transport.TransportClient;
    
    public class ClientAPITest {
        public static void main(String[] args)  {
            TransportClient client = ClientTools.getInstance().get();
    
            MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                    .add("twitter", "tweet", "1")           
                    .add("twitter", "tweet", "2", "3", "4") 
                    .add("twitter", "tweet", "11")          
                    .get();
    
            for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
                // getResponse() is null when the item failed, so guard before use.
                GetResponse response = itemResponse.getResponse();
                if (response != null && response.isExists()) {
                    String json = response.getSourceAsString();
                    System.out.println(json);
                }
            }
            
            client.close();
        }
    }

    Output:

    {"user":"kimchy1","gender":"male","postDate":"2018-08-05T10:04:26.631Z","message":"trying out Elasticsearch"}
    {"user":"kimchy2","gender":"male","postDate":"2018-08-05T10:04:26.673Z","message":"trying out Elasticsearch"}
    {"user":"kimchy3","gender":"famale","postDate":"2018-08-05T10:04:26.720Z","message":"trying out Elasticsearch"}
    {"user":"kimchy4","gender":"male","postDate":"2018-08-05T10:04:26.730Z","message":"trying out Elasticsearch"}
    {"user":"Joe Dalton","gender":"male"}

    Bulk API

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.Date;
    
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.get.MultiGetItemResponse;
    import org.elasticsearch.action.get.MultiGetResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.xcontent.XContentFactory;
    
    public class ClientAPITest {
        public static void main(String[] args) throws IOException  {
            TransportClient client = ClientTools.getInstance().get();
            BulkRequestBuilder bulkRequest = client.prepareBulk();
    
            // either use client#prepare, or use Requests# to directly build index/delete requests
            bulkRequest.add(client.prepareIndex("twitter", "tweet", "12")
                    .setSource(XContentFactory.jsonBuilder()
                                .startObject()
                                    .field("user", "auth")
                                    .field("postDate", new Date())
                                    .field("message", "trying out Elasticsearch")
                                .endObject()
                              )
                    );
    
            bulkRequest.add(client.prepareIndex("twitter", "tweet", "13")
                    .setSource(XContentFactory.jsonBuilder()
                                .startObject()
                                    .field("user", "judy")
                                    .field("postDate", new Date())
                                    .field("message", "another post")
                                .endObject()
                              )
                    );
    
            BulkResponse bulkResponse = bulkRequest.get();        
            if (bulkResponse.hasFailures()) {
                // process failures by iterating through each bulk response item
                System.out.println(bulkResponse.buildFailureMessage());
            }
                    
            MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                    .add("twitter", "tweet", "12", "13") 
                    .get();
    
            for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
                // getResponse() is null when the item failed, so guard before use.
                GetResponse response = itemResponse.getResponse();
                if (response != null && response.isExists()) {
                    String json = response.getSourceAsString();
                    System.out.println(json);
                }
            }
            
            client.close();
        }
    }

    Using Bulk Processor

    Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-docs-bulk-processor.html#java-docs-bulk-processor

    package com.dx.es;
    
    import java.io.IOException;
    import java.util.Date;
    
    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.get.MultiGetItemResponse;
    import org.elasticsearch.action.get.MultiGetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentFactory;
    
    public class ClientAPITest {
        public static void main(String[] args) throws IOException  {
            TransportClient client = ClientTools.getInstance().get();
            
            BulkProcessor bulkProcessor = BulkProcessor.builder(
                    client,  
                    new BulkProcessor.Listener() {
                        public void beforeBulk(long executionId, BulkRequest request) {
                            
                        }
    
                        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                            
                        }
    
                        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                            
                        } 
                    })
                    .setBulkActions(10000) 
                    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
                    .setFlushInterval(TimeValue.timeValueSeconds(5)) 
                    .setConcurrentRequests(1) 
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
                    .build();
    
            bulkProcessor.add(new DeleteRequest("twitter", "tweet", "1"));
            bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
    
            bulkProcessor.add(new IndexRequest("twitter", "tweet", "12")
                    .source(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("user", "auth")
                            .field("postDate", new Date())
                            .field("message", "trying out Elasticsearch")
                            .endObject()
                            )
                );
            bulkProcessor.add(new IndexRequest("twitter", "tweet", "13")
                    .source(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("user", "judy")
                            .field("postDate", new Date())
                            .field("message", "another post")
                            .endObject()
                          )
                    );
    
            // Flush any remaining requests
            bulkProcessor.flush();
            // Or close the bulkProcessor if you don't need it anymore
            bulkProcessor.close();
    
            // Refresh your indices
            client.admin().indices().prepareRefresh().get();
                    
            MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                    .add("twitter", "tweet", "1", "2", "12", "13") 
                    .get();
    
            for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
                // getResponse() is null when the item failed, so guard before use.
                GetResponse response = itemResponse.getResponse();
                if (response != null && response.isExists()) {
                    String json = response.getSourceAsString();
                    System.out.println(json);
                }
            }
            
            client.close();
        }
    }
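
    The builder in the listing above configures several flush triggers: an action count (setBulkActions), a payload size (setBulkSize), and a time interval (setFlushInterval). The sketch below models only the first of these, using plain JDK collections, to show how queued requests turn into batches. BatchBufferSketch is a hypothetical class; it records batches in memory instead of sending them anywhere.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory batcher that flushes after a fixed number of actions. */
final class BatchBufferSketch {
    private final int maxActions;
    private final List<String> pending = new ArrayList<String>();
    private final List<List<String>> flushed = new ArrayList<List<String>>();

    BatchBufferSketch(int maxActions) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be at least 1");
        }
        this.maxActions = maxActions;
    }

    /** Queues one action and flushes when the threshold is reached. */
    void add(String action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Emits whatever is queued as one batch; here the batch is only recorded. */
    void flush() {
        if (!pending.isEmpty()) {
            flushed.add(new ArrayList<String>(pending));
            pending.clear();
        }
    }

    /** Flushes the remainder so that no queued action is lost. */
    void close() {
        flush();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        BatchBufferSketch buffer = new BatchBufferSketch(2);
        for (int i = 1; i <= 5; i++) {
            buffer.add("index-" + i);
        }
        System.out.println(buffer.flushedBatches().size()); // 2 full batches so far
        buffer.close();
        System.out.println(buffer.flushedBatches().size()); // 3 after the final flush
    }
}
```

    The fifth action stays queued until close() runs, which is why a final flush or close is needed before the results are read back.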

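    When should an index be rebuilt? See "Elasticsearch index management: rebuilding an index with reindex". An index must be rebuilt when a field's type changes.

    Writing to Elasticsearch from Spark

    To use Spark classes (for example SparkConf) you need spark-core; to write RDD data to ES you need elasticsearch-spark-20_2.11.

    Maven dependencies: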

            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 -->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-spark-20_2.11</artifactId>
                <version>6.2.2</version>
            </dependency>        

    Implementation:

    package com.dx.es;
    
    import java.util.Map;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    
    public class JavaEsSpark_Test {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf();
            conf.setMaster("local[*]"); // run mode
            conf.setAppName("spark to es"); // job name
            conf.set("es.index.auto.create", "true"); // create the index automatically
            conf.set("es.nodes", "192.168.0.120,192.168.0.121,192.168.0.122"); // ES nodes, comma-separated
            conf.set("es.port", "9200"); // port
            JavaSparkContext jsc = new JavaSparkContext(conf);
    
            Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
            Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
    
            JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
            JavaEsSpark.saveToEs(javaRDD, "spark/docs");
            
            jsc.close();
        }
    }

    After running it, use the head tool to check that the documents were inserted.

    References:

    ES Client API

    https://www.sojson.com/blog/87.html

    https://www.sojson.com/blog/88.html

    https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html

    How ES index storage works:

    https://blog.csdn.net/cyony/article/details/65437708?locationNum=9&fps=1

    Example of writing to ES:

    http://qindongliang.iteye.com/blog/2372853

  • Original post: https://www.cnblogs.com/yy3b2007com/p/9356751.html