业务需求说明:由于公司数据中心刚开始部署,需要其它各部分将数据全部汇总到数据中心。实现方式是上传 JSON 文件,并通过 Socket 与 ServerSocket 完成传输。
其中,服务端采用多线程的方式,以支持多个用户同时传输,并将接收到的数据写入 HBase。
具体步骤如下:
1、首先编写客户端的代码:
package com.yiban.datacenter.ToHbaseFromJson;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * Command-line client that uploads the local file {@code file.json} to the
 * upload server over a plain TCP socket.
 *
 * <p>Protocol, as implemented below (all messages are text lines):
 * <ol>
 *   <li>send the greeting line, read one reply line;</li>
 *   <li>send the target table name, read one reply line;</li>
 *   <li>send the file line by line, then half-close the socket;</li>
 *   <li>print every line the server sends back until it closes the socket.</li>
 * </ol>
 *
 * <p>NOTE(review): both socket streams and the file are decoded with the
 * platform default charset, exactly as before. The greeting contains
 * non-ASCII text, so client and server must run with the same default
 * charset; consider switching BOTH sides to an explicit UTF-8 together.
 */
public class hbaseclient {

    /**
     * Runs one upload against the hard-coded server address.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the socket is closed even when the
        // handshake or the file transfer fails half-way.
        try (Socket socket = new Socket("192.168.27.47", 22222)) {
            // Exactly one writer and one reader per socket stream: a second
            // BufferedReader on the same InputStream would lose whatever the
            // first one had already buffered.
            BufferedWriter toServer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream()));
            BufferedReader fromServer = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));

            // Step 1: greeting, then print the server's answer.
            sendLine(toServer, "我准备向你发送数据了,你准备好接收了吗?");
            System.out.println(fromServer.readLine());

            // Step 2: table name, then print whether the server accepted it.
            sendLine(toServer, "nihao");
            System.out.println(fromServer.readLine());

            // Step 3: stream the JSON file; the file reader is closed on
            // every exit path.
            try (BufferedReader jsonFile = new BufferedReader(new FileReader("file.json"))) {
                String line;
                while ((line = jsonFile.readLine()) != null) {
                    toServer.write(line);
                    toServer.newLine();
                }
            }
            // One flush for the whole payload, then signal end-of-data so the
            // server's read loop terminates.
            toServer.flush();
            socket.shutdownOutput();

            // Step 4: print the server's feedback until it closes the socket.
            String feedback;
            while ((feedback = fromServer.readLine()) != null) {
                System.out.println(feedback);
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Writes {@code text} as one line and flushes it to the peer. */
    private static void sendLine(BufferedWriter out, String text) throws IOException {
        out.write(text);
        out.newLine();
        out.flush();
    }
}
2、服务端的代码:
(1)线程类的实现
package com.yiban.datacenter.ToHbaseFromJson;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Map.Entry;

import net.sf.json.JSONArray;
import net.sf.json.JSONException;
import net.sf.json.JSONObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Handles one client upload: performs the text handshake, receives a table
 * name and a JSON array, and writes the array's records into that HBase table.
 *
 * <p>One instance serves exactly one socket. All instances share the single
 * static HBase {@link Connection} created when the class is loaded.
 *
 * <p>NOTE(review): the socket streams are decoded with the platform default
 * charset, exactly as before. The handshake line is non-ASCII, so client and
 * server must agree on that charset; consider moving BOTH sides to explicit
 * UTF-8 together.
 */
public class UserThread implements Runnable {

    /** Greeting line the client must send first; could later carry credentials. */
    private static final String HANDSHAKE_REQUEST = "我准备向你发送数据了,你准备好接收了吗?";

    /** Column family that uploaded records are written to. */
    private static final String DEFAULT_COLUMN_FAMILY = "info";

    private static Configuration conf = HBaseConfiguration.create();

    /** Shared HBase connection; stays {@code null} if it could not be created. */
    private static Connection connection = null;

    // Cluster location. Failure is only logged here so that class loading
    // does not fail; every later use goes through requireConnection().
    static {
        try {
            conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233");
            conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
            connection = ConnectionFactory.createConnection(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private final Socket s;

    private String userTableName = null;
    private String columnFamilyName = null;

    /**
     * @param s the accepted client socket; it is closed when {@link #run()} returns
     */
    public UserThread(Socket s) {
        this.s = s;
    }

    /**
     * Serves the client and closes the socket on every exit path, including
     * failures in the handshake, JSON parsing or the HBase write.
     */
    @Override
    public void run() {
        try (Socket socket = s) {
            // Exactly one reader and one writer per socket stream, reused for
            // the whole conversation.
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            BufferedWriter out = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream()));
            serve(in, out);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the upload protocol: greeting, table name, payload, final verdict.
     *
     * @throws IOException if talking to the client fails
     */
    private void serve(BufferedReader in, BufferedWriter out) throws IOException {
        // Step 1: handshake.
        String greeting = in.readLine();
        if (greeting == null) {
            return; // client disconnected before sending anything
        }
        System.out.println(greeting);
        if (!HANDSHAKE_REQUEST.equals(greeting)) {
            // Do not continue the protocol after telling the client it failed.
            sendLine(out, "连接失败!");
            return;
        }
        sendLine(out, "连接成功,你可以发送数据了,发送数据前,请先发送你要用的数据库表名!");

        // Step 2: target table.
        userTableName = in.readLine();
        if (userTableName == null) {
            return;
        }
        System.out.println("tablename:" + userTableName);
        boolean tableExists = tableIsExist(userTableName);
        sendLine(out, tableExists ? "接收表名成功" : "表不存在");

        // Step 3: payload. Always read until the client half-closes, even for
        // a missing table, so the client does not hit a broken pipe while it
        // is still sending.
        StringBuilder payload = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null) {
            payload.append(line);
        }

        // Step 4: store the records and report what actually happened.
        out.write(tableExists ? store(payload.toString()) : "文件上传失败:表不存在");
        out.flush();
    }

    /**
     * Parses the payload as a JSON array and writes it to {@link #userTableName}.
     *
     * @return the feedback line for the client; success is reported only when
     *         the HBase write really succeeded
     */
    private String store(String json) {
        try {
            JSONArray jsonArray = JSONArray.fromObject(json);
            printJsonArray(jsonArray);
            try {
                getAllTables(conf);
            } catch (IOException e) {
                // Listing tables is diagnostic output only; it must not fail
                // the upload.
                e.printStackTrace();
            }
            insertData(jsonArray, userTableName);
            return "文件上传成功!";
        } catch (JSONException e) {
            // Malformed JSON or a record without the required keys.
            e.printStackTrace();
            return "文件上传失败:JSON数据无效";
        } catch (IOException e) {
            e.printStackTrace();
            return "文件上传失败:写入HBase出错";
        }
    }

    /** Writes {@code text} as one line and flushes it to the client. */
    private static void sendLine(BufferedWriter out, String text) throws IOException {
        out.write(text);
        out.newLine();
        out.flush();
    }

    /**
     * Returns the shared connection.
     *
     * @throws IOException if the connection could not be created at class load
     */
    private static Connection requireConnection() throws IOException {
        if (connection == null) {
            throw new IOException("HBase connection is not available");
        }
        return connection;
    }

    /**
     * Writes every record of {@code jsonArray} into the given table. Each
     * record must have the string keys {@code DocumentID} (used as row key)
     * and {@code DocumentContent} (stored in {@code info:DocumentContent}).
     *
     * @throws IOException   if the table cannot be opened or the write fails
     * @throws JSONException if a record lacks one of the required keys
     */
    private void insertData(JSONArray jsonArray, String userTableName) throws IOException {
        List<Put> puts = new ArrayList<Put>(jsonArray.size());
        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject record = jsonArray.getJSONObject(i);
            Put put = new Put(Bytes.toBytes(record.getString("DocumentID")));
            // Arguments: column family, qualifier, value.
            put.addColumn(Bytes.toBytes(DEFAULT_COLUMN_FAMILY),
                    Bytes.toBytes("DocumentContent"),
                    Bytes.toBytes(record.getString("DocumentContent")));
            puts.add(put);
        }
        // The Table handle is closed even when put() throws.
        try (Table table = requireConnection().getTable(TableName.valueOf(userTableName))) {
            table.put(puts);
        }
    }

    /**
     * Checks whether the named table exists and prints the outcome.
     *
     * @return {@code true} only if HBase confirmed the table exists; a lookup
     *         failure or an illegal table name is logged and yields {@code false}
     */
    private boolean tableIsExist(String userTableName2) {
        try (Admin admin = requireConnection().getAdmin()) {
            boolean exists = admin.tableExists(TableName.valueOf(userTableName2));
            System.out.println(exists ? "表存在" : "表不存在");
            return exists;
        } catch (IOException | IllegalArgumentException e) {
            // The table name comes from the client, so it may be illegal.
            e.printStackTrace();
            return false;
        }
    }

    /** Prints the array size and the {@code id}, {@code name} and {@code role} entries of each element. */
    private void printJsonArray(JSONArray jsonArray) {
        int size = jsonArray.size();
        System.out.println("Size: " + size);
        for (int i = 0; i < size; i++) {
            JSONObject element = jsonArray.getJSONObject(i);
            System.out.println("[" + i + "]id=" + element.get("id"));
            System.out.println("[" + i + "]name=" + element.get("name"));
            System.out.println("[" + i + "]role=" + element.get("role"));
        }
    }

    /**
     * Creates the table named by {@link #userTableName} with a single column
     * family: {@link #columnFamilyName} when set, otherwise the family that
     * {@code insertData} writes to. Failures are logged, not thrown.
     *
     * @param conf unused; the shared connection is used instead
     */
    private void createTable(Configuration conf) {
        if (userTableName == null) {
            return;
        }
        String family = columnFamilyName != null ? columnFamilyName : DEFAULT_COLUMN_FAMILY;
        try (Admin admin = requireConnection().getAdmin()) {
            // Build the name directly: the table does not exist yet, so there
            // is no Table handle to take it from.
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(userTableName));
            desc.addFamily(new HColumnDescriptor(Bytes.toBytes(family)));
            admin.createTable(desc);
        } catch (IOException | IllegalArgumentException e) {
            e.printStackTrace();
        }
    }

    /**
     * Lists and prints the names of all HBase tables reachable with {@code conf}.
     * A short-lived connection is opened for the call and always closed.
     *
     * @param conf cluster configuration to connect with
     * @return the table names; empty (never {@code null}) when there are none
     * @throws IOException if connecting or listing fails
     */
    @SuppressWarnings("rawtypes") // raw return type kept for existing callers
    public static List getAllTables(Configuration conf)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        List<String> tables = new ArrayList<String>();
        try (Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin()) {
            for (HTableDescriptor descriptor : admin.listTables()) {
                String name = descriptor.getNameAsString();
                tables.add(name);
                System.out.println(name);
            }
        }
        return tables;
    }

    /**
     * Prints a result row: the row key, then family, column, version and value
     * for every cell, in the iteration order of {@link Result#getMap()}.
     *
     * @param rs the row to print
     */
    public void printResult(Result rs) {
        if (rs.isEmpty()) {
            System.out.println("result is empty!");
            return;
        }
        // Map of family -> qualifier -> version timestamp -> value.
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = rs.getMap();
        System.out.println("rowkey->" + Bytes.toString(rs.getRow()));
        for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> family : families.entrySet()) {
            System.out.print(" family->" + Bytes.toString(family.getKey()));
            for (Entry<byte[], NavigableMap<Long, byte[]>> column : family.getValue().entrySet()) {
                System.out.print(" col->" + Bytes.toString(column.getKey()));
                for (Entry<Long, byte[]> version : column.getValue().entrySet()) {
                    System.out.print(" vesion->" + version.getKey());
                    System.out.print(" value->" + Bytes.toString(version.getValue()));
                    System.out.println();
                }
            }
        }
    }
}
(2)主函数的实现
package com.yiban.datacenter.ToHbaseFromJson;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

/**
 * Entry point of the upload server: listens on TCP port 22222 and hands every
 * accepted connection to its own {@link UserThread}.
 */
public class HbaseServer {

    /**
     * Accepts clients until accepting fails, then logs the error and returns.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // try-with-resources releases the listening port if accept() fails.
        try (ServerSocket ss = new ServerSocket(22222)) {
            while (true) {
                Socket s = ss.accept();
                // One worker thread per client so uploads run concurrently;
                // the worker owns the socket from here on.
                new Thread(new UserThread(s)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}