zoukankan      html  css  js  c++  java
  • 简单通过java的socket&serversocket以及多线程技术实现多客户端的数据的传输,并将数据写入hbase中

    业务需求说明,由于公司数据中心处于刚开始部署的阶段,这需要涉及其它部分将数据全部汇总到数据中心,这实现的方式是同上传json文件,通过采用socket&serversocket实现传输。

    其中,服务端采用多线程的方式,实现多用户传输的目的。并且实现可以将数据写入到hbase中。

    具体步骤如下:

    1、首先编写客户端的代码:

    package com.yiban.datacenter.ToHbaseFromJson;
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.net.Socket;
    import java.net.UnknownHostException;
    
    public class hbaseclient {
    
    	public static void main(String[] args) {
    		// TODO Auto-generated method stub
    		try {
    			// 创建客户端的socket
    			Socket s = new Socket("192.168.27.47", 22222);
    
    			// 首先确认连接上了我的服务器,通过接受服务器发送的确认信息
    			BufferedWriter firstclientwrite = new BufferedWriter(
    					new OutputStreamWriter(s.getOutputStream()));
    			firstclientwrite.write("我准备向你发送数据了,你准备好接收了吗?");
    			firstclientwrite.newLine();
    			firstclientwrite.flush();
    
    			// 经通道内的字节输入流进行一个封装程字符流,方便直接输出
    			BufferedReader testconnection = new BufferedReader(
    					new InputStreamReader(s.getInputStream()));
    			// 输出结果
    			String sss = testconnection.readLine();
    			System.out.println(sss);
    			
    			//发送表名和列族名
    			firstclientwrite.write("nihao");
    			firstclientwrite.newLine();
    			firstclientwrite.flush();
    			
    			//确定表发送是否成功
    			System.out.println(testconnection.readLine());
    
    			// 封装客户端的文本文件
    			BufferedReader clientread = new BufferedReader(new FileReader(
    					"file.json"));
    
    			// 准备将客户端的字符流写入到对应的通道内,OutputStreamWriter是将字符流转换成字节流,
    			// BufferedWriter封装字符流
    			BufferedWriter clientwirte = new BufferedWriter(
    					new OutputStreamWriter(s.getOutputStream()));
    
    			String line = null;
    			while ((line = clientread.readLine()) != null) {
    				clientwirte.write(line);
    				clientwirte.newLine();
    				clientwirte.flush();
    			}
    
    			// 提示发送完成
    			s.shutdownOutput();
    
    			// 准备接收一个反馈
    
    			// 经通道内的字节输入流进行一个封装程字符流,方便直接输出
    			BufferedReader ctread = new BufferedReader(new InputStreamReader(
    					s.getInputStream()));
    
    			// 输出结果
    			String fackcall = null;
    			while ((fackcall = ctread.readLine()) != null) {
    				System.out.println(fackcall);
    			}
    
    			// 释放资源
    			clientread.close();
    			s.close();
    
    		} catch (UnknownHostException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    }
    

    2、服务端的代码:

    (1)线程类的实现

    package com.yiban.datacenter.ToHbaseFromJson;
    
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.Map.Entry;
    
    import net.sf.json.JSONArray;
    import net.sf.json.JSONObject;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.MasterNotRunningException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class UserThread implements Runnable {
    
    	private String testconnect = "我准备向你发送数据了,你准备好接收了吗?"; // 这个可以用来验证用户名和密码
    
    	private static Configuration conf = HBaseConfiguration.create();
    
    	private static Connection connection = null;
    	// 配置信息
    	static {
    		try {
    			conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233");
    			conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
    			connection = ConnectionFactory.createConnection(conf);
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    	}
    
    	private Socket s;
    
    	public UserThread(Socket s) {
    		this.s = s;
    	}
    
    	private String userTableName = null;
    	private String columnFamilyName = null;
    
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		try {
    			// 将通道内的字节流转换成字符流,并用bufferedreader进行封装,InputStreamReader是将字节流转换成字符流
    			BufferedReader serverread = new BufferedReader(
    					new InputStreamReader(s.getInputStream()));
    
    			// 询问客户端连接是否准备好,接受客户端的连接请求
    			String line = serverread.readLine(); // 阻塞
    			System.out.println(line);// 输出客户端的连接请求
    
    			// 将通道内的字符写入到对应的文件中,利用bufferedwrite进行封装,FileWriter是将字符流写入到文件中
    			BufferedWriter serverwrite = new BufferedWriter(
    					new OutputStreamWriter(s.getOutputStream()));
    
    			if (line.equals(testconnect)) {
    				serverwrite.write("连接成功,你可以发送数据了,发送数据前,请先发送你要用的数据库表名!");
    				serverwrite.newLine();
    				serverwrite.flush();
    			} else {
    				serverwrite.write("连接失败!");
    				serverwrite.newLine();
    				serverwrite.flush();
    			}
    
    			// 准备接收表名和列族名
    			userTableName = serverread.readLine();
    			System.out.println("tablename:" + userTableName);// 输出客户端的连接请求
    
    			// 告诉客户端,我接受成功
    			if (TableIsExist(userTableName)) {
    				serverwrite.write("接收表名成功");
    				serverwrite.newLine();
    				serverwrite.flush();
    			} else {
    				serverwrite.write("表不存在");
    				serverwrite.newLine();
    				serverwrite.flush();
    			}
    
    			// 循环读取客户端的数据
    			line = "";
    			StringBuffer temp = new StringBuffer(line);
    			while ((line = serverread.readLine()) != null) {
    				temp.append(line);
    			}
    
    			// 对json文件进行解析
    			JSONArray jsonArray = JSONArray.fromObject(temp.toString());
    
    			// 解析之后进行输出,在这里可以直接写入到hbase中
    			PrintJsonArray(jsonArray);
    			getAllTables(conf);
    
    			// 将接收到的数据写入hbase中的表中
    			insertData(jsonArray, userTableName);
    
    			// 给出一个反馈,提示数据上传成功
    			// 封装通道内的输出流,方便对他进行写字符数据
    			BufferedWriter bwserver = new BufferedWriter(
    					new OutputStreamWriter(s.getOutputStream()));
    
    			bwserver.write("文件上传成功!");
    			// bwserver.newLine();
    			bwserver.flush();
    			bwserver.close();
    
    			// 释放资源
    			// serverwrite.close();
    			s.close();
    
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    	@SuppressWarnings("deprecation")
    	private void insertData(JSONArray jsonArray, String userTableName) {
    		// TODO Auto-generated method stub
    		Table table = null;
    		try {
    			table = connection.getTable(TableName.valueOf(userTableName));
    		} catch (IOException e1) {
    			// TODO Auto-generated catch block
    			e1.printStackTrace();
    		}
    		List<Put> putlist = new ArrayList<Put>();
    		for (int i = 0; i < jsonArray.size(); i++) {
    			JSONObject jsonobject = jsonArray.getJSONObject(i);
    
    			Put put = new Put(Bytes.toBytes(jsonobject.getString("DocumentID")));// 指定行,也就是键值
    			// 参数分别:列族、列、值
    			
    			put.add(Bytes.toBytes("info"),
    					Bytes.toBytes("DocumentContent"),
    					Bytes.toBytes(jsonobject.getString("DocumentContent")));
    			putlist.add(put);
    		}
    
    		try {
    			table.put(putlist);
    			table.close();
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    
    	}
    
    	private boolean TableIsExist(String userTableName2) {
    		boolean flag = false;
    		try {
    			// Connection connection = ConnectionFactory.createConnection(conf);
    			Admin ad = connection.getAdmin();
    			if (ad.tableExists(TableName.valueOf(userTableName2))) {
    				flag = true;
    				System.out.println("表存在");
    			} else {
    				System.out.println("表不存在");
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    		}
    
    		return flag;
    		// TODO Auto-generated method stub
    
    	}
    
    	private void PrintJsonArray(JSONArray jsonArray) {
    		int size = jsonArray.size();
    		System.out.println("Size: " + size);
    		for (int i = 0; i < size; i++) {
    			JSONObject jsonObject = jsonArray.getJSONObject(i);
    			System.out.println("[" + i + "]id=" + jsonObject.get("id"));
    			System.out.println("[" + i + "]name=" + jsonObject.get("name"));
    			System.out.println("[" + i + "]role=" + jsonObject.get("role"));
    		}
    	}
    
    	// create table
    	private void createTable(Configuration conf) {
    		// HBaseAdmin ha=new HBaseAdmin(conf);
    		try {
    			// Connection connection = ConnectionFactory.createConnection(conf);
    			Table table = connection.getTable(TableName.valueOf(userTableName));
    			Admin ad = connection.getAdmin();
    
    			// TableName name= TableName.valueOf(Bytes.toBytes(tablename));//表名
    			HTableDescriptor desc = new HTableDescriptor(table.getName());
    
    			HColumnDescriptor family = new HColumnDescriptor(
    					Bytes.toBytes(columnFamilyName));// 列簇
    			desc.addFamily(family);
    
    			ad.createTable(desc);
    			ad.close();
    
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    
    	}
    
    	// Hbase获取所有的表信息
    	public static List getAllTables(Configuration conf)
    			throws MasterNotRunningException, ZooKeeperConnectionException,
    			IOException {
    
    		HBaseAdmin ad = new HBaseAdmin(conf);
    		List<String> tables = null;
    		if (ad != null) {
    			try {
    				HTableDescriptor[] allTable = ad.listTables();
    				if (allTable.length > 0)
    					tables = new ArrayList<String>();
    				for (HTableDescriptor hTableDescriptor : allTable) {
    					tables.add(hTableDescriptor.getNameAsString());
    					System.out.println(hTableDescriptor.getNameAsString());
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    		return tables;
    	}
    
    	// 按顺序输出
    	public void printResult(Result rs) {
    		if (rs.isEmpty()) {
    			System.out.println("result is empty!");
    			return;
    		}
    		// new API and print Map of families to all versions of its qualifiers
    		// and values.
    		NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs
    				.getMap();
    		String rowkey = Bytes.toString(rs.getRow()); // actain rowkey
    		System.out.println("rowkey->" + rowkey);
    		for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps
    				.entrySet()) {
    			System.out.print("	family->" + Bytes.toString(temp.getKey()));
    			for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp
    					.getValue().entrySet()) {
    				System.out.print("	col->" + Bytes.toString(value.getKey()));
    				for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
    					System.out.print("	vesion->" + va.getKey());
    					System.out.print("	value->"
    							+ Bytes.toString(va.getValue()));
    					System.out.println();
    				}
    			}
    		}
    	}
    	
    }
    

     (2)主函数的实现

    package com.yiban.datacenter.ToHbaseFromJson;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    
    public class HbaseServer {
    
    	public static void main(String[] args) {
    		// TODO Auto-generated method stub
    		try {
    			@SuppressWarnings("resource")
    			ServerSocket ss=new ServerSocket(22222);
    			
    			while(true){
    				Socket s=ss.accept();
    				
    				new Thread(new UserThread3(s)).start();
    			}
    			
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    }
    
  • 相关阅读:
    Flask基础教程
    htmlrunner-unittest生成多样的测试报告
    requestz-基于pycurl封装的requests库
    emailz-使发送邮件更方便
    filez-方便的数据文件加载器
    logz-易用的日志记录器
    基本语言(二)
    基本语言(一)
    编程范式(Programming paradigm)
    deque 双向队列知识点汇总
  • 原文地址:https://www.cnblogs.com/ljy2013/p/4839017.html
Copyright © 2011-2022 走看看