Elasticsearch (GEO) data ingestion and spatial search

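    Introduction to Elasticsearch

    What is Elasticsearch?
    Elasticsearch is an open-source, distributed, RESTful search and analytics engine that can address a growing number of use cases.

    What this article covers

    This article covers writing GEO data into Elasticsearch and running spatial queries on it. The Elasticsearch version used is 7.3.1.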

    Data preparation

    In QGIS, use the fishnet (grid creation) tool to cut the extent into cells, then export the grid as GeoJSON.
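    The article uses QGIS for this step. As an alternative, a comparable grid can be generated directly in Python. The sketch below makes several assumptions. The extent is given in degrees (EPSG:4326), and the cells are square. `make_fishnet` is a hypothetical helper, not part of QGIS. Every cell gets an `id` property, because the insertion code later reads `feature["properties"]["id"]`.

```python
import math


def make_fishnet(min_lon, min_lat, max_lon, max_lat, cell_size):
    """Build a GeoJSON FeatureCollection of square cells covering a box.

    Hypothetical helper: cell_size is in degrees, and the last row/column
    is clipped to the box edge.
    """
    eps = 1e-9  # tolerance so float noise does not create sliver cells
    n_cols = math.ceil((max_lon - min_lon) / cell_size - eps)
    n_rows = math.ceil((max_lat - min_lat) / cell_size - eps)
    features = []
    for row in range(n_rows):
        bottom = min_lat + row * cell_size
        top = min(bottom + cell_size, max_lat)
        for col in range(n_cols):
            left = min_lon + col * cell_size
            right = min(left + cell_size, max_lon)
            # one closed outer ring in [lon, lat] order, counter-clockwise
            ring = [[left, bottom], [right, bottom], [right, top], [left, top], [left, bottom]]
            features.append({
                "type": "Feature",
                "properties": {"id": row * n_cols + col},
                "geometry": {"type": "Polygon", "coordinates": [ring]},
            })
    return {"type": "FeatureCollection", "features": features}


if __name__ == "__main__":
    grid = make_fishnet(108.98, 18.40, 109.01, 18.48, 0.01)
    print(len(grid["features"]))
```

    Writing the result out with `json.dump(grid, f)` produces a `.geojson` file with the same `features` list that the insertion code reads.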

    Create the index and set the mapping

    def set_mapping(es, index_name="content_engine", doc_type_name="en", my_mapping=None):
        # drop any existing index; ignore 404 (missing index) and 400
        es.indices.delete(index=index_name, ignore=[400, 404])
        print("delete_index")
        if my_mapping is None:
            # geo_shape field for the grid polygons plus a numeric id
            my_mapping = {
                "properties": {
                    "location": {"type": "geo_shape"},
                    "id": {"type": "long"}
                }
            }
        # ignore 400 caused by IndexAlreadyExistsException when creating an index
        create_index = es.indices.create(index=index_name, ignore=400)
        # typed mapping: deprecated in ES 7.x, hence include_type_name=True
        mapping_index = es.indices.put_mapping(index=index_name, doc_type=doc_type_name,
                                               body=my_mapping, include_type_name=True)
        print("create_index")
        if create_index.get("acknowledged") is not True or mapping_index.get("acknowledged") is not True:
            print("Index creation failed...")
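    Mapping types are deprecated in Elasticsearch 7.x. The same mapping can therefore be sent in a single typeless `indices.create` request, instead of `create` followed by `put_mapping` with `include_type_name=True`. The sketch below only builds the request body, and `build_index_body` is a hypothetical name. The field type is a parameter because of an assumption worth stating: in 7.3, the `geo_distance` query and the geo aggregations used later on `gis_point` work on `geo_point` fields rather than `geo_shape`.

```python
import json


def build_index_body(location_type="geo_shape"):
    """Typeless create-index body for ES 7.x (hypothetical helper).

    Use "geo_shape" for the polygon grid and "geo_point" for point data.
    """
    if location_type not in ("geo_shape", "geo_point"):
        raise ValueError("location_type must be 'geo_shape' or 'geo_point'")
    return {
        "mappings": {
            # no doc type level: properties sit directly under "mappings"
            "properties": {
                "location": {"type": location_type},
                "id": {"type": "long"},
            }
        }
    }


print(json.dumps(build_index_body("geo_point"), indent=2))
```

    With elasticsearch-py 7.x this body would be passed as `es.indices.create(index="gis_point", body=build_index_body("geo_point"))`.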
    

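    Data insertion

    Data is written with multiprocessing and elasticsearch.helpers.bulk. The features are grouped into batches of roughly ten thousand (9,500 in the code below), and the remainder forms a final batch. The batches are then written in parallel by a pool of worker processes. Using this approach, 4,731,254 point records and 4,731,254 polygon records were written in separate runs. Multiple cores, an SSD and a suitable bulk batch size all speed up ingestion noticeably. With these measures, over four million points or polygons can be written in about three minutes.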

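    import json
    import multiprocessing
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    ip = "127.0.0.1:9200"
    BATCH_SIZE = 9500  # documents per bulk request

    def mp_worker(actions):
        # each process opens its own client; connections are not shared across processes
        es = Elasticsearch(hosts=[ip], timeout=5000)
        # every action already carries _index/_type, so bulk() needs no index argument
        success, _ = bulk(es, actions, raise_on_error=True)
        return success

    def mp_handler(input_file, index_name, doc_type_name="en"):
        with open(input_file, 'rb') as f:
            data = json.load(f)
        features = data["features"]
        del data
        batches = []
        actions = []
        count = 0
        for feature in features:
            actions.append({
                "_index": index_name,
                "_type": doc_type_name,
                "_source": {
                    "id": feature["properties"]["id"],
                    # fishnet cells are polygons; coordinates are copied from the GeoJSON as-is
                    "location": {
                        "type": "polygon",
                        "coordinates": feature["geometry"]["coordinates"]
                    }
                }
            })
            if len(actions) == BATCH_SIZE:
                batches.append(actions)
                count += len(actions)
                actions = []
        if actions:  # the remaining features form the last batch
            batches.append(actions)
            count += len(actions)
        del features
        print('read all %s data ' % count)
        written = 0
        with multiprocessing.Pool(4) as p:
            for result in p.imap(mp_worker, batches):
                written += result
        print('write all %s data ' % written)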
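    The batching loop in `mp_handler` can also be split into small generators. The sketch below is an assumption-laden refactor, not the article's code, and `feature_to_source`, `chunked` and `make_actions` are hypothetical names. Points are stored as `[lon, lat]` arrays, which a `geo_point` field accepts. Other geometries are kept as GeoJSON for a `geo_shape` field, with the type name lower-cased as in the original code. The actions are typeless, which is what ES 7.x prefers.

```python
from itertools import islice


def feature_to_source(feature):
    """Turn one GeoJSON feature into the _source of an ES document (hypothetical helper)."""
    geometry = feature["geometry"]
    if geometry["type"] == "Point":
        # geo_point accepts [lon, lat] arrays
        location = list(geometry["coordinates"][:2])
    else:
        # geo_shape takes GeoJSON-style geometry
        location = {"type": geometry["type"].lower(), "coordinates": geometry["coordinates"]}
    return {"id": feature["properties"]["id"], "location": location}


def chunked(iterable, size):
    """Yield lists of at most `size` items; the last list holds the remainder."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def make_actions(features, index_name):
    # typeless bulk actions: the target index is carried by each action
    for feature in features:
        yield {"_index": index_name, "_source": feature_to_source(feature)}
```

    For example, `batches = list(chunked(make_actions(features, index_name), 9500))` could be handed to `p.imap(mp_worker, batches)` unchanged.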
    

    GEO (point): points within n km, and bounding-box selection

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan
    import time
    starttime = time.time()
    _index = "gis_point"
    _doc_type = "20190824"
    ip = "127.0.0.1:9200"
    # select points within n km (here 9 km) of the given point
    _body = {
        "query": {
            "bool": {
                "must": {
                    "match_all": {}
                },
                "filter": {
                    "geo_distance": {
                        "distance": "9km",
                        "location": {
                            "lat": 18.1098857850465471,
                            "lon": 109.1271036098896730
                        }
                    }
                }
            }
        }
    }
    # bounding-box selection
    # _body={
    #   "query": {
    #     "geo_bounding_box": {
    #       "location": {
    #         "top_left": {
    #           "lat": 18.4748659238899933,
    #           "lon": 109.0007435371629470
    #         },
    #         "bottom_right": {
    #           "lat": 18.1098857850465471,
    #           "lon": 109.1271036098896730
    #         }
    #       }
    #     }
    #   }
    # }
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = scan(es, query=_body, scroll="10m", index=_index, timeout="10m")
    for resp in scanResp:
        print(resp)
    endtime = time.time()
    print(endtime - starttime)
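    `geo_distance` keeps documents whose distance to the origin is within `distance`, and its default `distance_type` is `arc`. The sketch below recomputes that check on the client with the haversine formula, which can help when sanity-checking results. The Earth radius constant is an assumption (the mean radius). `haversine_m` and `within_distance` are hypothetical helpers. Results may differ slightly from Elasticsearch's.

```python
import math

EARTH_RADIUS_M = 6371008.7714  # assumed mean Earth radius in metres


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    # min() guards against rounding pushing the argument slightly above 1
    return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(a)))


def within_distance(points, lat, lon, distance_m):
    """Client-side re-check of a geo_distance filter on [(id, lat, lon), ...]."""
    return [p for p in points if haversine_m(lat, lon, p[1], p[2]) <= distance_m]
```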
    

    GEO (shape): range selection

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan
    import time
    starttime = time.time()
    _index = "gis"
    _doc_type = "20190823"
    ip = "127.0.0.1:9200"
    # envelope format, [[minlon,maxlat],[maxlon,minlat]]
    _body = {
        "query": {
            "bool": {
                "must": {
                    "match_all": {}
                },
                "filter": {
                    "geo_shape": {
                        "location": {
                            "shape": {
                                "type": "envelope",
                                "coordinates": [[108.987103609889, 18.474865923889993], [109.003537162947, 18.40988578504]]
                            },
                            "relation": "within"
                        }
                    }
                }
            }
        }
    }
    
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
    for resp in scanResp:
        print(resp)
    endtime = time.time()
    print(endtime - starttime)
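    The `envelope` shape lists the upper-left corner first and the lower-right corner second, i.e. `[[minlon, maxlat], [maxlon, minlat]]` as the comment notes. PostGIS's `ST_MakeEnvelope`, by contrast, takes `xmin, ymin, xmax, ymax`. Swapping the two orders is an easy mistake to make. Here is a sketch of small builders (hypothetical names) that take a single set of min/max values and emit each form. They deliberately reject boxes whose minimum longitude exceeds the maximum, so boxes crossing the antimeridian are not supported.

```python
def _check(min_lon, min_lat, max_lon, max_lat):
    if min_lon > max_lon or min_lat > max_lat:
        raise ValueError("expected min_lon <= max_lon and min_lat <= max_lat")


def es_envelope(min_lon, min_lat, max_lon, max_lat):
    """geo_shape envelope: [[minlon, maxlat], [maxlon, minlat]]."""
    _check(min_lon, min_lat, max_lon, max_lat)
    return {"type": "envelope", "coordinates": [[min_lon, max_lat], [max_lon, min_lat]]}


def es_bounding_box(min_lon, min_lat, max_lon, max_lat):
    """top_left/bottom_right corners for a geo_bounding_box query."""
    _check(min_lon, min_lat, max_lon, max_lat)
    return {"top_left": {"lat": max_lat, "lon": min_lon},
            "bottom_right": {"lat": min_lat, "lon": max_lon}}


def postgis_envelope_sql(min_lon, min_lat, max_lon, max_lat, srid=4326):
    """The same box as a PostGIS expression: ST_MakeEnvelope(xmin, ymin, xmax, ymax, srid)."""
    _check(min_lon, min_lat, max_lon, max_lat)
    return "ST_MakeEnvelope(%r, %r, %r, %r, %d)" % (min_lon, min_lat, max_lon, max_lat, srid)
```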
    

    GEO (point): distance aggregation

    from elasticsearch import Elasticsearch
    import time
    starttime = time.time()
    _index = "gis_point"
    _doc_type = "20190824"
    ip = "127.0.0.1:9200"
    # distance-range aggregation: rings of <100 km, 100-300 km and >=300 km (values in metres)
    _body = {
        "aggs" : {
            "rings_around_amsterdam" : {
                "geo_distance" : {
                    "field" : "location",
                    "origin" : "18.1098857850465471,109.1271036098896730",
                    "ranges" : [
                        { "to" : 100000 },
                        { "from" : 100000, "to" : 300000 },
                        { "from" : 300000 }
                    ]
                }
            }
        }
    }
    
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = es.search(body=_body, index=_index)
    for i in scanResp['aggregations']['rings_around_amsterdam']['buckets']:
        print(i)
    endtime = time.time()
    print(endtime - starttime)
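    The same ring aggregation can be generated from a list of ring edges. As in other Elasticsearch range aggregations, each range includes `from` and excludes `to`. Distances are in metres unless `unit` is set. The sketch below uses a hypothetical helper, `distance_rings_agg`, and emits kilometre edges with `"unit": "km"`, so `[100, 300]` reproduces the 100 km / 300 km rings above.

```python
def distance_rings_agg(name, lat, lon, edges_km, field="location"):
    """geo_distance aggregation body with consecutive rings (hypothetical helper).

    edges_km=[100, 300] gives [<100 km), [100 km, 300 km) and [>=300 km).
    """
    edges = list(edges_km)
    if not edges or edges != sorted(edges):
        raise ValueError("edges_km must be a non-empty ascending list")
    ranges = [{"to": edges[0]}]
    for lo, hi in zip(edges, edges[1:]):
        ranges.append({"from": lo, "to": hi})
    ranges.append({"from": edges[-1]})
    return {"aggs": {name: {"geo_distance": {
        "field": field,
        "origin": "%s,%s" % (lat, lon),  # "lat,lon" string form, as in the query above
        "unit": "km",
        "ranges": ranges,
    }}}}
```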
    

    Centroid aggregation

    _body = {
        "aggs": {
            "centroid": {
                "geo_centroid": {
                    "field": "location"
                }
            }
        }
    }
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = es.search(body=_body, index=_index)
    print(scanResp['aggregations'])
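    For a `geo_point` field, this sketch assumes the centroid reported by `geo_centroid` is essentially the mean of the matching points' latitudes and longitudes. Here is a client-side version for checking a small result set. `centroid` is a hypothetical helper. Its return value imitates the `location`/`count` shape of the aggregation result, and it assumes the points do not straddle the antimeridian.

```python
def centroid(points):
    """Mean of (lat, lon) pairs, shaped like a geo_centroid result (hypothetical helper)."""
    if not points:
        raise ValueError("no points")
    n = len(points)
    lat = sum(p[0] for p in points) / n
    lon = sum(p[1] for p in points) / n
    return {"location": {"lat": lat, "lon": lon}, "count": n}
```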
    

    Bounds aggregation (geo_bounds)

    _body = {
        "aggs": {
            "viewport": {
                "geo_bounds": {
                    "field": "location"
                }
            }
        }
    }
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = es.search(body=_body, index=_index)
    print(scanResp['aggregations']['viewport'])
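    `geo_bounds` returns the `top_left` and `bottom_right` corners of the box enclosing all matching points. The sketch below computes the same box locally with a hypothetical helper, `bounds`. Unlike Elasticsearch, it does not model the `wrap_longitude` option, so it is only meaningful for data away from the antimeridian.

```python
def bounds(points):
    """Smallest box containing all (lat, lon) points, shaped like a geo_bounds result."""
    if not points:
        raise ValueError("no points")
    lats = [p[0] for p in points]
    lons = [p[1] for p in points]
    return {"bounds": {
        "top_left": {"lat": max(lats), "lon": min(lons)},
        "bottom_right": {"lat": min(lats), "lon": max(lons)},
    }}
```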
    

    Geohash grid aggregation

    # low-precision aggregation; precision is the geohash length
    _body = {
        "aggregations": {
            "large-grid": {
                "geohash_grid": {
                    "field": "location",
                    "precision": 3
                }
            }
        }
    }
    # high-precision aggregation: filter by bounding box, then aggregate by geohash
    # _body = {
    #     "aggregations": {
    #         "zoomed-in": {
    #             "filter": {
    #                 "geo_bounding_box": {
    #                     "location": {
    #                         "top_left": "18.4748659238899933,108.9971036098896730",
    #                         "bottom_right": "18.4698857850465471,109.0007435371629470"
    #                     }
    #                 }
    #             },
    #             "aggregations": {
    #                 "zoom1": {
    #                     "geohash_grid": {
    #                         "field": "location",
    #                         "precision": 7
    #                     }
    #                 }
    #             }
    #         }
    #     }
    # }
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = es.search(body=_body, index=_index)
    for i in scanResp['aggregations']['large-grid']['buckets']:
        print(i)
    #for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
    #    print(i)    
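    A geohash interleaves longitude and latitude bisection bits, starting with longitude. Each group of five bits is encoded as one base-32 character, so `precision` is the number of characters, and longer hashes mean smaller cells. The pure-Python sketch below uses hypothetical helpers `geohash_encode` and `geohash_cell_degrees` to show how a point maps to the bucket keys returned by `geohash_grid`. Cell sizes are in degrees, not kilometres.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash_encode(lat, lon, precision):
    """Encode a point as a geohash of `precision` characters (hypothetical helper)."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    chars = []
    value = 0       # bits collected for the current character
    n_bits = 0
    use_lon = True  # bits alternate: lon, lat, lon, ...
    while len(chars) < precision:
        rng, coord = (lon_range, lon) if use_lon else (lat_range, lat)
        mid = (rng[0] + rng[1]) / 2
        if coord >= mid:
            value = (value << 1) | 1
            rng[0] = mid  # keep the upper half
        else:
            value <<= 1
            rng[1] = mid  # keep the lower half
        use_lon = not use_lon
        n_bits += 1
        if n_bits == 5:
            chars.append(BASE32[value])
            value = 0
            n_bits = 0
    return "".join(chars)


def geohash_cell_degrees(precision):
    """(width, height) of a geohash cell in degrees of longitude and latitude."""
    total = 5 * precision
    lon_bits = (total + 1) // 2  # longitude gets the extra bit when the total is odd
    lat_bits = total // 2
    return 360.0 / 2 ** lon_bits, 180.0 / 2 ** lat_bits
```

    For example, `geohash_encode(42.6, -5.6, 5)` gives `ezs42`. `geohash_cell_degrees(3)` gives a 1.40625° by 1.40625° cell, which is why precision 3 produces coarse buckets.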
    


    Tile grid aggregation (geotile_grid)

    # low-precision tile aggregation; precision is the zoom level
    _body = {
        "aggregations": {
            "large-grid": {
                "geotile_grid": {
                    "field": "location",
                    "precision": 8
                }
            }
        }
    }
    # high-precision tile aggregation: filter by bounding box, then aggregate by tile
    # _body={
    #     "aggregations" : {
    #         "zoomed-in" : {
    #             "filter" : {
    #                 "geo_bounding_box" : {
    #                     "location" : {
    #                         "top_left": "18.4748659238899933,108.9991036098896730",
    #                         "bottom_right": "18.4698857850465471,109.0007435371629470"
    #                     }
    #                 }
    #             },
    #             "aggregations":{
    #                 "zoom1":{
    #                     "geotile_grid" : {
    #                         "field": "location",
    #                         "precision": 18
    #                     }
    #                 }
    #             }
    #         }
    #     }
    # }
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = es.search(body=_body, index=_index)
    for i in scanResp['aggregations']['large-grid']['buckets']:
        print(i)
    # for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
    #      print(i)
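    `geotile_grid` buckets points by Web Mercator map tiles, and each bucket key has the form `zoom/x/y`. The sketch below uses a hypothetical helper, `geotile_key`, built on the standard slippy-map tile formula. It shows which key a point would fall into, which is handy when locating a bucket on a map. It clamps latitude to the Web Mercator limit. Whether Elasticsearch assigns points lying exactly on tile edges the same way is not checked here.

```python
import math

MAX_LAT = 85.0511287798066  # Web Mercator latitude limit


def geotile_key(lat, lon, zoom):
    """zoom/x/y key of the Web Mercator tile containing the point (hypothetical helper)."""
    n = 2 ** zoom
    lat = max(-MAX_LAT, min(MAX_LAT, lat))
    x = int((lon + 180.0) / 360.0 * n)
    y = int((1.0 - math.asinh(math.tan(math.radians(lat))) / math.pi) / 2.0 * n)
    # clamp edge cases (lon = 180, lat at the Mercator limit) into the last tile
    x = min(max(x, 0), n - 1)
    y = min(max(y, 0), n - 1)
    return "%d/%d/%d" % (zoom, x, y)
```

    For example, `geotile_key(52.37, 4.89, 8)` returns `8/131/84`.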
    


    Comparing the same operations in Elasticsearch and PostGIS

    PostGIS nearest-point query

    SELECT  id,geom, ST_DistanceSphere(geom,'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry) 
    FROM  h5 
    ORDER BY  geom <->
    'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry
    LIMIT 1 
    

    Elasticsearch nearest-point query

    from elasticsearch import Elasticsearch
    import time
    starttime = time.time()
    _index = "gis_point"
    _doc_type = "20190824"
    ip = "127.0.0.1:9200"
    
    _body = {
        # sort all documents by arc distance to the query point and keep only the closest one
        "sort": [
            {
                "_geo_distance": {
                    "unit": "m",
                    "order": "asc",
                    "location": [109.1681036098896730, 18.1299957850465471],  # [lon, lat]
                    "distance_type": "arc",
                    "mode": "min",
                    "ignore_unmapped": True
                }
            }
        ],
        "from": 0,
        "size": 1,
        "query": {
            "bool": {
                "must": {
                    "match_all": {}
                }
            }
        }
    }
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = es.search(body=_body, index=_index)
    print(scanResp["hits"]["hits"])
    endtime = time.time()
    print(endtime - starttime)
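    Because the request sorts by `_geo_distance`, each hit carries the computed distance (in the requested unit, metres here) as the first element of its `sort` array. Here is a small sketch for pulling the nearest document out of the response. `nearest_from_response` is a hypothetical helper.

```python
def nearest_from_response(resp):
    """(_id, _source, distance) of the first hit of a _geo_distance-sorted search, or None."""
    hits = resp["hits"]["hits"]
    if not hits:
        return None
    top = hits[0]
    # with a single _geo_distance sort, sort[0] is the distance in the requested unit
    return top["_id"], top["_source"], top["sort"][0]
```

    With the request above, `print(nearest_from_response(scanResp))` would print the id, the source and the distance of the closest point.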
    

    PostGIS range query

    select id, geom, fid FROM public."California"
    where
    ST_Intersects(geom, ST_MakeEnvelope(-117.987103609889, 33.40988578504, -117.003537162947, 33.494865923889993, 4326)) = true;
    -- the same box as an Elasticsearch envelope: [[-117.987103609889, 33.494865923889993], [-117.003537162947, 33.40988578504]]
    

    Elasticsearch range query

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan
    import time
    starttime = time.time()
    _index = "gis_california"
    ip = "127.0.0.1:9200"
    # envelope format, [[minlon,maxlat],[maxlon,minlat]]
    
    _body = {
        "query": {
            "bool": {
                "must": {
                    "match_all": {}
                },
                "filter": {
                    "geo_shape": {
                        "geom": {
                            "shape": {
                                "type": "envelope",
                                "coordinates": [[-117.987103609889, 33.494865923889993], [-117.003537162947, 33.40988578504]]
                            },
                            "relation": "INTERSECTS"
                        }
                    }
                }
            }
        }
    }
    es = Elasticsearch(hosts=[ip], timeout=5000)
    scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
    # count the matching documents
    i = 0
    for resp in scanResp:
        i += 1
    print(i)
    endtime = time.time()
    print(endtime - starttime)
    

    In both scenarios (nearest-point query and range query), PostGIS performed better than Elasticsearch in these tests.



Original article: https://www.cnblogs.com/polong/p/11523955.html