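- Scenario:
Suppose we have a table of coordinates, my_latlng(lat string,lng string), and a given grid table, my_grid(gridid bigint,centerlng double,centerlat double,gridx int,gridy int,minlng double,maxlng double,minlat double,maxlat double), in which every grid cell is a square with 5 m sides. The fields are:
gridid: grid cell id
centerlng: longitude of the cell center
centerlat: latitude of the cell center
gridx: position of the cell along the x axis
gridy: position of the cell along the y axis
minlng, maxlng, minlat, maxlat: longitude and latitude bounds of the cell
Requirement: for each row of my_latlng, find the id of the grid cell it falls in. Coordinates in my_latlng that lie outside the overall grid are excluded from the computation.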
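- Solution 1:
Each grid cell carries its minimum and maximum longitude and latitude, so the grid id can be filled in for my_latlng directly from those ranges:
select t11.gridid,t10.lat,t10.lng
from my_latlng t10 inner join my_grid t11
where t10.lat>=t11.minlat and t10.lat<=t11.maxlat and t10.lng>=t11.minlng and t10.lng<=t11.maxlng;
Drawback: the inner join has no on condition. In the Hive version used here, the operators >=, >, <, <= cannot be written in the on condition of an inner join (only equality comparisons are accepted; newer releases relax this), so this is a syntax limitation.
As a result, the statement above runs as a cross join. If my_latlng has 10 million rows and my_grid has 100 million rows, the cross join plus the where filter produces no result within 5 hours, even with 1000 "spark packages" on the cluster (assuming one package is 1 vcore CPU + 12 GB memory + 500 GB disk).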
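To make the cost of the cross join concrete, here is a minimal arithmetic sketch using the row counts quoted above. The variable names are illustrative, and the even split across vcores is an idealization.

```python
# Row counts taken from the text above.
points = 10_000_000       # rows in my_latlng
grids = 100_000_000       # rows in my_grid
vcores = 1000             # one vcore per "spark package"

# A cross join evaluates the WHERE predicate once per (point, grid) pair.
pairs = points * grids

# Idealized even split of the work across all vcores.
pairs_per_vcore = pairs // vcores

print(f"{pairs:.0e} pairs in total, {pairs_per_vcore:.0e} per vcore")
```

Each vcore still has to evaluate on the order of a trillion predicates, which is consistent with the job not finishing.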
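- Solution 2:
The fifth decimal place of a coordinate corresponds roughly to one meter: 0.00001° of latitude is about 1.1 m, and at latitude 30° a change of 0.00001° in longitude is slightly less (about 0.96 m). A 5 m grid cell can therefore be treated, roughly, as a change of 0.00005 units in both longitude and latitude.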
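The rule of thumb above can be checked with a short calculation. This is a sketch that assumes a spherical Earth with the mean radius of 6,371 km; the function name `metres_per_step` is made up for illustration.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # assumption: spherical Earth, mean radius

def metres_per_step(lat_deg, step_deg=0.00001):
    """Return (north-south, east-west) metres covered by step_deg at lat_deg."""
    per_degree = math.pi * EARTH_RADIUS_M / 180.0    # metres per degree of latitude
    north_south = per_degree * step_deg
    # Meridians converge towards the poles, so longitude steps shrink by cos(latitude).
    east_west = north_south * math.cos(math.radians(lat_deg))
    return north_south, east_west

ns, ew = metres_per_step(30.0)
print(round(ns, 2), round(ew, 2))
```

At latitude 30° this gives roughly 1.11 m north-south and 0.96 m east-west per 0.00001°, so a step of 0.00005 is close to 5 m in both directions, which is accurate enough for a coarse pre-filter.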
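Using this property, the approach is as follows:
Step 1: for each point, find the roughly 8+1 grid cells whose center latitude and longitude differ from the point by about 5 m or less:
(    rpad(t10.lat+0.00005,7,'0')=rpad(t11.centerlat,7,'0')
  or rpad(t10.lat,7,'0')=rpad(t11.centerlat+0.00005,7,'0')
  or rpad(t10.lat,7,'0')=rpad(t11.centerlat,7,'0') )
and
(    rpad(t10.lng+0.00005,8,'0')=rpad(t11.centerlng,8,'0')
  or rpad(t10.lng,8,'0')=rpad(t11.centerlng+0.00005,8,'0')
  or rpad(t10.lng,8,'0')=rpad(t11.centerlng,8,'0') )
Note: the area we compute over spans longitude 100.0 to 180.0 and latitude 10.0 to 90.0. Latitude therefore always has two integer digits and longitude three, so rpad(...,7,'0') and rpad(...,8,'0') both keep exactly four decimal places (rpad pads short strings with '0' and truncates longer ones).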
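The following sketch mimics how rpad turns coordinates into comparable keys. `hive_rpad` is a hypothetical Python stand-in for Hive's rpad (single-character padding only), and Python's float-to-string conversion is assumed to be close enough to Hive's for these values.

```python
def hive_rpad(value, length, pad="0"):
    """Mimic Hive's rpad: pad on the right, or truncate when too long."""
    text = str(value)
    if len(text) >= length:
        return text[:length]
    return text + pad * (length - len(text))

# Truncation keeps four decimal places for a two-digit latitude.
print(hive_rpad("30.12345", 7))          # 30.1234
# Short values are padded with zeros.
print(hive_rpad(30.1, 7))                # 30.1000
# Shifting by 0.00005 can move a point into the next four-decimal bucket.
print(hive_rpad(30.12347 + 0.00005, 7))  # 30.1235
# Outside the stated range the width no longer means "four decimals".
print(hive_rpad("9.12345", 7))           # 9.12345
```

The last call shows why the range restriction matters: a latitude below 10 would keep five decimal places and never match a grid key.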
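Step 2: from the cells found in step 1, pick the one closest to the point as the cell it belongs to. The squared distance is sufficient for this comparison:
(
(cast(t10.lng as double)-t11.centerlng)*(cast(t10.lng as double)-t11.centerlng)
+(cast(t10.lat as double)-t11.centerlat)*(cast(t10.lat as double)-t11.centerlat)
) distans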
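As an illustration of step 2, the sketch below picks the nearest cell among a few candidates using the same squared-distance expression. The candidate cells and the point are made-up values.

```python
def squared_distance(lat, lng, centerlat, centerlng):
    """Squared distance in degrees; no square root is needed to compare."""
    return (lng - centerlng) ** 2 + (lat - centerlat) ** 2

# Hypothetical candidates from step 1: (gridid, centerlat, centerlng)
candidates = [
    (1, 30.12345, 120.54325),
    (2, 30.12350, 120.54325),
    (3, 30.12345, 120.54330),
]
point_lat, point_lng = 30.12349, 120.54326

nearest = min(
    candidates,
    key=lambda g: squared_distance(point_lat, point_lng, g[1], g[2]),
)
print(nearest[0])  # gridid of the closest cell
```

Note that the expression treats one degree of longitude and one degree of latitude as equal lengths, which is an approximation that only matters when two cells are almost equally close.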
Written as a single Hive statement, the above would look like this:
select t11.gridid,t10.lat,t10.lng,(
(cast(t10.lng as double)-t11.centerlng)*(cast(t10.lng as double)-t11.centerlng)
+(cast(t10.lat as double)-t11.centerlat)*(cast(t10.lat as double)-t11.centerlat)) distans
from my_latlng t10 inner join my_grid t11
where (    rpad(t10.lat+0.00005,7,'0')=rpad(t11.centerlat,7,'0')
        or rpad(t10.lat,7,'0')=rpad(t11.centerlat+0.00005,7,'0')
        or rpad(t10.lat,7,'0')=rpad(t11.centerlat,7,'0') )
  and (    rpad(t10.lng+0.00005,8,'0')=rpad(t11.centerlng,8,'0')
        or rpad(t10.lng,8,'0')=rpad(t11.centerlng+0.00005,8,'0')
        or rpad(t10.lng,8,'0')=rpad(t11.centerlng,8,'0') );
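This statement has two problems:
Problem 1) the inner join has no on condition, because Hive does not allow the predicate in the where clause to be written in on; again a Hive syntax limitation;
Problem 2) the statement therefore also runs as a cross join, and is correspondingly slow.
The good news:
1) the predicate can be split into 9 statements, each of which can have an on condition;
2) the results of the 9 statements are combined with union all, then grouped by my_latlng.lat, my_latlng.lng to obtain the minimum distance for each coordinate;
3) "my_latlng coordinate + minimum distance" is inner joined once with the union all result, which yields the grid id for each coordinate.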
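The three steps above can be sketched on in-memory rows. This is only an illustration of the data flow, not the Hive job itself; the rows and distance values are invented.

```python
# Hypothetical rows (gridid, lat, lng, distans) as produced by the nine joins
# after UNION ALL. The same pair can appear twice when it satisfies two joins.
union_all = [
    (101, "30.12349", "120.54326", 17),
    (102, "30.12349", "120.54326", 2),
    (102, "30.12349", "120.54326", 2),   # duplicate from another join
    (201, "31.00001", "121.00001", 5),
]

# Step 2: GROUP BY lat, lng and take min(distans).
min_distance = {}
for gridid, lat, lng, d in union_all:
    key = (lat, lng)
    min_distance[key] = min(d, min_distance.get(key, d))

# Step 3: join back on (lat, lng, distans); the set plays the role of DISTINCT.
result = sorted(
    {(gridid, lat, lng)
     for gridid, lat, lng, d in union_all
     if min_distance[(lat, lng)] == d}
)
print(result)
```

If two different cells are exactly equally close to a point, both survive the join back, so a tie-breaking rule would be needed in that case.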
The code:
// Result tables. The distance column is named distans so that it matches the alias used in the queries below.
hiveContext.sql("create table my_latlng_gridid_distance(gridid bigint,lat string,lng string,distans decimal(38,5))")
hiveContext.sql("create table my_latlng_mindistance(lat string,lng string,min_distans decimal(38,5))")
hiveContext.sql("create table my_latlng_gridid_result(gridid bigint,lat string,lng string)")

// Three equality conditions per axis; 3 x 3 = 9 equi-joins.
val latConds = Seq(
  "rpad(t10.lat+0.00005,7,'0')=rpad(t11.centerlat,7,'0')",
  "rpad(t10.lat,7,'0')=rpad(t11.centerlat+0.00005,7,'0')",
  "rpad(t10.lat,7,'0')=rpad(t11.centerlat,7,'0')")
val lngConds = Seq(
  "rpad(t10.lng+0.00005,8,'0')=rpad(t11.centerlng,8,'0')",
  "rpad(t10.lng,8,'0')=rpad(t11.centerlng+0.00005,8,'0')",
  "rpad(t10.lng,8,'0')=rpad(t11.centerlng,8,'0')")

// Inserting all nine results into the same table has the effect of a union all.
for ((latCond, i) <- latConds.zipWithIndex; (lngCond, j) <- lngConds.zipWithIndex) {
  val temp = s"temp$i$j" // temp00 ... temp22
  hiveContext.sql(
    "select t11.gridid,t10.lat,t10.lng," +
    "cast(((cast(t10.lng as double)-t11.centerlng)*(cast(t10.lng as double)-t11.centerlng)" +
    "+(cast(t10.lat as double)-t11.centerlat)*(cast(t10.lat as double)-t11.centerlat))" +
    "*10000000000000 as decimal(38,5)) distans " +
    "from my_latlng t10 inner join my_grid t11 " +
    "on " + latCond + " and " + lngCond).registerTempTable(temp)
  hiveContext.sql("insert into my_latlng_gridid_distance select * from " + temp)
}

// Minimum distance per coordinate.
hiveContext.sql("select lat,lng,min(distans) as min_distans " +
  "from my_latlng_gridid_distance " +
  "group by lat,lng").repartition(200).persist().registerTempTable("temp_10000")
hiveContext.sql("insert into my_latlng_mindistance select * from temp_10000")

// Join back to recover the grid id that has the minimum distance.
hiveContext.sql("select t11.gridid,t11.lat,t11.lng " +
  "from my_latlng_mindistance as t10 " +
  "inner join my_latlng_gridid_distance as t11 " +
  "on t10.lat=t11.lat and t10.lng=t11.lng and t10.min_distans=t11.distans")
  .distinct() // must use distinct: the same (point, grid) pair can match more than one of the nine joins
  .repartition(200).persist().registerTempTable("temp_20000")
hiveContext.sql("insert into my_latlng_gridid_result select * from temp_20000")
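The code above distinguishes 9 cases; in practice these can be reduced to 4 cases, as follows (this excerpt uses different table and column names):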
// Select list, distance expression and join header shared by the four joins.
val selectSql = "select t11.gridid,t10.key,t10.objectid,t10.longitude,t10.latitude," +
  "cast(((t10.longitude-t11.longitude)*(t10.longitude-t11.longitude)" +
  "+(t10.latitude-t11.latitude)*(t10.latitude-t11.latitude))*10000000000000 as decimal(38,5)) distans," +
  "t10.averageltescrsrp as rsrp,t10.samecount " +
  "from m_join_s_" + city + " as t10 " +
  "inner join fl_" + city + " as t11 on "

val df00 = hiveContext.sql(selectSql +
  "rpad(t10.latitude+0.00005,7,'0')=rpad(t11.latitude+0.00005,7,'0') and rpad(t10.longitude,8,'0')=rpad(t11.longitude,8,'0')")
  .repartition(200).persist()
df00.registerTempTable("temp_df00" + city)
hiveContext.sql("insert into my_result" + city + " select * from temp_df00" + city)

val df01 = hiveContext.sql(selectSql +
  "rpad(t10.latitude+0.00005,7,'0')=rpad(t11.latitude+0.00005,7,'0') and rpad(t10.longitude+0.00005,8,'0')=rpad(t11.longitude+0.00005,8,'0')")
  .repartition(200).persist()
df01.registerTempTable("temp_df01" + city)
hiveContext.sql("insert into my_result" + city + " select * from temp_df01" + city)

val df02 = hiveContext.sql(selectSql +
  "rpad(t10.latitude,7,'0')=rpad(t11.latitude,7,'0') and rpad(t10.longitude+0.00005,8,'0')=rpad(t11.longitude+0.00005,8,'0')")
  .repartition(200).persist()
df02.registerTempTable("temp_df02" + city)
hiveContext.sql("insert into my_result" + city + " select * from temp_df02" + city)

val df03 = hiveContext.sql(selectSql +
  "rpad(t10.latitude,7,'0')=rpad(t11.latitude,7,'0') and rpad(t10.longitude,8,'0')=rpad(t11.longitude,8,'0')")
  .repartition(200).persist()
df03.registerTempTable("temp_df03" + city)
hiveContext.sql("insert into my_result" + city + " select * from temp_df03" + city)
- Solution 3:
select t10.CITY,t11.OBJECTID,t11.POINT_NAME,COUNT(0) OTTCOUNT,
       sum(CASE WHEN t10.RP<=-110 then 1 else 0 end) WEAKOTTCOUNT
from (
  select t1.OBJECTID,t2.city,t2.RP,t2.longitude,t2.latitude
  from SENSE_ZJ t1
  inner join TEMP_OTT_HANGZHOU t2
  on rpad((t1.miny+t1.maxy)/2,5,'0')=rpad(t2.latitude,5,'0') and rpad((t1.minx+t1.maxx)/2,6,'0')=rpad(t2.longitude,6,'0')
  where t1.SENSE_NAME='xxx'
) t10
inner join SENSE_ZJ t11 on t10.OBJECTID=t11.OBJECTID
where t10.longitude >= t11.minx and t10.longitude <= t11.maxx and t10.latitude >= t11.miny and t10.latitude <= t11.maxy
group by t10.CITY,t11.OBJECTID,t11.POINT_NAME
order by objectid
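The query above appears to work in two stages: a coarse equi-join on coordinates truncated to two decimal places, followed by an exact bounding-box filter. The sketch below reproduces that idea on in-memory data. The areas, points and the helper `coarse_key` are hypothetical, and the string truncation assumes latitudes with two integer digits and longitudes with three.

```python
def coarse_key(lat, lng):
    """Truncate to two decimals, like rpad(lat,5,'0') and rpad(lng,6,'0')."""
    return (str(lat)[:5].ljust(5, "0"), str(lng)[:6].ljust(6, "0"))

# Hypothetical areas: objectid -> (minx, maxx, miny, maxy)
areas = {1: (120.121, 120.129, 30.251, 30.259)}
# Hypothetical points: (longitude, latitude, rp)
points = [
    (120.123, 30.254, -115),    # inside, weak signal
    (120.127, 30.258, -90),     # inside, normal signal
    (120.1205, 30.2505, -120),  # same coarse key, but outside the box
    (121.5, 31.2, -100),        # different coarse key
]

# Index areas by the coarse key of their center (the equi-join side).
by_key = {}
for objectid, (minx, maxx, miny, maxy) in areas.items():
    key = coarse_key((miny + maxy) / 2, (minx + maxx) / 2)
    by_key.setdefault(key, []).append(objectid)

# objectid -> (point count, weak point count), like OTTCOUNT and WEAKOTTCOUNT
stats = {}
for lng, lat, rp in points:
    for objectid in by_key.get(coarse_key(lat, lng), []):
        minx, maxx, miny, maxy = areas[objectid]
        if minx <= lng <= maxx and miny <= lat <= maxy:
            total, weak = stats.get(objectid, (0, 0))
            stats[objectid] = (total + 1, weak + (1 if rp <= -110 else 0))
print(stats)
```

One consequence of this design is that a point lying inside an area but in a different 0.01° bucket than the area's center is dropped by the coarse join, so the approach fits best when areas are small relative to the bucket size.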