本文部分转载于:
http://www.cnblogs.com/luxiaoxun/p/4869509.html
ElasticSearch的基本用法与集群搭建
ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。
这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/
二、基本用法
Elasticsearch集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含多个文档(documents),然后每个文档包含多个字段(Fields),这种面向文档型的储存,也算是NoSQL的一种吧。
ES比传统关系型数据库,对一些概念上的理解:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
从创建一个Client到添加、删除、查询等基本用法:
1、创建Client
public ElasticSearchService(String ipAddress, int port) { client = new TransportClient() .addTransportAddress(new InetSocketTransportAddress(ipAddress, port)); }
这里是一个TransportClient。
ES下两种客户端对比:
TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。
Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。
2、创建/删除Index和Type信息
1 // 创建索引 2 public void createIndex() { 3 client.admin().indices().create(new CreateIndexRequest(IndexName)) 4 .actionGet(); 5 } 6 7 // 清除所有索引 8 public void deleteIndex() { 9 IndicesExistsResponse indicesExistsResponse = client.admin().indices() 10 .exists(new IndicesExistsRequest(new String[] { IndexName })) 11 .actionGet(); 12 if (indicesExistsResponse.isExists()) { 13 client.admin().indices().delete(new DeleteIndexRequest(IndexName)) 14 .actionGet(); 15 } 16 } 17 18 // 删除Index下的某个Type 19 public void deleteType(){ 20 client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet(); 21 } 22 23 // 定义索引的映射类型 24 public void defineIndexTypeMapping() { 25 try { 26 XContentBuilder mapBuilder = XContentFactory.jsonBuilder(); 27 mapBuilder.startObject() 28 .startObject(TypeName) 29 .startObject("properties") 30 .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject() 31 .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject() 32 .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() 33 .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() 34 .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() 35 .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() 36 .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() 37 .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject() 38 .endObject() 39 .endObject() 40 .endObject(); 41 42 PutMappingRequest putMappingRequest = Requests 43 .putMappingRequest(IndexName).type(TypeName) 44 .source(mapBuilder); 45 client.admin().indices().putMapping(putMappingRequest).actionGet(); 46 } catch (IOException e) { 47 log.error(e.toString()); 48 } 49 }
这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。
注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。
详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html
个人注:
设置mapping信息 这段代码是在ES中的索引库index和类型都已经建立完之后,再向type中插入数据之前设置要插入数据对应的mappings信息...
如果给一个还没有创建的索引库,类型 设置mapping信息 可以参考如下代码:
也可以实现判断一下这个索引库index 或者指定索引库index对应的类型type到底是否存在...再设置mapping信息.
注意:插入数据之后是无法修改索引库对应的mapping信息的...只能删了重新创建. 如下部分我在项目中使用的代码:
1 public void setMappings(){ 2 //mappings 3 try { 4 XContentBuilder mappings = XContentFactory.jsonBuilder(); 5 mappings.startObject() 6 .startObject(EsUtil.mmobjectTypename) 7 .startObject("properties") 8 .startObject("name").field("type","text").endObject() 9 .startObject("search_createtime").field("type","text").endObject() 10 .endObject() 11 .endObject() 12 .endObject(); 13 boolean exists = transportClient.admin().indices() 14 .prepareExists(EsUtil.indexname) 15 .execute().actionGet().isExists(); 16 if(exists){ 17 PutMappingRequest putMappingRequest = Requests. 18 putMappingRequest(EsUtil.indexname).type(EsUtil.mmobjectTypename) 19 .source(mappings); 20 transportClient.admin().indices().putMapping(putMappingRequest).actionGet(); 21 }else{ 22 CreateIndexRequestBuilder prepareCreate = transportClient.admin().indices().prepareCreate(EsUtil.indexname); 23 prepareCreate.addMapping(EsUtil.mmobjectTypename, mappings).execute().actionGet(); 24 } 25 } catch (IOException e) { 26 e.printStackTrace(); 27 } 28 }
关于设置mappings信息还可以参考如下:
http://stackoverflow.com/questions/23552845/configure-elasticsearch-mapping-with-java-api
3、索引数据
1 // 批量索引数据 2 public void indexHotSpotDataList(List<Hotspotdata> dataList) { 3 if (dataList != null) { 4 int size = dataList.size(); 5 if (size > 0) { 6 BulkRequestBuilder bulkRequest = client.prepareBulk(); 7 for (int i = 0; i < size; ++i) { 8 Hotspotdata data = dataList.get(i); 9 String jsonSource = getIndexDataFromHotspotData(data); 10 if (jsonSource != null) { 11 bulkRequest.add(client 12 .prepareIndex(IndexName, TypeName, 13 data.getId().toString()) 14 .setRefresh(true).setSource(jsonSource)); 15 } 16 } 17 18 BulkResponse bulkResponse = bulkRequest.execute().actionGet(); 19 if (bulkResponse.hasFailures()) { 20 Iterator<BulkItemResponse> iter = bulkResponse.iterator(); 21 while (iter.hasNext()) { 22 BulkItemResponse itemResponse = iter.next(); 23 if (itemResponse.isFailed()) { 24 log.error(itemResponse.getFailureMessage()); 25 } 26 } 27 } 28 } 29 } 30 } 31 32 // 索引数据 33 public boolean indexHotspotData(Hotspotdata data) { 34 String jsonSource = getIndexDataFromHotspotData(data); 35 if (jsonSource != null) { 36 IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName, 37 TypeName).setRefresh(true); 38 requestBuilder.setSource(jsonSource) 39 .execute().actionGet(); 40 return true; 41 } 42 43 return false; 44 } 45 46 // 得到索引字符串 47 public String getIndexDataFromHotspotData(Hotspotdata data) { 48 String jsonString = null; 49 if (data != null) { 50 try { 51 XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); 52 jsonBuilder.startObject().field(IDFieldName, data.getId()) 53 .field(SeqNumFieldName, data.getSeqNum()) 54 .field(IMSIFieldName, data.getImsi()) 55 .field(IMEIFieldName, data.getImei()) 56 .field(DeviceIDFieldName, data.getDeviceID()) 57 .field(OwnAreaFieldName, data.getOwnArea()) 58 .field(TeleOperFieldName, data.getTeleOper()) 59 .field(TimeFieldName, data.getCollectTime()) 60 .endObject(); 61 jsonString = jsonBuilder.string(); 62 } catch (IOException e) { 63 log.equals(e); 64 } 65 } 66 67 return jsonString; 68 }
ES支持批量和单个数据索引。
4、查询获取数据
1 // 获取少量数据100个 2 private List<Integer> getSearchData(QueryBuilder queryBuilder) { 3 List<Integer> ids = new ArrayList<>(); 4 SearchResponse searchResponse = client.prepareSearch(IndexName) 5 .setTypes(TypeName).setQuery(queryBuilder).setSize(100) 6 .execute().actionGet(); 7 SearchHits searchHits = searchResponse.getHits(); 8 for (SearchHit searchHit : searchHits) { 9 Integer id = (Integer) searchHit.getSource().get("id"); 10 ids.add(id); 11 } 12 return ids; 13 } 14 15 // 获取大量数据 16 private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) { 17 List<Integer> ids = new ArrayList<>(); 18 // 一次获取100000数据 19 SearchResponse scrollResp = client.prepareSearch(IndexName) 20 .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000)) 21 .setQuery(queryBuilder).setSize(100000).execute().actionGet(); 22 while (true) { 23 for (SearchHit searchHit : scrollResp.getHits().getHits()) { 24 Integer id = (Integer) searchHit.getSource().get(IDFieldName); 25 ids.add(id); 26 } 27 scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()) 28 .setScroll(new TimeValue(600000)).execute().actionGet(); 29 if (scrollResp.getHits().getHits().length == 0) { 30 break; 31 } 32 } 33 34 return ids; 35 }
这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。
5、聚合(Aggregation Facet)查询
1 // 得到某段时间内设备列表上每个设备的数据分布情况<设备ID,数量> 2 public Map<String, String> getDeviceDistributedInfo(String startTime, 3 String endTime, List<String> deviceList) { 4 5 Map<String, String> resultsMap = new HashMap<>(); 6 7 QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList); 8 QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime); 9 QueryBuilder queryBuilder = QueryBuilders.boolQuery() 10 .must(deviceQueryBuilder).must(rangeBuilder); 11 12 TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE) 13 .field(DeviceIDFieldName); 14 SearchResponse searchResponse = client.prepareSearch(IndexName) 15 .setQuery(queryBuilder).addAggregation(termsBuilder) 16 .execute().actionGet(); 17 Terms terms = searchResponse.getAggregations().get("DeviceIDAgg"); 18 if (terms != null) { 19 for (Terms.Bucket entry : terms.getBuckets()) { 20 resultsMap.put(entry.getKey(), 21 String.valueOf(entry.getDocCount())); 22 } 23 } 24 return resultsMap; 25 }
Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。
详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html
三、集群配置
配置文件elasticsearch.yml
集群名和节点名:
#cluster.name: elasticsearch
#node.name: "Franz Kafka"
是否参与master选举和是否存储数据
#node.master: true
#node.data: true
分片数和副本数
#index.number_of_shards: 5
#index.number_of_replicas: 1
master选举最少的节点数,这个一定要设置为整个集群节点个数的一半加1,即N/2+1
#discovery.zen.minimum_master_nodes: 1
discovery ping的超时时间,拥塞网络,网络状态不佳的情况下设置高一点
#discovery.zen.ping.timeout: 3s
注意,分布式系统整个集群节点个数N要为奇数个!!
如何避免ElasticSearch发生脑裂(brain split):http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/
即使集群节点个数为奇数,minimum_master_nodes为整个集群节点个数一半加1,也难以避免脑裂的发生,详情看讨论:https://github.com/elastic/elasticsearch/issues/2488
四、Elasticsearch插件
1、elasticsearch-head是一个elasticsearch的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head
2、elasticsearch-sql:使用SQL语法查询elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql
github地址:https://github.com/NLPchina/elasticsearch-sql
3、elasticsearch-bigdesk是elasticsearch的一个集群监控工具,可以通过它来查看ES集群的各种状态。
安装:./bin/plugin -install lukas-vlcek/bigdesk
访问:http://192.103.101.203:9200/_plugin/bigdesk/,
4、elasticsearch-servicewrapper插件是ElasticSearch的服务化插件,
在https://github.com/elasticsearch/elasticsearch-servicewrapper下载该插件后,解压缩,将service目录拷贝到elasticsearch目录的bin目录下。
而后,可以通过执行以下语句安装、启动、停止ElasticSearch:
sh elasticsearch install
sh elasticsearch start
sh elasticsearch stop
参考:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch
=============================================
java代码操作ES的settings配置信息