zoukankan      html  css  js  c++  java
  • 079 微博表的设计,以及代码的设计

    一:表的设计

    1.需求分析

      用户发表微博,关注人可以接收到被关注人的微博

      设计三张表格。

    2.第一张,微博内容表weibo-content

      RowKey : uid_timestamp 用户账号结合内容发布的时间戳

      Column Family:cf 因为rowkey是使用用户账号结合内容发布的时间戳,所以这里内容保存的版本只会有一个版本

      Column Qualifier :
        theme 主题
        content 内容
        photo 图片
        mp4 视频
        link 链接

    3.第二张,用户关系表relations

      RowKey:uid

      Column Family:cf1 关注用户
        Column Qualifier:使用关注用户的uid作为列标签,
        value:也用关注用户的uid

      Column Family:cf2 粉丝用户
        Column Qualifier:使用粉丝用户的uid作为列标签,
        value:也用粉丝用户的uid

    4.第三张,接受内容邮箱表 receive-content-email

      RowKey:uid 用户账号

      Column Family:cf

      Column Qualifier:以关注用户账号作为列标签
        value: 以微博内容表的rowkey作为value
        保留版本数 1000(最大版本1000,最小版本1000,版本存留时间365*24*60*60)

    5.梳理框架

      当用户发表了一篇微博,内容将会保存到第一张表格,再根据第二张表格的rowkey(uid),找到关注自己的列uid。

      第三张表的rowkey都是粉丝的uid,这时候再根据第三张表找到关注人的最新的内容。

    二:编程实现

    1.message.java

     1 package org.apache.hadoop.hbase.weibo;
     2 
     3 import java.io.Serializable;
     4 
     5 /**
     6  * 微博内容实体类
     7  * @author ibeifeng
     8  *
     9  */
    10 public class Message implements Serializable{
    11     
    12     private static final long serialVersionUID = 2789732708160004861L;
    13 
    14     private String uid;
    15     
    16     private  String content;
    17     
    18     private String timestamp;
    19 
    20     @Override
    21     public boolean equals(Object obj) {
    22         return super.equals(obj);
    23     }
    24 
    25 
    26     @Override
    27     public String toString() {
    28         return "uid=" + uid +",timestamp=" 
    29                 + timestamp + ",content="" + content+""";
    30     }
    31 
    32     public String getUid() {
    33         return uid;
    34     }
    35 
    36     public void setUid(String uid) {
    37         this.uid = uid;
    38     }
    39 
    40     public String getContent() {
    41         return content;
    42     }
    43 
    44     public void setContent(String content) {
    45         this.content = content;
    46     }
    47 
    48     public String getTimestamp() {
    49         return timestamp;
    50     }
    51 
    52     public void setTimestamp(String timestamp) {
    53         this.timestamp = timestamp;
    54     }
    55     
    56 }

    2.weibo.java

      1 package org.apache.hadoop.hbase.weibo;
      2 
      3 import java.io.IOException;
      4 import java.util.ArrayList;
      5 import java.util.Iterator;
      6 import java.util.List;
      7 
      8 import org.apache.hadoop.conf.Configuration;
      9 import org.apache.hadoop.hbase.Cell;
     10 import org.apache.hadoop.hbase.CellUtil;
     11 import org.apache.hadoop.hbase.HBaseConfiguration;
     12 import org.apache.hadoop.hbase.HColumnDescriptor;
     13 import org.apache.hadoop.hbase.HTableDescriptor;
     14 import org.apache.hadoop.hbase.MasterNotRunningException;
     15 import org.apache.hadoop.hbase.NamespaceDescriptor;
     16 import org.apache.hadoop.hbase.TableName;
     17 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
     18 import org.apache.hadoop.hbase.client.Delete;
     19 import org.apache.hadoop.hbase.client.Get;
     20 import org.apache.hadoop.hbase.client.HBaseAdmin;
     21 import org.apache.hadoop.hbase.client.HConnection;
     22 import org.apache.hadoop.hbase.client.HConnectionManager;
     23 import org.apache.hadoop.hbase.client.HTableInterface;
     24 import org.apache.hadoop.hbase.client.Put;
     25 import org.apache.hadoop.hbase.client.Result;
     26 import org.apache.hadoop.hbase.client.ResultScanner;
     27 import org.apache.hadoop.hbase.client.Scan;
     28 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
     29 import org.apache.hadoop.hbase.filter.RowFilter;
     30 import org.apache.hadoop.hbase.filter.SubstringComparator;
     31 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
     32 import org.apache.hadoop.hbase.util.Bytes;
     33 
     34 /**
     35  * 微博类
     36  * @author ibeifeng
     37  *
     38  */
     39 public class Weibo {
     40     
     41     static  final Configuration conf = HBaseConfiguration.create();
     42     
     43     private static final byte[] weibo_content = Bytes.toBytes("weibo:weibo-content");
     44     
     45     private static final byte[] relations =  Bytes.toBytes("weibo:relations");
     46     
     47     private static final byte[] receive_content_email = Bytes.toBytes("weibo:receive-content-email");
     48     
     49     
     50     /**
     51      * 初始化命名空间
     52      */
     53     public void initNameSpace(){
     54         HBaseAdmin admin = null;
     55         try {
     56             admin = new HBaseAdmin(conf);
     57             
     58             NamespaceDescriptor descriptor = NamespaceDescriptor.create("weibo")
     59                     .addConfiguration("creator", "ibeifeng")
     60                     .addConfiguration("createTime", System.currentTimeMillis()+"").build();
     61             admin.createNamespace(descriptor);
     62             
     63         } catch (MasterNotRunningException e) {
     64             e.printStackTrace();
     65         } catch (ZooKeeperConnectionException e) {
     66             e.printStackTrace();
     67         } catch (IOException e) {
     68             e.printStackTrace();
     69         }finally{
     70             if(admin!=null)
     71                 try {
     72                     admin.close();
     73                 } catch (IOException e) {
     74                     e.printStackTrace();
     75                 }
     76         }
     77     }
     78     
     79     
     80     /**
     81      * 初始化表
     82      */
     83     public void initTable(){
     84         HBaseAdmin admin = null;
     85         try {
     86             admin = new HBaseAdmin(conf);
     87             
     88             /*
     89              * 1、微博内容表
     90                 TableName:   weibo:weibo-content
     91                 RowKey:用户ID_timestamp
     92                 列簇:cf
     93                 列标签:    
     94                         cf:content
     95                         cf:title
     96                         cf:photo
     97                         
     98                     版本设计:只需要保留一个版本
     99              */
    100             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(weibo_content));
    101             HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes("cf"));
    102             // 开启列簇 -- store的块缓存
    103             family.setBlockCacheEnabled(true);
    104             family.setBlocksize(1024*1024*2);
    105             
    106             family.setCompressionType(Algorithm.SNAPPY);
    107             
    108             family.setMaxVersions(1);
    109             family.setMinVersions(1);
    110             
    111             desc.addFamily(family);
    112             
    113             //admin.createTable(desc);
    114             byte[][] splitKeys = {
    115                     Bytes.toBytes("100"),
    116                     Bytes.toBytes("200"),
    117                     Bytes.toBytes("300")
    118             };
    119             admin.createTable(desc,splitKeys);
    120             
    121             
    122             /*
    123              * 2、用户关系表
    124                 TableName: weibo:relations
    125                 RowKey: 用户ID
    126                 列簇:attend 关注用户
    127                         fan  粉丝用户
    128                 列标签:使用用户ID作为列标签,值为用户ID
    129                 
    130                 版本:只需要一个版本
    131              */
    132             HTableDescriptor relationTbl = new HTableDescriptor(TableName.valueOf(relations));
    133             HColumnDescriptor attend = new HColumnDescriptor(Bytes.toBytes("attend"));
    134             // 开启列簇 -- store的块缓存
    135             attend.setBlockCacheEnabled(true);
    136             attend.setBlocksize(1024*1024*2);
    137             
    138             attend.setCompressionType(Algorithm.SNAPPY);
    139             
    140             attend.setMaxVersions(1);
    141             attend.setMinVersions(1);
    142             
    143             relationTbl.addFamily(attend);
    144             
    145             HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
    146             // 开启列簇 -- store的块缓存
    147             fans.setBlockCacheEnabled(true);
    148             fans.setBlocksize(1024*1024*2);
    149             
    150             fans.setCompressionType(Algorithm.SNAPPY);
    151             
    152             fans.setMaxVersions(1);
    153             fans.setMinVersions(1);
    154             
    155             relationTbl.addFamily(fans);
    156             
    157             admin.createTable(relationTbl);
    158             
    159             /*
    160              * 3、用户微博内容接收邮件箱表
    161             TableName:   weibo:receive-content-email
    162             RowKey:用户ID
    163             列簇:cf
    164             列标签:
    165                 直接使用用户ID,value值取微博内容的RowKey
    166                 
    167                 版本:设置最大版本为1000
    168              */
    169             HTableDescriptor receiveContentEmail = 
    170                     new HTableDescriptor(TableName.valueOf(receive_content_email));
    171             HColumnDescriptor rce_cf = new HColumnDescriptor(Bytes.toBytes("cf"));
    172             // 开启列簇 -- store的块缓存
    173             rce_cf.setBlockCacheEnabled(true);
    174             rce_cf.setBlocksize(1024*1024*2);
    175             
    176             rce_cf.setCompressionType(Algorithm.SNAPPY);
    177             
    178             rce_cf.setMaxVersions(1000);
    179             rce_cf.setMinVersions(1000);
    180             
    181             receiveContentEmail.addFamily(rce_cf);
    182             
    183             admin.createTable(receiveContentEmail);
    184             
    185         } catch (MasterNotRunningException e) {
    186             e.printStackTrace();
    187         } catch (ZooKeeperConnectionException e) {
    188             e.printStackTrace();
    189         } catch (IOException e) {
    190             e.printStackTrace();
    191         }finally{
    192             if(admin!=null)
    193                 try {
    194                     admin.close();
    195                 } catch (IOException e) {
    196                     e.printStackTrace();
    197                 }
    198         }
    199     }
    200     
    201     /*
    202      *  发布微博内容:
    203      *      1)在微博内容表中插入一行数据
    204      *      2)在用户微博内容接收邮件箱表对用户的所有粉丝用户添加数据
    205      *  
    206      *  Put
    207      *  put 'tablename','rowkey','cf:cq','value'
    208      */
    209     public void pubishWeiboContent(String uid,String content){
    210         HConnection hconn = null;
    211         try {
    212             hconn = HConnectionManager.createConnection(conf);
    213             // 1)在微博内容表中插入一行数据
    214             HTableInterface weiboContentTbl = hconn.getTable(TableName.valueOf(weibo_content));
    215             // rowkey : uid_timestamp
    216             long timestamp = System.currentTimeMillis();
    217             String rowkey = uid+"_"+timestamp;
    218             Put put = new Put(Bytes.toBytes(rowkey));
    219             put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"), Bytes.toBytes(content));
    220             weiboContentTbl.put(put);
    221                     
    222             // 查询该用户的粉丝用户
    223             HTableInterface relationsTbl = hconn.getTable(TableName.valueOf(relations));
    224             // get 'tablename','rowkey','cf','cq'
    225             Get get = new Get(Bytes.toBytes(uid));
    226             // 查询粉丝列簇下的所有粉丝
    227             get.addFamily(Bytes.toBytes("fans"));
    228             Result r = relationsTbl.get(get);
    229             
    230             List<byte[]> fans = new ArrayList<byte[]>();
    231             Cell[] cells = r.rawCells();
    232             for(Cell c : cells){
    233                 fans.add(CellUtil.cloneQualifier(c));
    234             }
    235             
    236             if(fans.size() > 0){
    237                 //2)在用户微博内容接收邮件箱表对用户的所有粉丝用户添加数据
    238                 HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
    239                 List<Put> ps = new  ArrayList<Put>();
    240                 for(byte[] fanId : fans){
    241                     Put p = new Put(fanId);
    242 //                    p.add(Bytes.toBytes("cf"), 
    243 //                            Bytes.toBytes(uid), 
    244 //                            Bytes.toBytes(uid+"_"+System.currentTimeMillis()));
    245                     
    246                     p.add(Bytes.toBytes("cf"), 
    247                             Bytes.toBytes(uid), timestamp,
    248                             Bytes.toBytes(rowkey));
    249                     ps.add(p);
    250                 }
    251                 rceTbl.put(ps);
    252             }
    253         } catch (IOException e) {
    254             
    255             e.printStackTrace();
    256         }finally{
    257             if(hconn!=null)
    258                 try {
    259                     hconn.close();
    260                 } catch (IOException e) {
    261                     e.printStackTrace();
    262                 }
    263         }
    264     }
    265     
    266     
    267     //添加关注用户
    268     /**
    269      * 添加关注用户
    270      *     1)在微博用户关系表中,新增数据(关注用户列簇下添加标签)
    271      *     2)从被添加的关注用户角度,新增粉丝用户
    272      *     3)在微博邮件箱中添加关注用户发布的微博内容通知
    273      * 
    274      * @param uid
    275      * @param attends
    276      */
    277     public void addAttends(String uid,String... attends){
    278         
    279         if(attends == null || attends.length <= 0) return ;
    280         
    281         HConnection hconn = null;
    282         try {
    283             hconn = HConnectionManager.createConnection(conf);
    284             //1)在微博用户关系表中,新增数据(关注用户列簇下添加标签)
    285             HTableInterface relationsTbl = hconn.getTable(TableName.valueOf(relations));
    286             List<Put> ps = new ArrayList<Put>();
    287             Put put = new Put(Bytes.toBytes(uid));
    288             for(String attend:attends){
    289                 put.add(Bytes.toBytes("attend"), Bytes.toBytes(attend), Bytes.toBytes(attend));
    290                 // 2)从被添加的关注用户角度,新增粉丝用户
    291                 Put attendPut = new Put(Bytes.toBytes(attend));
    292                 attendPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
    293                 ps.add(attendPut);
    294             }
    295             ps.add(put);
    296             relationsTbl.put(ps);
    297             
    298             
    299             //3)在微博邮件箱中添加关注用户发布的微博内容通知
    300             // 先查询关注用户发布微博内容
    301             HTableInterface weiboContentTbl = hconn.getTable(TableName.valueOf(weibo_content));
    302             List<byte[]> rks = new ArrayList<byte[]>();
    303             Scan scan = new Scan();
    304             for(String attend:attends){
    305                 // Filter
    306                 // 扫描表的rowkey,只有rowkey含有字符串("关注用户ID_"),取出
    307                 RowFilter rowFilter = 
    308                         new RowFilter(CompareOp.EQUAL, new SubstringComparator(attend+"_"));
    309                 scan.setFilter(rowFilter);
    310                 ResultScanner resultScanner = weiboContentTbl.getScanner(scan);
    311                 Iterator<Result> it = resultScanner.iterator();
    312                 while(it.hasNext()){
    313                     Result r = it.next();
    314                     Cell[] cells = r.rawCells();
    315                     for(Cell c : cells){
    316                         rks.add(CellUtil.cloneRow(c));
    317                     }
    318                 }
    319             }
    320             if(rks.size() > 0){
    321                 //List<byte[]> rks = new ArrayList<byte[]>();
    322                 HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
    323                 List<Put> puts = new ArrayList<Put>();
    324                 for(byte[] rk : rks){
    325                     Put p = new Put(Bytes.toBytes(uid));
    326                     String rowkey =  Bytes.toString(rk);
    327                     Long timestamp = Long.valueOf(rowkey.substring(rowkey.indexOf("_")+1));
    328                     String attendId = rowkey.substring(0, rowkey.indexOf("_"));
    329                     p.add(Bytes.toBytes("cf"), 
    330                             Bytes.toBytes(attendId), timestamp,rk);
    331                     
    332                     puts.add(p);
    333                 }
    334                 rceTbl.put(puts);
    335             }
    336             
    337         } catch (IOException e) {
    338             e.printStackTrace();
    339         }finally{
    340             if(hconn!=null)
    341                 try {
    342                     hconn.close();
    343                 } catch (IOException e) {
    344                     e.printStackTrace();
    345                 }
    346         }
    347         
    348     }
    349     
    350     /**
    351      * 取消关注用户
    352      * 1)在微博用户关系表,针对该用户,删除被取消的关注用户所对应的单元格
    353      * 2)在微博用户关系表,针对被取消用户,删除它们的粉丝用户
    354      * 3)在微博内容接收邮件箱表中,移除该用户的这些被取消关注用户微博内容通知记录
    355      * @param uid
    356      * @param attends  可变长度的参数列表
    357      */
    358     public void removeAttends(String uid,String... attends){
    359         
    360         if(attends == null || attends.length <= 0) return ;
    361         
    362         HConnection hconn = null;
    363         try {
    364             hconn = HConnectionManager.createConnection(conf);
    365             
    366             // 1)在微博用户关系表,针对该用户,删除被取消的关注用户所对应的单元格
    367             HTableInterface relationsTbl = hconn.getTable(TableName.valueOf(relations));
    368             List<Delete> deletes = new ArrayList<Delete>();
    369             Delete delete = new Delete(Bytes.toBytes(uid));
    370             for(String attend:attends){
    371                 delete.deleteColumn(Bytes.toBytes("attend"), Bytes.toBytes(attend));
    372                 // 2)在微博用户关系表,针对被取消用户,删除它们的粉丝用户
    373                 Delete deleteFan = new Delete(Bytes.toBytes(attend));
    374                 deleteFan.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
    375                 deletes.add(deleteFan);
    376             }
    377             deletes.add(delete);
    378             relationsTbl.delete(deletes);
    379             
    380             
    381             //3)在微博内容接收邮件箱表中,移除该用户的这些被取消关注用户微博内容通知记录
    382             HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
    383             Delete deleteRCE = new Delete(Bytes.toBytes(uid));
    384             for(String attend:attends){
    385                 // deleteColumn删除最近版本
    386                 //deleteRCE.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes(attend));
    387                 // 删除单元格的所有版本
    388                 Long timestamp = System.currentTimeMillis();
    389                 deleteRCE.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes(attend),timestamp+1000000);
    390             }
    391             rceTbl.delete(deleteRCE);
    392             
    393         } catch (IOException e) {
    394             e.printStackTrace();
    395         }finally{
    396             if(hconn!=null)
    397                 try {
    398                     hconn.close();
    399                 } catch (IOException e) {
    400                     e.printStackTrace();
    401                 }
    402         }
    403     }
    404     
    405     /**
    406      * 用户获取所关注用户的微博内容
    407      * 1) 从微博内容接收邮件箱表中获取用户其关注用户的微博内容 rowkey
    408      * 2)从微博内容表中取出微博内容
    409      * 
    410      * 
    411      * @param uid
    412      * @return
    413      */
    414     public List<Message> getAttendContents(String uid){
    415         
    416         List<Message>  msgs = new ArrayList<Message>();
    417         HConnection hconn = null;
    418         try {
    419             hconn = HConnectionManager.createConnection(conf);
    420             // 1) 从微博内容接收邮件箱表中获取用户其关注用户的微博内容 rowkey
    421             HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
    422             Get get = new Get(Bytes.toBytes(uid));
    423             get.setMaxVersions(5);
    424             Result r = rceTbl.get(get);
    425             List<byte[]> rks = new ArrayList<byte[]>();
    426             Cell[] cells = r.rawCells();
    427             if(cells != null && cells.length > 0){
    428                 for(Cell c : cells){
    429                     
    430                     byte[] rk = CellUtil.cloneValue(c);
    431                     rks.add(rk);
    432                 }
    433             }
    434             
    435             //2)从微博内容表中取出微博内容
    436             if(rks.size() > 0){
    437                 HTableInterface weiboContentTbl = hconn.getTable(TableName.valueOf(weibo_content));
    438                 List<Get> gets = new  ArrayList<Get>();
    439                 for(byte[] rk : rks){
    440                     Get g = new Get(rk);
    441                     gets.add(g);
    442                 }
    443                 
    444                 Result[] results = weiboContentTbl.get(gets);
    445                 for(Result result : results){
    446                     Cell[] cls = result.rawCells();
    447                     for(Cell cell : cls){
    448                         Message msg = new Message();
    449                         String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
    450                         String attendUid = rowkey.substring(0, rowkey.indexOf("_"));
    451                         msg.setUid(attendUid);
    452                         String timestamp = rowkey.substring(rowkey.indexOf("_")+1);
    453                         msg.setTimestamp(timestamp);
    454                         
    455                         String content = Bytes.toString(CellUtil.cloneValue(cell));
    456                         msg.setContent(content);
    457                         
    458                         msgs.add(msg);
    459                     }
    460                 }
    461             }
    462             
    463         } catch (IOException e) {
    464             e.printStackTrace();
    465         }finally{
    466             if(hconn!=null)
    467                 try {
    468                     hconn.close();
    469                 } catch (IOException e) {
    470                     e.printStackTrace();
    471                 }
    472         }
    473         return msgs;
    474     } 
    475     
    476     public static void main(String[] args) {
    477         Weibo wb = new  Weibo();
    478         wb.initNameSpace();
    479         wb.initTable();
    480         //wb.pubishWeiboContent("0001", "今天天气真不错!");
    481         //wb.pubishWeiboContent("0003", "今天天气真不错!");
    482         //wb.pubishWeiboContent("0003", "今天天气真不错!");
    483         //wb.pubishWeiboContent("0004", "今天天气真不错!");
    484         //wb.pubishWeiboContent("0004", "今天天气真不错!");
    485         //wb.pubishWeiboContent("0005", "今天天气真不错!");
    486         
    487         //wb.addAttends("0001", "0003","0004","0005");
    488         //wb.removeAttends("0001", "0003");
    489         
    490         //List<Message> msgs = wb.getAttendContents("0001");
    491         
    492         //System.out.println(msgs);
    493         
    494         for(int i=0;i < 1000 ;i++){
    495             wb.pubishWeiboContent(String.format("%04d", i), "今天天气真不错!" + i);
    496         }
    497         
    498         
    499     }
    500 
    501 }

      

      

  • 相关阅读:
    LVS负载均衡软件使用及(LVS简介、三种工作模式、十种调度算法)
    Nginx+Tomcat实现负载均衡!
    Nginx实现HTTP及TCP负载均衡
    Nginx 反向代理报400错误解决方法!
    Session服务器之Session复制!
    反射、内置方法和元类
    多态和绑定方法
    封装和继承
    面向对象编程
    软件目录规范下的AMT+购物车(简易版)的实现
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6168907.html
Copyright © 2011-2022 走看看