一:表的设计
1.需求分析
用户发表微博后,关注该用户的人(粉丝)可以接收到被关注用户发布的微博
设计三张表格。
2.第一张,微博内容表weibo-content
RowKey : uid_timestamp 用户账号结合内容发布的时间戳
Column Family:cf 由于rowkey由用户账号和内容发布的时间戳组成,每条微博对应唯一的一行,所以内容只需要保留一个版本
Column Qualifier :
theme 主题
content 内容
photo 图片
mp4 视频
link 链接
3.第二张,用户关系表relations
RowKey:uid
Column Family:attend 关注用户(即设计中的cf1,下文代码中的列簇名为attend)
Column Qualifier:使用关注用户的uid作为列标签,
value:也用关注用户的uid
Column Family:fans 粉丝用户(即设计中的cf2,下文代码中的列簇名为fans)
Column Qualifier:使用粉丝用户的uid作为列标签,
value:也用粉丝用户的uid
4.第三张,接收内容邮箱表 receive-content-email
RowKey:uid 用户账号
Column Family:cf
Column Qualifier:以关注用户账号作为列标签
value: 以微博内容表的rowkey作为value
保留版本数 1000(最大版本1000,最小版本1000,版本存留时间365*24*60*60秒,即一年;注意下文示例代码只设置了最大/最小版本数,没有设置存留时间)
5.梳理框架
当用户发表一篇微博时,内容先保存到第一张表(微博内容表);再以该用户的uid作为rowkey查询第二张表(用户关系表),取出粉丝列簇下的所有粉丝uid。
第三张表(接收内容邮箱表)的rowkey是粉丝的uid:发布时向每个粉丝对应的行写入这条微博在第一张表中的rowkey;粉丝读取时,先从第三张表取出所关注用户最新微博的rowkey,再到第一张表中取出微博内容。
二:编程实现
1.Message.java
package org.apache.hadoop.hbase.weibo;

import java.io.Serializable;

/**
 * Value holder for a single weibo post as returned to readers.
 *
 * <p>All three properties are plain strings and may be {@code null} until
 * they are set; the class performs no validation.
 *
 * @author ibeifeng
 */
public class Message implements Serializable {

    private static final long serialVersionUID = 2789732708160004861L;

    /** Account id of the user who published the post. */
    private String uid;

    /** Text body of the post. */
    private String content;

    /** Publication time, kept as the string taken from the post's row key. */
    private String timestamp;

    /**
     * Identity comparison, exactly as inherited from {@link Object}.
     *
     * <p>Kept as an explicit override so that existing behavior is unchanged;
     * two messages with equal field values are NOT considered equal.
     */
    @Override
    public boolean equals(Object obj) {
        return super.equals(obj);
    }

    /**
     * Renders the message as {@code uid=<uid>,timestamp=<ts>,content="<content>"}.
     *
     * @return a human readable, single-line representation
     */
    @Override
    public String toString() {
        // The quotes around the content must be escaped; the unescaped form
        // is not a valid string literal.
        return "uid=" + uid + ",timestamp="
                + timestamp + ",content=\"" + content + "\"";
    }

    /** @return the publisher's account id */
    public String getUid() {
        return uid;
    }

    /** @param uid the publisher's account id */
    public void setUid(String uid) {
        this.uid = uid;
    }

    /** @return the text body of the post */
    public String getContent() {
        return content;
    }

    /** @param content the text body of the post */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the publication timestamp as a string */
    public String getTimestamp() {
        return timestamp;
    }

    /** @param timestamp the publication timestamp as a string */
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

}
2.Weibo.java
package org.apache.hadoop.hbase.weibo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Minimal weibo (micro-blog) service on top of three HBase tables.
 *
 * <ul>
 *   <li>{@code weibo:weibo-content}: one row per post, row key
 *       {@code <uid>_<timestamp>}, post body in {@code cf:content}.</li>
 *   <li>{@code weibo:relations}: one row per user; family {@code attend}
 *       holds the users this user follows, family {@code fans} holds the
 *       users following this user. Qualifier and value are both the other
 *       user's id.</li>
 *   <li>{@code weibo:receive-content-email}: the per-user inbox; row key is
 *       the reader's uid, qualifier is the followed user's uid, each cell
 *       version holds the row key of one post of that followed user.</li>
 * </ul>
 *
 * <p>Every operation opens its own connection and reports {@link IOException}s
 * by printing the stack trace, so callers never see a checked exception.
 *
 * @author ibeifeng
 */
public class Weibo {

    static final Configuration conf = HBaseConfiguration.create();

    private static final String NAMESPACE = "weibo";

    private static final TableName WEIBO_CONTENT = TableName.valueOf("weibo:weibo-content");

    private static final TableName RELATIONS = TableName.valueOf("weibo:relations");

    private static final TableName RECEIVE_CONTENT_EMAIL =
            TableName.valueOf("weibo:receive-content-email");

    private static final byte[] CF = Bytes.toBytes("cf");

    private static final byte[] CONTENT = Bytes.toBytes("content");

    private static final byte[] ATTEND = Bytes.toBytes("attend");

    private static final byte[] FANS = Bytes.toBytes("fans");

    /** Separator between uid and timestamp in a content row key. */
    private static final char ROWKEY_SEPARATOR = '_';

    /** First character sorting after the separator; bounds a prefix scan. */
    private static final char ROWKEY_SCAN_STOP = (char) (ROWKEY_SEPARATOR + 1);

    /** Block size used for every column family (2 MiB). */
    private static final int BLOCK_SIZE_BYTES = 1024 * 1024 * 2;

    /** Versions kept per followed user in the inbox table. */
    private static final int INBOX_VERSIONS = 1000;

    /** Number of most recent posts read per followed user. */
    private static final int POSTS_PER_FOLLOWEE = 5;

    private static final Cell[] NO_CELLS = new Cell[0];

    /**
     * Creates the {@code weibo} namespace if it does not exist yet.
     *
     * <p>Calling this repeatedly is safe: an existing namespace is left alone
     * instead of triggering a "namespace exists" failure.
     */
    public void initNameSpace() {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            if (namespaceExists(admin, NAMESPACE)) {
                return;
            }
            NamespaceDescriptor descriptor = NamespaceDescriptor.create(NAMESPACE)
                    .addConfiguration("creator", "ibeifeng")
                    .addConfiguration("createTime", String.valueOf(System.currentTimeMillis()))
                    .build();
            admin.createNamespace(descriptor);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(admin);
        }
    }

    /**
     * Creates the three tables used by this class, skipping any that already
     * exist so that one pre-existing table does not prevent creation of the
     * remaining ones.
     */
    public void initTable() {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);

            // 1) Content table: a post is never rewritten, so one version is enough.
            if (!admin.tableExists(WEIBO_CONTENT)) {
                HTableDescriptor contentTbl = new HTableDescriptor(WEIBO_CONTENT);
                contentTbl.addFamily(newFamily(CF, 1));
                byte[][] splitKeys = {
                    Bytes.toBytes("100"),
                    Bytes.toBytes("200"),
                    Bytes.toBytes("300")
                };
                admin.createTable(contentTbl, splitKeys);
            }

            // 2) Relations table: followed users and fans, one version each.
            if (!admin.tableExists(RELATIONS)) {
                HTableDescriptor relationsTbl = new HTableDescriptor(RELATIONS);
                relationsTbl.addFamily(newFamily(ATTEND, 1));
                relationsTbl.addFamily(newFamily(FANS, 1));
                admin.createTable(relationsTbl);
            }

            // 3) Inbox table: versions of one column are the posts of one followed user.
            if (!admin.tableExists(RECEIVE_CONTENT_EMAIL)) {
                HTableDescriptor inboxTbl = new HTableDescriptor(RECEIVE_CONTENT_EMAIL);
                inboxTbl.addFamily(newFamily(CF, INBOX_VERSIONS));
                admin.createTable(inboxTbl);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(admin);
        }
    }

    /**
     * Publishes a post.
     *
     * <p>Stores the post in the content table under {@code <uid>_<now>} and
     * then writes that row key into the inbox row of every fan of
     * {@code uid}, using the publication time as the cell timestamp.
     *
     * @param uid     id of the publishing user
     * @param content text of the post
     */
    public void publishWeiboContent(String uid, String content) {
        HConnection hconn = null;
        HTableInterface contentTbl = null;
        HTableInterface relationsTbl = null;
        HTableInterface inboxTbl = null;
        try {
            hconn = HConnectionManager.createConnection(conf);

            long timestamp = System.currentTimeMillis();
            byte[] uidBytes = Bytes.toBytes(uid);
            byte[] rowkey = Bytes.toBytes(uid + ROWKEY_SEPARATOR + timestamp);

            // 1) Store the post itself.
            contentTbl = hconn.getTable(WEIBO_CONTENT);
            Put post = new Put(rowkey);
            post.add(CF, CONTENT, Bytes.toBytes(content));
            contentTbl.put(post);

            // 2) Look up the fans of the publisher.
            relationsTbl = hconn.getTable(RELATIONS);
            Get fansGet = new Get(uidBytes);
            fansGet.addFamily(FANS);
            Result fans = relationsTbl.get(fansGet);

            // 3) Notify every fan. cellsOf() guards against a user without fans,
            //    for whom the result carries no cell array.
            List<Put> notifications = new ArrayList<Put>();
            for (Cell fan : cellsOf(fans)) {
                Put notification = new Put(CellUtil.cloneQualifier(fan));
                notification.add(CF, uidBytes, timestamp, rowkey);
                notifications.add(notification);
            }
            if (!notifications.isEmpty()) {
                inboxTbl = hconn.getTable(RECEIVE_CONTENT_EMAIL);
                inboxTbl.put(notifications);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(inboxTbl);
            closeQuietly(relationsTbl);
            closeQuietly(contentTbl);
            closeQuietly(hconn);
        }
    }

    /**
     * Misspelled historical name of {@link #publishWeiboContent(String, String)}.
     *
     * @param uid     id of the publishing user
     * @param content text of the post
     * @deprecated use {@link #publishWeiboContent(String, String)}
     */
    @Deprecated
    public void pubishWeiboContent(String uid, String content) {
        publishWeiboContent(uid, content);
    }

    /**
     * Makes {@code uid} follow the given users.
     *
     * <ol>
     *   <li>adds each followed user under {@code attend} in the row of {@code uid};</li>
     *   <li>adds {@code uid} under {@code fans} in the row of each followed user;</li>
     *   <li>copies the row keys of the posts already published by the followed
     *       users into the inbox of {@code uid}.</li>
     * </ol>
     *
     * @param uid     the following user
     * @param attends the users to follow; nothing happens when null or empty
     */
    public void addAttends(String uid, String... attends) {
        if (attends == null || attends.length == 0) {
            return;
        }

        HConnection hconn = null;
        HTableInterface relationsTbl = null;
        HTableInterface contentTbl = null;
        HTableInterface inboxTbl = null;
        try {
            hconn = HConnectionManager.createConnection(conf);
            byte[] uidBytes = Bytes.toBytes(uid);

            // 1) + 2) Both directions of the relation go out in one batch.
            relationsTbl = hconn.getTable(RELATIONS);
            List<Put> relationPuts = new ArrayList<Put>(attends.length + 1);
            Put follower = new Put(uidBytes);
            for (String attend : attends) {
                byte[] attendBytes = Bytes.toBytes(attend);
                follower.add(ATTEND, attendBytes, attendBytes);

                Put followee = new Put(attendBytes);
                followee.add(FANS, uidBytes, uidBytes);
                relationPuts.add(followee);
            }
            relationPuts.add(follower);
            relationsTbl.put(relationPuts);

            // 3) Back-fill the inbox with the existing posts of the followed users.
            contentTbl = hconn.getTable(WEIBO_CONTENT);
            List<Put> inboxPuts = new ArrayList<Put>();
            for (String attend : attends) {
                collectExistingPosts(contentTbl, uidBytes, attend, inboxPuts);
            }
            if (!inboxPuts.isEmpty()) {
                inboxTbl = hconn.getTable(RECEIVE_CONTENT_EMAIL);
                inboxTbl.put(inboxPuts);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(inboxTbl);
            closeQuietly(contentTbl);
            closeQuietly(relationsTbl);
            closeQuietly(hconn);
        }
    }

    /**
     * Makes {@code uid} stop following the given users.
     *
     * <ol>
     *   <li>removes each user from {@code attend} in the row of {@code uid};</li>
     *   <li>removes {@code uid} from {@code fans} in the row of each user;</li>
     *   <li>removes the notifications of those users from the inbox of {@code uid}.</li>
     * </ol>
     *
     * @param uid     the following user
     * @param attends the users to unfollow; nothing happens when null or empty
     */
    public void removeAttends(String uid, String... attends) {
        if (attends == null || attends.length == 0) {
            return;
        }

        HConnection hconn = null;
        HTableInterface relationsTbl = null;
        HTableInterface inboxTbl = null;
        try {
            hconn = HConnectionManager.createConnection(conf);
            byte[] uidBytes = Bytes.toBytes(uid);

            // 1) + 2) deleteColumns removes every stored version, so an older
            //         cell of the same relation cannot resurface after the delete.
            relationsTbl = hconn.getTable(RELATIONS);
            List<Delete> relationDeletes = new ArrayList<Delete>(attends.length + 1);
            Delete follower = new Delete(uidBytes);
            for (String attend : attends) {
                byte[] attendBytes = Bytes.toBytes(attend);
                follower.deleteColumns(ATTEND, attendBytes);

                Delete followee = new Delete(attendBytes);
                followee.deleteColumns(FANS, uidBytes);
                relationDeletes.add(followee);
            }
            relationDeletes.add(follower);
            relationsTbl.delete(relationDeletes);

            // 3) Remove every notification written up to now. The marker is
            //    deliberately NOT dated in the future: a future-dated delete
            //    marker would also hide notifications written after this call.
            inboxTbl = hconn.getTable(RECEIVE_CONTENT_EMAIL);
            Delete inboxDelete = new Delete(uidBytes);
            long now = System.currentTimeMillis();
            for (String attend : attends) {
                inboxDelete.deleteColumns(CF, Bytes.toBytes(attend), now);
            }
            inboxTbl.delete(inboxDelete);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(inboxTbl);
            closeQuietly(relationsTbl);
            closeQuietly(hconn);
        }
    }

    /**
     * Returns the most recent posts of the users followed by {@code uid}.
     *
     * <p>Reads up to {@value #POSTS_PER_FOLLOWEE} post row keys per followed
     * user from the inbox and resolves each one against the content table.
     * Row keys whose post can no longer be found are skipped.
     *
     * @param uid the reading user
     * @return the resolved posts; empty (never null) when there are none or
     *         when an I/O error occurred
     */
    public List<Message> getAttendContents(String uid) {
        List<Message> msgs = new ArrayList<Message>();
        HConnection hconn = null;
        HTableInterface inboxTbl = null;
        HTableInterface contentTbl = null;
        try {
            hconn = HConnectionManager.createConnection(conf);

            // 1) Collect the row keys of the posts from the inbox.
            inboxTbl = hconn.getTable(RECEIVE_CONTENT_EMAIL);
            Get inboxGet = new Get(Bytes.toBytes(uid));
            inboxGet.setMaxVersions(POSTS_PER_FOLLOWEE);
            Result inbox = inboxTbl.get(inboxGet);

            List<Get> contentGets = new ArrayList<Get>();
            for (Cell notification : cellsOf(inbox)) {
                byte[] postKey = CellUtil.cloneValue(notification);
                if (postKey.length == 0) {
                    continue; // a Get cannot be built for an empty row key
                }
                Get contentGet = new Get(postKey);
                // Only the text column becomes a Message; other columns of the
                // post (photo, link, ...) must not show up as message bodies.
                contentGet.addColumn(CF, CONTENT);
                contentGets.add(contentGet);
            }

            // 2) Resolve the row keys against the content table.
            if (!contentGets.isEmpty()) {
                contentTbl = hconn.getTable(WEIBO_CONTENT);
                for (Result post : contentTbl.get(contentGets)) {
                    Message msg = toMessage(post);
                    if (msg != null) {
                        msgs.add(msg);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(contentTbl);
            closeQuietly(inboxTbl);
            closeQuietly(hconn);
        }
        return msgs;
    }

    /**
     * Demo entry point: creates namespace and tables when missing, then
     * publishes one post for each of the users {@code 0000} to {@code 0999}.
     *
     * <p>Other operations to try by hand, for example:
     * {@code addAttends("0001", "0003", "0004", "0005")},
     * {@code removeAttends("0001", "0003")} and
     * {@code getAttendContents("0001")}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Weibo wb = new Weibo();
        wb.initNameSpace();
        wb.initTable();

        for (int i = 0; i < 1000; i++) {
            wb.publishWeiboContent(String.format("%04d", i), "今天天气真不错!" + i);
        }
    }

    /** Tells whether a namespace with the given name is already defined. */
    private static boolean namespaceExists(HBaseAdmin admin, String name) throws IOException {
        for (NamespaceDescriptor existing : admin.listNamespaceDescriptors()) {
            if (name.equals(existing.getName())) {
                return true;
            }
        }
        return false;
    }

    /** Builds a block-cached, SNAPPY-compressed family keeping exactly {@code versions} versions. */
    private static HColumnDescriptor newFamily(byte[] name, int versions) {
        HColumnDescriptor family = new HColumnDescriptor(name);
        family.setBlockCacheEnabled(true);
        family.setBlocksize(BLOCK_SIZE_BYTES);
        family.setCompressionType(Algorithm.SNAPPY);
        family.setMaxVersions(versions);
        family.setMinVersions(versions);
        return family;
    }

    /**
     * Appends to {@code out} one inbox Put for every post already published by {@code attend}.
     *
     * <p>Uses a range scan over the row keys starting with {@code <attend>_}
     * rather than a substring filter: a substring match would also pick up
     * posts of other users (following {@code 001} would match
     * {@code 0001_...}) and would have to read the whole table.
     */
    private static void collectExistingPosts(HTableInterface contentTbl, byte[] readerRow,
            String attend, List<Put> out) throws IOException {
        byte[] attendBytes = Bytes.toBytes(attend);
        Scan scan = new Scan(Bytes.toBytes(attend + ROWKEY_SEPARATOR),
                Bytes.toBytes(attend + ROWKEY_SCAN_STOP));
        ResultScanner scanner = contentTbl.getScanner(scan);
        try {
            for (Result row : scanner) {
                byte[] rowkey = row.getRow();
                if (rowkey == null) {
                    continue;
                }
                String key = Bytes.toString(rowkey);
                int sep = key.lastIndexOf(ROWKEY_SEPARATOR);
                if (sep != attend.length()) {
                    // Post of a different user whose id merely starts with "<attend>_".
                    continue;
                }
                long publishedAt;
                try {
                    publishedAt = Long.parseLong(key.substring(sep + 1));
                } catch (NumberFormatException e) {
                    continue; // not a "<uid>_<timestamp>" row key
                }
                Put notification = new Put(readerRow);
                notification.add(CF, attendBytes, publishedAt, rowkey);
                out.add(notification);
            }
        } finally {
            scanner.close();
        }
    }

    /** Converts one content-table row into a Message, or returns null when the row is unusable. */
    private static Message toMessage(Result post) {
        if (post == null || post.isEmpty()) {
            return null;
        }
        byte[] body = post.getValue(CF, CONTENT);
        byte[] row = post.getRow();
        if (body == null || row == null) {
            return null;
        }
        String rowkey = Bytes.toString(row);
        int sep = rowkey.lastIndexOf(ROWKEY_SEPARATOR);
        if (sep < 0) {
            return null;
        }
        Message msg = new Message();
        msg.setUid(rowkey.substring(0, sep));
        msg.setTimestamp(rowkey.substring(sep + 1));
        msg.setContent(Bytes.toString(body));
        return msg;
    }

    /** Returns the cells of a result, or an empty array when there are none. */
    private static Cell[] cellsOf(Result result) {
        Cell[] cells = (result == null) ? null : result.rawCells();
        return (cells == null) ? NO_CELLS : cells;
    }

    /** Closes a table handle if it was opened, reporting but not propagating failures. */
    private static void closeQuietly(HTableInterface table) {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes a connection if it was opened, reporting but not propagating failures. */
    private static void closeQuietly(HConnection hconn) {
        if (hconn != null) {
            try {
                hconn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes an admin handle if it was opened, reporting but not propagating failures. */
    private static void closeQuietly(HBaseAdmin admin) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}