zoukankan      html  css  js  c++  java
  • 使用Java API连接和操作HBase数据库

    创建的数据库存储如下数据

    插入数据示意图完整版

    表结构

    image

    java代码

      1 
      2 public class HbaseTest {
      3 
      4 	/**
      5 	 * 配置ss
      6 	 */
      7 	static Configuration config = null;
      8 	private Connection connection = null;
      9 	private Table table = null;
     10 
     11 	@Before
     12 	public void init() throws Exception {
     13 		config = HBaseConfiguration.create();// 配置
     14 		config.set("hbase.zookeeper.quorum", "192.168.33.61");// zookeeper地址
     15 		config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
     16 		connection = ConnectionFactory.createConnection(config);
     17 		table = connection.getTable(TableName.valueOf("dept"));
     18 	}
     19 
     20 	/**
     21 	 * 创建数据库表dept,并增加列族info和subdept
     22 	 *
     23 	 * @throws Exception
     24 	 */
     25 	@Test
     26 	public void createTable() throws Exception {
     27 		// 创建表管理类
     28 		HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
     29 		// 创建表描述类
     30 		TableName tableName = TableName.valueOf("dept"); // 表名称
     31 		HTableDescriptor desc = new HTableDescriptor(tableName);
     32 		// 创建列族的描述类
     33 		HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
     34 		// 将列族添加到表中
     35 		desc.addFamily(family);
     36 		HColumnDescriptor family2 = new HColumnDescriptor("subdept"); // 列族
     37 		// 将列族添加到表中
     38 		desc.addFamily(family2);
     39 		// 创建表
     40 		admin.createTable(desc); // 创建表
     41 		System.out.println("创建表成功!");
     42 	}
     43 
     44 	/**
     45 	 * 向hbase中插入前三行网络部、开发部、测试部的相关数据,
     46 	 * 即加入表中的前三条数据
     47 	 *
     48 	 * @throws Exception
     49 	 */
     50 	@SuppressWarnings({ "deprecation", "resource" })
     51 	@Test
     52 	public void insertData() throws Exception {
     53 		table.setAutoFlushTo(false);
     54 		table.setWriteBufferSize(534534534);
     55 		ArrayList<Put> arrayList = new ArrayList<Put>();
     56 
     57 		Put put = new Put(Bytes.toBytes("0_1"));
     58 		put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("网络部"));
     59 		put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept1"), Bytes.toBytes("1_1"));
     60 		put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept2"), Bytes.toBytes("1_2"));
     61 		arrayList.add(put);
     62 
     63 		Put put1 = new Put(Bytes.toBytes("1_1"));
     64 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("开发部"));
     65 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
     66 
     67 		Put put2 = new Put(Bytes.toBytes("1_2"));
     68 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("测试部"));
     69 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
     70 
     71 		for (int i = 1; i <= 100; i++) {
     72 
     73 			put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("2_"+i));
     74 			put2.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("3_"+i));
     75 		}
     76 		arrayList.add(put1);
     77 		arrayList.add(put2);
     78 		//插入数据
     79 		table.put(arrayList);
     80 		//提交
     81 		table.flushCommits();
     82 		System.out.println("数据插入成功!");
     83 	}
     84 
     85 	/**
     86 	 * 向hbase中插入开发部、测试部下的所有子部门数据
     87 	 * @throws Exception
     88 	 */
     89 	@Test
     90 	public void insertOtherData() throws Exception {
     91 		table.setAutoFlushTo(false);
     92 		table.setWriteBufferSize(534534534);
     93 		ArrayList<Put> arrayList = new ArrayList<Put>();
     94 		for (int i = 1; i <= 100; i++) {
     95 			Put put_development = new Put(Bytes.toBytes("2_"+i));
     96 			put_development.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("开发"+i+""));
     97 			put_development.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_1"));
     98 			arrayList.add(put_development);
     99 
    100 			Put put_test = new Put(Bytes.toBytes("3_"+i));
    101 			put_test.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("测试"+i+""));
    102 			put_test.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_2"));
    103 			arrayList.add(put_test);
    104 		}
    105 
    106 		//插入数据
    107 		table.put(arrayList);
    108 		//提交
    109 		table.flushCommits();
    110 		System.out.println("插入其他数据成功!");
    111 	}
    112 
    113 	/**
    114 	 * 查询所有一级部门(没有上级部门的部门)
    115 	 * @throws Exception
    116 	 */
    117 	@Test
    118 	public void scanDataStep1() throws Exception {
    119 
    120 		// 创建全表扫描的scan
    121 		Scan scan = new Scan();
    122 		System.out.println("查询到的所有一级部门如下:");
    123 		// 打印结果集
    124 		ResultScanner scanner = table.getScanner(scan);
    125 		for (Result result : scanner) {
    126 			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("f_pid")) == null) {
    127 				for (KeyValue kv : result.raw()) {
    128 				     System.out.print(new String(kv.getRow()) + " ");
    129 				     System.out.print(new String(kv.getFamily()) + ":");
    130 				     System.out.print(new String(kv.getQualifier()) + " = ");
    131 				     System.out.print(new String(kv.getValue()));
    132 				     System.out.print(" timestamp = " + kv.getTimestamp() + "
    ");
    133 				}
    134 		}
    135 		}
    136 	}
    137 
    138 	/**
    139 	 * 已知rowkey,查询该部门的所有(直接)子部门信息 rowkey=1_1
    140 	 * @throws Exception
    141 	 */
    142 	@Test
    143 	public void scanDataStep2() throws Exception {
    144 		Get g = new Get("1_1".getBytes());
    145 		g.addFamily("subdept".getBytes());
    146 		// 打印结果集
    147 		Result result = table.get(g);
    148 		for (KeyValue kv : result.raw()) {
    149 		     Get g1 = new Get(kv.getValue());
    150 		     Result result1 = table.get(g1);
    151 		     for (KeyValue kv1 : result1.raw()) {
    152 			     System.out.print(new String(kv1.getRow()) + " ");
    153 			     System.out.print(new String(kv1.getFamily()) + ":");
    154 			     System.out.print(new String(kv1.getQualifier()) + " = ");
    155 			     System.out.print(new String(kv1.getValue()));
    156 			     System.out.print(" timestamp = " + kv1.getTimestamp() + "
    ");
    157 			}
    158 		}
    159 	}
    160 
    161 	/**
    162 	 * 已知rowkey,向该部门增加一个子部门
    163 	 * rowkey:0_1
    164 	 * 增加的部门名:我增加的部门
    165 	 * @throws Exception
    166 	 */
    167 	@Test
    168 	public void scanDataStep3() throws Exception {
    169 		//新增一个部门
    170 		Put put = new Put(Bytes.toBytes("4_1"));
    171 		put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("我增加的部门"));
    172 		put.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
    173 		//插入数据
    174 		table.put(put);
    175 		//提交
    176 		table.flushCommits();
    177 
    178 		//更新网络部
    179 		Put put1 = new Put(Bytes.toBytes("0_1"));
    180 		put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept3"), Bytes.toBytes("4_1"));
    181 		//插入数据
    182 		table.put(put1);
    183 		//提交
    184 		table.flushCommits();
    185 	}
    186 
    187 	/**
    188 	 * 已知rowkey(且该部门存在子部门),删除该部门信息,该部门所有(直接)子部门被调整到其他部门中
    189 	 * @throws Exception
    190 	 */
    191 	@Test
    192 	public void scanDataStep4() throws Exception {
    193 		/**
    194 		 * 向部门"我增加的部门"添加两个子部门"
    195 		 */
    196 		table.setAutoFlushTo(false);
    197 		table.setWriteBufferSize(534534534);
    198 		ArrayList<Put> arrayList = new ArrayList<Put>();
    199 		Put put1 = new Put(Bytes.toBytes("5_1"));
    200 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部门1"));
    201 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1"));
    202 		Put put2 = new Put(Bytes.toBytes("5_2"));
    203 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部门2"));
    204 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1"));
    205 
    206 		arrayList.add(put1);
    207 		arrayList.add(put2);
    208 		//插入数据
    209 		table.put(arrayList);
    210 		//提交
    211 		table.flushCommits();
    212 
    213 		/**
    214 		 * 目的:删除"我增加的部门"的部门信息,该部门所有(直接)子部门被调整到其他部门中
    215 		 * 使用策略:更新部门名就可以了,也就是说一个部门可能有多个rowkey
    216 		 */
    217 		Put put = new Put(Bytes.toBytes("4_1"));
    218 		put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("开发部"));
    219 		//插入数据
    220 		table.put(put);
    221 		//提交
    222 		table.flushCommits();
    223 	}
    224 
    225 	@After
    226 	public void close() throws Exception {
    227 		table.close();
    228 		connection.close();
    229 	}
    230 
    231 }
  • 相关阅读:
    项目有大小,生存各有道
    学习Spark——那些让你精疲力尽的坑
    学习Spark——环境搭建(Mac版)
    小程序新能力-个人开发者尝鲜微信小程序
    如何写出好代码
    华为手机nova2s使用第三方字体库
    std::string与std::wstring互相转换
    Steam安装Google Earth VR
    osgearth2.8关于RectangleNodeEditor编辑点不可见的问题
    Qt生成ui文件对应的.h和.cpp文件
  • 原文地址:https://www.cnblogs.com/daleyzou/p/8668727.html
Copyright © 2011-2022 走看看