zoukankan      html  css  js  c++  java
  • ES code study

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.edu360.es</groupId>
    <artifactId>HelloES</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    </properties>

    <dependencies>

    <!-- es的客户端-->
    <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.4.3</version>
    </dependency>

    <!-- 依赖2.x的log4j -->
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
    </dependency>

    <!-- 单元测试 -->
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    </dependency>


    </dependencies>

    </project>

    package cn.edu360.es;

    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    import java.net.InetAddress;

    /**
    * Created by zx on 2017/8/15.
    */
    public class HelloWorld {

    public static void main(String[] args) {

    try {

    //设置集群名称
    Settings settings = Settings.builder()
    .put("cluster.name", "my-es")
    .build();
    //创建client
    TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
    //用java访问ES用的端口是9300
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300),
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300),
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300));
    //搜索数据(.actionGet()方法是同步的,没有返回就等待)
    GetResponse response = client.prepareGet("news", "fulltext", "1").execute().actionGet();
    //输出结果
    System.out.println(response);
    //关闭client
    client.close();

    } catch (Exception e) {
    e.printStackTrace();
    }

    }
    }

    package cn.edu360.es;

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.get.MultiGetItemResponse;
    import org.elasticsearch.action.get.MultiGetResponse;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.SearchRequestBuilder;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.reindex.DeleteByQueryAction;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHitField;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.aggregations.Aggregation;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.Aggregations;
    import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
    import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
    import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
    import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
    import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    import org.junit.Before;
    import org.junit.Test;

    import java.io.IOException;
    import java.net.InetAddress;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;

    import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
    import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;


    /**
    * Created by zx on 2017/9/5.
    * https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/index.html
    */
    public class EsCRUD {

    private TransportClient client = null;

    @Before
    public void init() throws Exception {
    //设置集群名称
    Settings settings = Settings.builder()
    .put("cluster.name", "my-es")
    //自动感知的功能(可以通过当前指定的节点获取所有es节点的信息)
    .put("client.transport.sniff", true)
    .build();
    //创建client
    client = new PreBuiltTransportClient(settings).addTransportAddresses(
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300),
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300),
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300));

    }


    @Test
    public void testCreate() throws IOException {

    IndexResponse response = client.prepareIndex("gamelog", "users", "1")
    .setSource(
    jsonBuilder()
    .startObject()
    .field("username", "老赵")
    .field("gender", "male")
    .field("birthday", new Date())
    .field("fv", 9999)
    .field("message", "trying out Elasticsearch")
    .endObject()
    ).get();


    }

    //查找一条
    @Test
    public void testGet() throws IOException {
    GetResponse response = client.prepareGet("gamelog", "users", "1").get();
    System.out.println(response.getSourceAsString());
    }


    //查找多条
    @Test
    public void testMultiGet() throws IOException {
    MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("gamelog", "users", "1")
    .add("gamelog", "users", "2", "3")
    .add("news", "fulltext", "1")
    .get();

    for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {
    String json = response.getSourceAsString();
    System.out.println(json);
    }
    }
    }

    @Test
    public void testUpdate() throws Exception {
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index("gamelog");
    updateRequest.type("users");
    updateRequest.id("2");
    updateRequest.doc(
    jsonBuilder()
    .startObject()
    .field("fv", 999.9)
    .endObject());
    client.update(updateRequest).get();
    }

    @Test
    public void testDelete() {
    DeleteResponse response = client.prepareDelete("gamelog", "users", "2").get();
    System.out.println(response);
    }

    @Test
    public void testDeleteByQuery() {
    BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    //指定查询条件
    .filter(QueryBuilders.matchQuery("username", "老段"))
    //指定索引名称
    .source("gamelog")
    .get();

    long deleted = response.getDeleted();
    System.out.println(deleted);


    }

    //异步删除
    @Test
    public void testDeleteByQueryAsync() {
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))
    .source("gamelog")
    .execute(new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse response) {
    long deleted = response.getDeleted();
    System.out.println("数据删除了");
    System.out.println(deleted);
    }

    @Override
    public void onFailure(Exception e) {
    e.printStackTrace();
    }
    });
    try {
    System.out.println("异步删除");
    Thread.sleep(10000);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    @Test
    public void testRange() {

    QueryBuilder qb = rangeQuery("fv")
    // [88.99, 10000)
    .from(88.99)
    .to(10000)
    .includeLower(true)
    .includeUpper(false);

    SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get();

    System.out.println(response);
    }


    /**
    * curl -XPUT 'http://192.168.5.251:9200/player_info/player/1' -d '{ "name": "curry", "age": 29, "salary": 3500,"team": "war", "position": "pg"}'
    * curl -XPUT 'http://192.168.5.251:9200/player_info/player/2' -d '{ "name": "thompson", "age": 26, "salary": 2000,"team": "war", "position": "pg"}'
    * curl -XPUT 'http://192.168.5.251:9200/player_info/player/3' -d '{ "name": "irving", "age": 25, "salary": 2000,"team": "cav", "position": "pg"}'
    * curl -XPUT 'http://192.168.5.251:9200/player_info/player/4' -d '{ "name": "green", "age": 26, "salary": 2000,"team": "war", "position": "pf"}'
    * curl -XPUT 'http://192.168.5.251:9200/player_info/player/5' -d '{ "name": "james", "age": 33, "salary": 4000,"team": "cav", "position": "sf"}'
    */
    @Test
    public void testAddPlayer() throws IOException {

    IndexResponse response = client.prepareIndex("player_info", "player", "1")
    .setSource(
    jsonBuilder()
    .startObject()
    .field("name", "James")
    .field("age", 33)
    .field("salary", 3000)
    .field("team", "cav")
    .field("position", "sf")
    .endObject()
    ).get();


    }

    /**
    * https://elasticsearch.cn/article/102
    *
    * select team, count(*) as player_count from player group by team;
    */
    @Test
    public void testAgg1() {

    //指定索引和type
    SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
    //按team分组然后聚合,但是并没有指定聚合函数
    TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team");
    //添加聚合器
    builder.addAggregation(teamAgg);
    //触发
    SearchResponse response = builder.execute().actionGet();
    //System.out.println(response);
    //将返回的结果放入到一个map中
    Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
    // Set<String> keys = aggMap.keySet();
    //
    // for (String key: keys) {
    // System.out.println(key);
    // }

    // //取出聚合属性
    StringTerms terms = (StringTerms) aggMap.get("player_count");


    //
    //// //依次迭代出分组聚合数据
    // for (Terms.Bucket bucket : terms.getBuckets()) {
    // //分组的名字
    // String team = (String) bucket.getKey();
    // //count,分组后一个组有多少数据
    // long count = bucket.getDocCount();
    // System.out.println(team + " " + count);
    // }

    Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
    while (teamBucketIt .hasNext()) {
    Terms.Bucket bucket = teamBucketIt.next();
    String team = (String) bucket.getKey();

    long count = bucket.getDocCount();

    System.out.println(team + " " + count);
    }

    }

    /**
    * select team, position, count(*) as pos_count from player group by team, position;
    */
    @Test
    public void testAgg2() {
    SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
    //指定别名和分组的字段
    TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
    TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
    //添加两个聚合构建器
    builder.addAggregation(teamAgg.subAggregation(posAgg));
    //执行查询
    SearchResponse response = builder.execute().actionGet();
    //将查询结果放入map中
    Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
    //根据属性名到map中查找
    StringTerms teams = (StringTerms) aggMap.get("team_name");
    //循环查找结果
    for (Terms.Bucket teamBucket : teams.getBuckets()) {
    //先按球队进行分组
    String team = (String) teamBucket.getKey();
    Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
    StringTerms positions = (StringTerms) subAggMap.get("pos_count");
    //因为一个球队有很多位置,那么还要依次拿出位置信息
    for (Terms.Bucket posBucket : positions.getBuckets()) {
    //拿到位置的名字
    String pos = (String) posBucket.getKey();
    //拿出该位置的数量
    long docCount = posBucket.getDocCount();
    //打印球队,位置,人数
    System.out.println(team + " " + pos + " " + docCount);
    }


    }
    }


    /**
    * select team, max(age) as max_age from player group by team;
    */
    @Test
    public void testAgg3() {
    SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
    //指定安球队进行分组
    TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
    //指定分组求最大值
    MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
    //分组后求最大值
    builder.addAggregation(teamAgg.subAggregation(maxAgg));
    //查询
    SearchResponse response = builder.execute().actionGet();
    Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
    //根据team属性,获取map中的内容
    StringTerms teams = (StringTerms) aggMap.get("team_name");
    for (Terms.Bucket teamBucket : teams.getBuckets()) {
    //分组的属性名
    String team = (String) teamBucket.getKey();
    //在将聚合后取最大值的内容取出来放到map中
    Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
    //取分组后的最大值
    InternalMax ages = (InternalMax)subAggMap.get("max_age");
    double max = ages.getValue();
    System.out.println(team + " " + max);
    }
    }

    /**
    * select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;
    */
    @Test
    public void testAgg4() {
    SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
    //指定分组字段
    TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
    //指定聚合函数是求平均数据
    AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
    //指定另外一个聚合函数是求和
    SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
    //分组的聚合器关联了两个聚合函数
    builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
    SearchResponse response = builder.execute().actionGet();
    Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
    //按分组的名字取出数据
    StringTerms teams = (StringTerms) aggMap.get("team_name");
    for (Terms.Bucket teamBucket : teams.getBuckets()) {
    //获取球队名字
    String team = (String) teamBucket.getKey();
    Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
    //根据别名取出平均年龄
    InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age");
    //根据别名取出薪水总和
    InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
    double avgAgeValue = avgAge.getValue();
    double totalSalaryValue = totalSalary.getValue();
    System.out.println(team + " " + avgAgeValue + " " + totalSalaryValue);
    }
    }


    /**
    * select team, sum(salary) as total_salary from player group by team order by total_salary desc;
    */
    @Test
    public void testAgg5() {
    SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
    //按team进行分组,然后指定排序规则
    TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true));
    SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
    builder.addAggregation(termsAgg.subAggregation(sumAgg));
    SearchResponse response = builder.execute().actionGet();
    Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
    StringTerms teams = (StringTerms) aggMap.get("team_name");
    for (Terms.Bucket teamBucket : teams.getBuckets()) {
    String team = (String) teamBucket.getKey();
    Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
    InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
    double totalSalaryValue = totalSalary.getValue();
    System.out.println(team + " " + totalSalaryValue);
    }
    }

    }

    package cn.edu360.es;

    import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
    import org.elasticsearch.client.AdminClient;
    import org.elasticsearch.client.IndicesAdminClient;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    import org.junit.Before;
    import org.junit.Test;

    import java.io.IOException;
    import java.net.InetAddress;
    import java.util.HashMap;

    /**
    * Created by zx on 2017/9/6.
    */
    public class AdminAPI {

    private TransportClient client = null;

    //在所有的测试方法之前执行
    @Before
    public void init() throws Exception {
    //设置集群名称
    Settings settings = Settings.builder().put("cluster.name", "my-es").build();
    //创建client
    client = new PreBuiltTransportClient(settings).addTransportAddresses(
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300),
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300),
    new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300));

    }

    //创建索引,并配置一些参数
    @Test
    public void createIndexWithSettings() {
    //获取Admin的API
    AdminClient admin = client.admin();
    //使用Admin API对索引进行操作
    IndicesAdminClient indices = admin.indices();
    //准备创建索引
    indices.prepareCreate("gamelog")
    //配置索引参数
    .setSettings(
    //参数配置器
    Settings.builder()//指定索引分区的数量
    .put("index.number_of_shards", 4)
    //指定索引副本的数量(注意:不包括本身,如果设置数据存储副本为2,实际上数据存储了3份)
    .put("index.number_of_replicas", 2)
    )
    //真正执行
    .get();
    }

    //跟索引添加mapping信息(给表添加schema信息)
    @Test
    public void putMapping() {
    //创建索引
    client.admin().indices().prepareCreate("twitter")
    //创建一个type,并指定type中属性的名字和类型
    .addMapping("tweet",
    "{ " +
    " "tweet": { " +
    " "properties": { " +
    " "message": { " +
    " "type": "string" " +
    " } " +
    " } " +
    " } " +
    " }")
    .get();
    }

    /**
    * 你可以通过dynamic设置来控制这一行为,它能够接受以下的选项:
    * true:默认值。动态添加字段
    * false:忽略新字段
    * strict:如果碰到陌生字段,抛出异常
    * @throws IOException
    */
    @Test
    public void testSettingsMappings() throws IOException {
    //1:settings
    HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
    settings_map.put("number_of_shards", 3);
    settings_map.put("number_of_replicas", 2);

    //2:mappings(映射、schema)
    XContentBuilder builder = XContentFactory.jsonBuilder()
    .startObject()
    .field("dynamic", "true")
    //设置type中的属性
    .startObject("properties")
    //id属性
    .startObject("num")
    //类型是integer
    .field("type", "integer")
    //不分词,但是建索引
    .field("index", "not_analyzed")
    //在文档中存储
    .field("store", "yes")
    .endObject()
    //name属性
    .startObject("name")
    //string类型
    .field("type", "string")
    //在文档中存储
    .field("store", "yes")
    //建立索引
    .field("index", "analyzed")
    //使用ik_smart进行分词
    .field("analyzer", "ik_smart")
    .endObject()
    .endObject()
    .endObject();

    CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("user_info");
    //管理索引(user_info)然后关联type(user)
    prepareCreate.setSettings(settings_map).addMapping("user", builder).get();
    }

    /**
    * XContentBuilder mapping = jsonBuilder()
    .startObject()
    .startObject("productIndex")
    .startObject("properties")
    .startObject("title").field("type", "string").field("store", "yes").endObject()
    .startObject("description").field("type", "string").field("index", "not_analyzed").endObject()
    .startObject("price").field("type", "double").endObject()
    .startObject("onSale").field("type", "boolean").endObject()
    .startObject("type").field("type", "integer").endObject()
    .startObject("createDate").field("type", "date").endObject()
    .endObject()
    .endObject()
    .endObject();
    PutMappingRequest mappingRequest = Requests.putMappingRequest("productIndex").type("productIndex").source(mapping);
    client.admin().indices().putMapping(mappingRequest).actionGet();
    */


    /**
    * index这个属性,no代表不建索引
    * not_analyzed,建索引不分词
    * analyzed 即分词,又建立索引
    * expected [no], [not_analyzed] or [analyzed]
    * @throws IOException
    */

    @Test
    public void testSettingsPlayerMappings() throws IOException {
    //1:settings
    HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
    settings_map.put("number_of_shards", 3);
    settings_map.put("number_of_replicas", 1);

    //2:mappings
    XContentBuilder builder = XContentFactory.jsonBuilder()
    .startObject()//
    .field("dynamic", "true")
    .startObject("properties")
    .startObject("id")
    .field("type", "integer")
    .field("store", "yes")
    .endObject()
    .startObject("name")
    .field("type", "string")
    .field("index", "not_analyzed")
    .endObject()
    .startObject("age")
    .field("type", "integer")
    .endObject()
    .startObject("salary")
    .field("type", "integer")
    .endObject()
    .startObject("team")
    .field("type", "string")
    .field("index", "not_analyzed")
    .endObject()
    .startObject("position")
    .field("type", "string")
    .field("index", "not_analyzed")
    .endObject()
    .startObject("description")
    .field("type", "string")
    .field("store", "no")
    .field("index", "analyzed")
    .field("analyzer", "ik_smart")
    .endObject()
    .startObject("addr")
    .field("type", "string")
    .field("store", "yes")
    .field("index", "analyzed")
    .field("analyzer", "ik_smart")
    .endObject()
    .endObject()
    .endObject();

    CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player_info");
    prepareCreate.setSettings(settings_map).addMapping("player", builder).get();

    }
    }

  • 相关阅读:
    撤回本地的提交
    antd Table每列样式修改
    大数组拼树
    滑动加载
    数组合并去除重复内容
    获取前一周期日期
    js 对象根据value获取对应的key
    less git上传问题处理
    5G
    Linux怎么安装node.js
  • 原文地址:https://www.cnblogs.com/ahu-lichang/p/11710089.html
Copyright © 2011-2022 走看看