zoukankan      html  css  js  c++  java
  • java实现服务端守护进程来监听客户端通过上传json文件写数据到hbase中

    1、项目介绍:

      由于大数据部门涉及到其他部门将数据传到数据中心,大部分公司采用的方式是用json文件的方式传输,因此就需要编写服务端和客户端的小程序了。而我主要实现服务端的代码,也有相应的客户端的测试代码。这里须有一个需要提到的是,我在实现接收json文件的同时,而且还需将数据写到hbase中。写入到hbase当中采用的是批量插入的方式,即一次插入多条记录。

      好了,有了前面的说明,下面来简单的说一下我实现的服务端的小程序把。

    2、为了实现服务端能够监听客户端的行为,因此我在服务端采用多线程的技术来实现,并用socket的方式来实现网络通信。具体实现如下:

    服务端的主程序:

    package com.yiban.datacenter.finalversion;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class HbaseServer {
    	
    	public static void main(String[] args) {
    		// TODO Auto-generated method stub
    		backprocess();
    	}
    	
    	public static void backprocess(){
    		try {
    			ServerSocket ss=new ServerSocket(11111);
    			while(true){
    				Socket s=ss.accept();
    				
    				Thread deal=new Thread(new DealUserThread(s));
    				deal.setDaemon(true);  //这里设置对应线程是后台线程
    				deal.start();
    			}
    			
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    }
    

     处理数据的线程类:

    在这里我实现了接收数据,并将数据写入hbase中。

    在实现这些大的目标的同时,也将客户端的请求通过日志文件的形式存到服务端的本地磁盘上,供后续查看。

    package com.yiban.datacenter.finalversion;
    
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.net.Socket;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    import java.util.TreeMap;
    
    import net.sf.json.JSONArray;
    import net.sf.json.JSONObject;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.MasterNotRunningException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class DealUserThread implements Runnable {
    
    	private String testconnect = "username=chenpiao,password=123;username=liujiyu,password=123"; // 这个可以用来验证用户名和密码
    
    	private static Configuration conf = HBaseConfiguration.create();
    
    	private static Connection connection = null;
    	
    	private String logFile=null;
    	// 配置hbase的信息
    	static {
    		try {
    			conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233");
    			conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
    			connection = ConnectionFactory.createConnection(conf);
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    	}
    
    	private Socket s;
    
    	public DealUserThread(Socket s) {
    		this.s = s;
    	}
    
    	private String userTableName = "nihao";
    	private String columnFamilyName = null;
    	private String rowKey = null;
    
    	private BufferedReader serverread = null;
    	private BufferedWriter serverwrite = null;
    
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		try {
    			// 将通道内的字节流转换成字符流,并用bufferedreader进行封装,InputStreamReader是将字节流转换成字符流
    			serverread = new BufferedReader(new InputStreamReader(
    					s.getInputStream()));
    
    			// 询问客户端连接是否准备好,接受客户端的连接请求
    			String line = serverread.readLine(); // 阻塞
    			//System.out.println(line);// 输出客户端的连接请求
    			
    			//为日志文件命名,并创建文件
    			SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 
    			logFile="/var/log/datacenter/"+"user.log";
    			//System.out.println(logFile);
    			File destFile = new File(logFile);
    			if (!destFile.exists()) {
    				destFile.createNewFile();
    			}
    			writeByFileWrite(logFile, line+"
    "+sdf.format(System.currentTimeMillis()));
    			// 将通道内的字符写入到对应的文件中,利用bufferedwrite进行封装,FileWriter是将字符流写入到文件中
    			serverwrite = new BufferedWriter(new OutputStreamWriter(
    					s.getOutputStream()));
    			String[] strArray = testconnect.split("\;");
    			boolean flag = false;
    			for (String str : strArray) {
    				if (str.equals(line)) {
    					/*
    					 * serverwrite.write("连接成功,你可以发送数据了,发送数据前,请先发送你要用的数据库表名!");
    					 * serverwrite.newLine(); serverwrite.flush();
    					 */
    					printInfomationForClient("connection successful ,now you can send data,befor send data ,you must send tablename!");
    					flag = true;
    					break;
    				}
    			}
    
    			if (!flag) {
    				/*
    				 * serverwrite.write("密码或者用户名错误,连接失败!"); serverwrite.newLine();
    				 * serverwrite.flush();
    				 */
    				printInfomationForClient("username or password is error! connection failed!");
    				s.close();
    			}
    			
    			// 准备接收表名
    			userTableName = serverread.readLine();
    			//System.out.println("tablename:" + userTableName);// 输出客户端的连接请求的表名
    			
    			writeByFileWrite(logFile, "tablename="+userTableName);//将内容写到日志文件中
    			
    			// 告诉客户端,我接受成功
    			if (TableIsExist(userTableName)) {
    				printInfomationForClient("received tablename successful!");
    			} else {
    				printInfomationForClient("tablename Is not exist ");
    				s.close();
    			}
    
    
    			line = "[";
    			StringBuffer temp = new StringBuffer(line);
    			while ((line = serverread.readLine()) != null) {
    				temp.append(line);
    			}
    			temp.append("]");
    			//System.out.println(temp.toString());
    
    			// 对接收到的数据进行异常处理,如上传的数据格式不正确等等。
    			try {
    				// 对json文件进行解析
    				JSONArray jsonArray = JSONArray.fromObject(temp.toString());
    				// JSONObject jsonobject=JSONObject.fromObject(temp.toString());
    
    				// 解析之后进行输出
    				//PrintJsonArray(jsonArray);
    
    				//获取所有的表名
    				//getAllTables(conf);
    
    				// 将接收到的数据写入hbase中的表中
    				insertData(jsonArray, userTableName);
    				
    				//System.out.println("存入数据成功!");
    
    			} catch (Exception e) {
    				printErrorForClient(e);
    			}
    			// 给出一个反馈,提示数据上传成功
    			// 封装通道内的输出流,方便对他进行写字符数据
    			// BufferedWriter bwserver = new BufferedWriter(new
    			// OutputStreamWriter(s.getOutputStream()));
    
    			/*
    			 * serverwrite.write("文件上传成功!"); // bwserver.newLine();
    			 * serverwrite.flush(); serverwrite.close();
    			 */
    			printInfomationForClient("upload data successful!");
    
    			// 释放资源
    			s.close();
    
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    			try {
    				printErrorForClient(e);
    				writeByFileWrite(logFile,e.getMessage()+e.toString());
    			} catch (IOException e1) {
    				// TODO Auto-generated catch block
    				e1.printStackTrace();
    			}
    		}
    	}
    
    	private void printInfomationForClient(String s) throws IOException {
    		try {
    			serverwrite.write(s);
    			writeByFileWrite(logFile, s);//将内容写到日志文件中
    			serverwrite.newLine();
    			serverwrite.flush();
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    			writeByFileWrite(logFile, e.getMessage()+e.toString());//将内容写到日志文件中
    		}
    	}
    
    	private void printErrorForClient(Exception e) throws IOException {
    		try {
    			serverwrite.write("found a error:" + e.getMessage() + e.toString());
    			writeByFileWrite(logFile,"found a error:" + e.getMessage() + e.toString());
    			serverwrite.newLine();
    			serverwrite.flush();
    			s.close();
    		} catch (IOException e1) {
    			// TODO Auto-generated catch block
    			e1.printStackTrace();
    			writeByFileWrite(logFile,e1.getMessage()+e1.toString());
    		}
    
    	}
    
    	Map<String, String> colvalue = new TreeMap<String, String>();
    
    	private void insertData(JSONArray jsonArray, String userTableName)
    			throws IOException {
    
    		// connect the table
    		Table table = null;
    		try {
    			table = connection.getTable(TableName.valueOf(userTableName));
    		} catch (IOException e1) {
    			// TODO Auto-generated catch block
    			e1.printStackTrace();
    			printErrorForClient(e1);
    		}
    
    		colvalue.clear();
    
    		for (int i = 0; i < jsonArray.size(); i++) {
    			JSONObject obj = jsonArray.getJSONObject(i);
    			Set<String> keysets = obj.keySet();
    			for (String key : keysets) {
    				switch (key) {
    				case "category":
    					columnFamilyName = obj.getString(key);
    					break;
    				case "version":
    					colvalue.put("version", obj.getString("version"));
    					break;
    				case "DocumentType":
    					colvalue.put("DocumentType", obj.getString("DocumentType"));
    					break;
    				case "articles":
    					JSONArray articlesjars = obj.getJSONArray("articles");
    					dealjsonArray(table,articlesjars);
    					break;
    				default:
    					printErrorForClient(new Exception("send datatype is error!"));
    				}
    			}
    
    		}
    	}
    
    	private void insertColDataToHbase(Table table) throws IOException {
    		// 判断是否包含对应的列族,若不包含则添加
    		HTableDescriptor desc = new HTableDescriptor(table.getName());
    		Collection<HColumnDescriptor> familys=desc.getFamilies();
    		if (  familys.contains(new HColumnDescriptor(columnFamilyName)) 
    				&& columnFamilyName != null) {
    			addColFamily(table, desc, columnFamilyName);
    		}
    
    		// insert data
    		List<Put> putlist = new ArrayList<Put>();
    
    		if (!colvalue.isEmpty() && rowKey != null && columnFamilyName != null) {
    			Put put = new Put(Bytes.toBytes(rowKey));// 指定行,也就是键值
    			// 下面就是循环存储列
    			for (Entry<String, String> col : colvalue.entrySet()) {
    				put.add(Bytes.toBytes(columnFamilyName),
    						Bytes.toBytes(col.getKey()),
    						Bytes.toBytes(col.getValue()));
    				putlist.add(put);
    			}
    
    		} else {
    			printErrorForClient(new Exception("send datatype is error!"));
    		}
    
    		try {
    			table.put(putlist);
    			table.close();
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    			printErrorForClient(e);
    		}
    	}
    
    	private void dealjsonArray(Table table,JSONArray articlesjars) throws IOException {
    		// TODO Auto-generated method stub
    		if(articlesjars.isEmpty()){
    			System.out.println("articlesjars is empty");
    			printErrorForClient(new Exception("send datatype is error!"));
    			return;
    		}
    		for(int i=0;i<articlesjars.size();i++){
    			JSONObject obj=articlesjars.getJSONObject(i);
    			Set<String>keysets=obj.keySet();
    			for(String key:keysets){
    				switch(key){
    				case "content":
    					colvalue.put("content", obj.getString("content"));
    					break;
    				case "picture_url":
    					colvalue.put("picture_url", obj.getString("picture_url"));
    					break;
    				case "time":
    					colvalue.put("time", obj.getString("time"));
    					break;
    				case "author":
    					colvalue.put("author", obj.getString("author"));
    					break;
    				case "url":
    					rowKey=obj.getString("url");
    					break;
    				case "title":
    					colvalue.put("title", obj.getString("title"));
    					break;
    				default:
    					printErrorForClient(new Exception("send datatype is error!"));
    				}
    			}
    			try {
    				insertColDataToHbase(table);
    			} catch (IOException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    				printErrorForClient(e);
    			}	
    		}
    		
    	}
    
    	private void addColFamily(Table table, HTableDescriptor desc,
    			String colFamily) throws IOException {
    
    		Admin ad = connection.getAdmin();
    
    		HColumnDescriptor family = new HColumnDescriptor(
    				Bytes.toBytes(colFamily));// 列簇
    		desc.addFamily(family);
    		ad.addColumn(table.getName(), family);
    		ad.close();
    
    	}
    
    	private boolean TableIsExist(String userTableName2) throws IOException {
    		boolean flag = false;
    		try {
    			// Connection connection = ConnectionFactory.createConnection(conf);
    			Admin ad = connection.getAdmin();
    			if (ad.tableExists(TableName.valueOf(userTableName2))) {
    				flag = true;
    				//System.out.println("表存在");
    			} else {
    				//System.out.println("表不存在");
    				//printErrorForClient(new Exception("表不存在"));
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    			printErrorForClient(e);
    		}
    
    		return flag;
    		// TODO Auto-generated method stub
    
    	}
    
    	private void PrintJsonArray(JSONArray jsonArray) {
    		int size = jsonArray.size();
    		System.out.println("Size: " + size);
    		for (int i = 0; i < size; i++) {
    			JSONObject jsonObject = jsonArray.getJSONObject(i);
    			Set<String> keysets = jsonObject.keySet();
    			for (String keyset : keysets) {
    				System.out.println(keyset);
    			}
    		}
    	}
    
    	private void PrintJsonArray(JSONObject jsonobject, String... keys) {
    		int size = jsonobject.size();
    		System.out.println("Size: " + size);
    		for (int i = 0; i < size; i++) {
    			for (String key : keys) {
    				System.out.println(key + ":" + jsonobject.get(key));
    			}
    
    			// System.out.println("[" + i + "]id=" + jsonObject.get("id"));
    			// System.out.println("[" + i + "]name=" + jsonObject.get("name"));
    			// System.out.println("[" + i + "]role=" + jsonObject.get("role"));
    		}
    	}
    
    	//写日志文件
    	public static void writeByFileWrite(String _sDestFile, String _sContent)
    			throws IOException {
    		FileWriter fw = null;
    		try {
    			fw = new FileWriter(_sDestFile,true);
    			fw.write(_sContent);
    			fw.write('
    ');
    		} catch (Exception ex) {
    			ex.printStackTrace();
    		} finally {
    			if (fw != null) {
    				fw.close();
    				fw = null;
    			}
    		}
    	}
    	
    	// create table
    	private void createTable(Configuration conf) {
    		// HBaseAdmin ha=new HBaseAdmin(conf);
    		try {
    			// Connection connection = ConnectionFactory.createConnection(conf);
    			Table table = connection.getTable(TableName.valueOf(userTableName));
    			Admin ad = connection.getAdmin();
    
    			// TableName name= TableName.valueOf(Bytes.toBytes(tablename));//表名
    			HTableDescriptor desc = new HTableDescriptor(table.getName());
    
    			HColumnDescriptor family = new HColumnDescriptor(
    					Bytes.toBytes(columnFamilyName));// 列簇
    			desc.addFamily(family);
    
    			ad.createTable(desc);
    			ad.close();
    
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    
    	}
    
    	// Hbase获取所有的表信息
    	public static List getAllTables(Configuration conf)
    			throws MasterNotRunningException, ZooKeeperConnectionException,
    			IOException {
    
    		HBaseAdmin ad = new HBaseAdmin(conf);
    		List<String> tables = null;
    		if (ad != null) {
    			try {
    				HTableDescriptor[] allTable = ad.listTables();
    				if (allTable.length > 0)
    					tables = new ArrayList<String>();
    				for (HTableDescriptor hTableDescriptor : allTable) {
    					tables.add(hTableDescriptor.getNameAsString());
    					System.out.println(hTableDescriptor.getNameAsString());
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    		return tables;
    	}
    
    }
    

     实现了这些之后,通过eclipse将其导出程可运行的jar包,并将jar包放到服务器上进行部署,部署的方式很简单,但是也要注意一下:

      java -jar myserver.jar com.yiban.datacenter.finalversion.HbaseServer &

    最后一个&表示后台守护启动该进程。

  • 相关阅读:
    Laravel5.0学习--01 入门
    MySQL账户安全设置
    360路由器c301最新固件支持万能中继
    PsySH:PHP交互运行环境
    PHP-CS-Fixer:格式化你的PHP代码
    JVM 类加载机制详解
    Java虚拟机(JVM)概述
    聊一聊 Spring 中的线程安全性
    Java 里如何实现线程间通信
    Java 数据结构
  • 原文地址:https://www.cnblogs.com/ljy2013/p/4897707.html
Copyright © 2011-2022 走看看