zoukankan      html  css  js  c++  java
  • Elasticsearch+Hbase实现海量数据秒回查询

    ---------------------------------------------------------------------------------------------
    [版权申明:本文系作者原创,转载请注明出处] 
    文章出处:http://blog.csdn.net/sdksdk0/article/details/53966430
    作者:朱培      ID:sdksdk0     

    --------------------------------------------------------------------------------------------


    首先祝大家2017新年快乐,我今天分享的是通过ElasticSearch与hbase进行整合的一个搜索案例,这个案例涉及的技术面比较广,首先你得有JAVAEE的基础,要会SSM,而且还要会大数据中的hdfs、zookeeper、hbase以及ElasticSearch和kibana。环境部署在4台centos7上。主机名为node1-node4。这里假设你已经安装好了zookeeper、hadoop、hbase和ElasticSearch还有kibana,我这里使用的是hadoop2.5.2,ElasticSearch用的你是2.2,kibana是4.4.1。我这里的环境是 hadoop是4台在node1-node4, zookeeper是3台再node1-node3,,ElasticSearch是3台在node1-node3,kibana是一台在node1上。该系统可以对亿万数据查询进行秒回,是一般的关系型数据库很难做到的。在IntelliJ IDEA 中进行代码编写。环境搭建我这里就不啰嗦,相信大家作为一名由经验的开发人员来说都是小事一桩。文末提供源码下载链接。



    一、ElasticSearch和Hbase

    ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。 Elasticsearch的性能是solr的50倍。


    HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、
    实时读写的分布式数据库
    – 利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理
    HBase中的海量数据,利用Zookeeper作为其分布式协同服务
    – 主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)



    二、需求分析&服务器环境设置

    主要是做一个文章的搜索。有文章标题、作者、摘要、内容四个主要信息。效果图如下:这里样式我就没怎么设置了。。。。想要好看一点的可以自己加css。



    服务器:

    在3台centos7中部署,主机名为node1-node3.安装好ElasticSearch并配置好集群,

    1.     解压

    2.     修改config/elasticsearch.yml    (注意要顶格写,冒号后面要加一个空格)

    a)      Cluster.name: tf   (同一集群要一样)

    b)      Node.name: node-1  (同一集群要不一样)

    c)       Network.Host: 192.168.44.137  这里不能写127.0.0.1

    3.     解压安装kibana

    4.     再congfig目录下的kibana.yml中修改elasticsearch.url

    5.     安装插件

    Step 1: Install Marvel into Elasticsearch:

    bin/plugin install license
    bin/plugin install marvel-agent

    Step 2: Install Marvel into Kibana

    bin/kibana plugin --install elasticsearch/marvel/latest

    Step 3: Start Elasticsearch and Kibana

    bin/elasticsearch
    bin/kibana


    启动好elasticsearch集群后,

    然后启动zookeeper、hdfs、hbase。zkService.sh start  、start-all.sh、start-hbase.sh。

    接下来就是剩下编码步骤了。




    三、编码开发

    1、首先在IntelliJ IDEA中新建一个maven工程,加入如下依赖。

    1. <dependencies>
    2. <dependency>
    3. <groupId>junit</groupId>
    4. <artifactId>junit</artifactId>
    5. <version>4.9</version>
    6. </dependency>
    7. <!-- spring 3.2 -->
    8. <dependency>
    9. <groupId>org.springframework</groupId>
    10. <artifactId>spring-context</artifactId>
    11. <version>3.2.0.RELEASE</version>
    12. </dependency>
    13. <dependency>
    14. <groupId>org.springframework</groupId>
    15. <artifactId>spring-orm</artifactId>
    16. <version>3.2.0.RELEASE</version>
    17. </dependency>
    18. <dependency>
    19. <groupId>org.springframework</groupId>
    20. <artifactId>spring-aspects</artifactId>
    21. <version>3.2.0.RELEASE</version>
    22. </dependency>
    23. <dependency>
    24. <groupId>org.springframework</groupId>
    25. <artifactId>spring-web</artifactId>
    26. <version>3.2.0.RELEASE</version>
    27. </dependency>
    28. <dependency>
    29. <groupId>org.springframework</groupId>
    30. <artifactId>spring-webmvc</artifactId>
    31. <version>3.2.0.RELEASE</version>
    32. </dependency>
    33. <dependency>
    34. <groupId>org.springframework</groupId>
    35. <artifactId>spring-test</artifactId>
    36. <version>3.2.0.RELEASE</version>
    37. </dependency>
    38. <!-- JSTL -->
    39. <dependency>
    40. <groupId>jstl</groupId>
    41. <artifactId>jstl</artifactId>
    42. <version>1.2</version>
    43. </dependency>
    44. <dependency>
    45. <groupId>taglibs</groupId>
    46. <artifactId>standard</artifactId>
    47. <version>1.1.2</version>
    48. </dependency>
    49. <!-- slf4j -->
    50. <dependency>
    51. <groupId>org.slf4j</groupId>
    52. <artifactId>slf4j-api</artifactId>
    53. <version>1.7.10</version>
    54. </dependency>
    55. <dependency>
    56. <groupId>org.slf4j</groupId>
    57. <artifactId>slf4j-log4j12</artifactId>
    58. <version>1.7.10</version>
    59. </dependency>
    60. <!-- elasticsearch -->
    61. <dependency>
    62. <groupId>org.elasticsearch</groupId>
    63. <artifactId>elasticsearch</artifactId>
    64. <version>2.2.0</version>
    65. </dependency>
    66. <!-- habse -->
    67. <dependency>
    68. <groupId>org.apache.hbase</groupId>
    69. <artifactId>hbase-client</artifactId>
    70. <version>1.1.3</version>
    71. <exclusions>
    72. <exclusion>
    73. <groupId>com.google.guava</groupId>
    74. <artifactId>guava</artifactId>
    75. </exclusion>
    76. </exclusions>
    77. </dependency>
    78. </dependencies>

    2、Dao层

    1. private Integer id;
    2. private String title;
    3. private String describe;
    4. private String content;
    5. private String author;

    实现其getter/setter方法。


    3、数据准备

    在桌面新建一个doc1.txt文档,用于把我们需要查询的数据写入到里面,这里我只准备了5条数据。中间用tab键隔开。




    4、在hbase中建立表。表名师doc,列族是cf。


    public static void main(String[] args) throws Exception {
          HbaseUtils hbase = new HbaseUtils();
          //创建一张表
    
    	hbase.createTable("doc","cf");
    }


    /**
     * 创建一张表
     * @param tableName
     * @param column
     * @throws Exception
     */
    public void createTable(String tableName, String column) throws Exception {
       if(admin.tableExists(TableName.valueOf(tableName))){
          System.out.println(tableName+"表已经存在!");
       }else{
          HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
          tableDesc.addFamily(new HColumnDescriptor(column.getBytes()));
          admin.createTable(tableDesc);
          System.out.println(tableName+"表创建成功!");
       }
    }


    5、导入索引。这一步的时候确保你的hdfs和hbase以及elasticsearch是处于开启状态。

    1. @Test
    2. public void createIndex() throws Exception {
    3. List<Doc> arrayList = new ArrayList<Doc>();
    4. File file = new File("C:\Users\asus\Desktop\doc1.txt");
    5. List<String> list = FileUtils.readLines(file,"UTF8");
    6. for(String line : list){
    7. Doc Doc = new Doc();
    8. String[] split = line.split(" ");
    9. System.out.print(split[0]);
    10. int parseInt = Integer.parseInt(split[0].trim());
    11. Doc.setId(parseInt);
    12. Doc.setTitle(split[1]);
    13. Doc.setAuthor(split[2]);
    14. Doc.setDescribe(split[3]);
    15. Doc.setContent(split[3]);
    16. arrayList.add(Doc);
    17. }
    18. HbaseUtils hbaseUtils = new HbaseUtils();
    19. for (Doc Doc : arrayList) {
    20. try {
    21. //把数据插入hbase
    22. hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_TITLE, Doc.getTitle());
    23. hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_AUTHOR, Doc.getAuthor());
    24. hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_DESCRIBE, Doc.getDescribe());
    25. hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_CONTENT, Doc.getContent());
    26. //把数据插入es
    27. Esutil.addIndex("tfjt","doc", Doc);
    28. } catch (Exception e) {
    29. e.printStackTrace();
    30. }
    31. }
    32. }

    数据导入成功之后可以在服务器上通过命令查看一下:

    curl -XGET http://node1:9200/tfjt/_search




    7、搜索。

    在这里新建了一个工具类Esutil.java,主要用于处理搜索的。注意,我们默认的elasticsearch是9200端口的,这里数据传输用的是9300,不要写成9200了,然后就是集群名字为tf,也就是前面配置的集群名。还有就是主机名node1-node3,这里不能写ip地址,如果是本地测试的话,你需要在你的window下面配置hosts文件。


    1. public class Esutil {
    2. public static Client client = null;
    3. /**
    4. * 获取客户端
    5. * @return
    6. */
    7. public static Client getClient() {
    8. if(client!=null){
    9. return client;
    10. }
    11. Settings settings = Settings.settingsBuilder().put("cluster.name", "tf").build();
    12. try {
    13. client = TransportClient.builder().settings(settings).build()
    14. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node1"), 9300))
    15. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node2"), 9300))
    16. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node3"), 9300));
    17. } catch (UnknownHostException e) {
    18. e.printStackTrace();
    19. }
    20. return client;
    21. }
    22. public static String addIndex(String index,String type,Doc Doc){
    23. HashMap<String, Object> hashMap = new HashMap<String, Object>();
    24. hashMap.put("id", Doc.getId());
    25. hashMap.put("title", Doc.getTitle());
    26. hashMap.put("describe", Doc.getDescribe());
    27. hashMap.put("author", Doc.getAuthor());
    28. IndexResponse response = getClient().prepareIndex(index, type).setSource(hashMap).execute().actionGet();
    29. return response.getId();
    30. }
    31. public static Map<String, Object> search(String key,String index,String type,int start,int row){
    32. SearchRequestBuilder builder = getClient().prepareSearch(index);
    33. builder.setTypes(type);
    34. builder.setFrom(start);
    35. builder.setSize(row);
    36. //设置高亮字段名称
    37. builder.addHighlightedField("title");
    38. builder.addHighlightedField("describe");
    39. //设置高亮前缀
    40. builder.setHighlighterPreTags("<font color='red' >");
    41. //设置高亮后缀
    42. builder.setHighlighterPostTags("</font>");
    43. builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
    44. if(StringUtils.isNotBlank(key)){
    45. // builder.setQuery(QueryBuilders.termQuery("title",key));
    46. builder.setQuery(QueryBuilders.multiMatchQuery(key, "title","describe"));
    47. }
    48. builder.setExplain(true);
    49. SearchResponse searchResponse = builder.get();
    50. SearchHits hits = searchResponse.getHits();
    51. long total = hits.getTotalHits();
    52. Map<String, Object> map = new HashMap<String,Object>();
    53. SearchHit[] hits2 = hits.getHits();
    54. map.put("count", total);
    55. List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
    56. for (SearchHit searchHit : hits2) {
    57. Map<String, HighlightField> highlightFields = searchHit.getHighlightFields();
    58. HighlightField highlightField = highlightFields.get("title");
    59. Map<String, Object> source = searchHit.getSource();
    60. if(highlightField!=null){
    61. Text[] fragments = highlightField.fragments();
    62. String name = "";
    63. for (Text text : fragments) {
    64. name+=text;
    65. }
    66. source.put("title", name);
    67. }
    68. HighlightField highlightField2 = highlightFields.get("describe");
    69. if(highlightField2!=null){
    70. Text[] fragments = highlightField2.fragments();
    71. String describe = "";
    72. for (Text text : fragments) {
    73. describe+=text;
    74. }
    75. source.put("describe", describe);
    76. }
    77. list.add(source);
    78. }
    79. map.put("dataList", list);
    80. return map;
    81. }
    82. // public static void main(String[] args) {
    83. // Map<String, Object> search = Esutil.search("hbase", "tfjt", "doc", 0, 10);
    84. // List<Map<String, Object>> list = (List<Map<String, Object>>) search.get("dataList");
    85. // }
    86. }


    8、使用spring控制层处理

    在里面的spring配置这里就不说了,代码文末提供。

    1. @RequestMapping("/search.do")
    2. public String serachArticle(Model model,
    3. @RequestParam(value="keyWords",required = false) String keyWords,
    4. @RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
    5. @RequestParam(value = "pageSize", defaultValue = "3") Integer pageSize){
    6. try {
    7. keyWords = new String(keyWords.getBytes("ISO-8859-1"),"UTF-8");
    8. } catch (UnsupportedEncodingException e) {
    9. e.printStackTrace();
    10. }
    11. Map<String,Object> map = new HashMap<String, Object>();
    12. int count = 0;
    13. try {
    14. map = Esutil.search(keyWords,"tfjt","doc",(pageNum-1)*pageSize, pageSize);
    15. count = Integer.parseInt(((Long) map.get("count")).toString());
    16. } catch (Exception e) {
    17. logger.error("查询索引错误!{}",e);
    18. e.printStackTrace();
    19. }
    20. PageUtil<Map<String, Object>> page = new PageUtil<Map<String, Object>>(String.valueOf(pageNum),String.valueOf(pageSize),count);
    21. List<Map<String, Object>> articleList = (List<Map<String, Object>>)map.get("dataList");
    22. page.setList(articleList);
    23. model.addAttribute("total",count);
    24. model.addAttribute("pageNum",pageNum);
    25. model.addAttribute("page",page);
    26. model.addAttribute("kw",keyWords);
    27. return "index.jsp";
    28. }


    9、页面


    1. <center>
    2. <form action="search.do" method="get">
    3. <input type="text" name="keyWords" />
    4. <input type="submit" value="百度一下">
    5. <input type="hidden" value="1" name="pageNum">
    6. </form>
    7. <c:if test="${! empty page.list }">
    8. <h3>百度为您找到相关结果约${total}个</h3>
    9. <c:forEach items="${page.list}" var="bean">
    10. <a href="/es/detailDocById/${bean.id}.do">${bean.title}</a>
    11. <br/>
    12. <br/>
    13. <span>${bean.describe}</span>
    14. <br/>
    15. <br/>
    16. </c:forEach>
    17. <c:if test="${page.hasPrevious }">
    18. <a href="search.do?pageNum=${page.previousPageNum }&keyWords=${kw}"> 上一页</a>
    19. </c:if>
    20. <c:forEach begin="${page.everyPageStart }" end="${page.everyPageEnd }" var="n">
    21. <a href="search.do?pageNum=${n }&keyWords=${kw}"> ${n }</a>   
    22. </c:forEach>
    23. <c:if test="${page.hasNext }">
    24. <a href="search.do?pageNum=${page.nextPageNum }&keyWords=${kw}"> 下一页</a>
    25. </c:if>
    26. </c:if>
    27. </center>


    10、项目发布

    在IntelliJ IDEA 中配置好常用的项目,这里发布名Application context名字为es,当然你也可以自定义设置。






    最终效果如下:搜索COS会得到结果,速度非常快。




    总结:这个案例的操作流程还是挺多的,要有细心和耐心,特别是服务器配置,各种版本要匹配好,不然会出各种头疼的问题,当然了,这个还是需要有一定基础,不然搞不定这个事情。。。。。


    源码地址:https://github.com/sdksdk0/es





  • 相关阅读:
    PHP观察者模式
    php减少损耗的方法之一 缓存对象
    php迭代器模式
    数据库安全措施的改进依据------未实践
    mysql利用phpmyadmin导入数据出现#1044错误 的可能原因
    两列布局的基本思路
    less1.5中的减错误
    ie63像素bug原因及解决办法不使用hack
    镜像翻转二叉树
    判断一个整数是否是 2 的幂次方
  • 原文地址:https://www.cnblogs.com/jpfss/p/10811566.html
Copyright © 2011-2022 走看看