  • Hadoop-03 A Simple Hadoop-Based JavaEE Data Visualization Example (Upgraded to Store the Result Set in HBase)

    Project Structure
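    The code is organized into three packages: com.etc.util holds the HBaseUtil utility class, com.etc.mc holds the MusicCountJob MapReduce job, and com.etc.action holds the CountServlet that serves JSON to the front end.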

    Writing the HBase Utility Class

    Building on the previous example, add an HBase utility class.

    package com.etc.util;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.TreeMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    /**
     * HBase utility class.
     * 
     * @author Administrator
     * 
     */
    public class HBaseUtil {
    
    
    	/**
    	 * Scan an entire table.
    	 * 
    	 * @param tableName the name of the table to scan
    	 * @return a map of row keys to their COUNT values
    	 */
    	public static Map<String, Integer> scan(String tableName) {
    		// HBase configuration object
    		Configuration conf = HBaseConfiguration.create();
    		Map<String, Integer> map = new TreeMap<String, Integer>();// result map: row key -> count
    		Connection conn = null;// HBase connection
    		try {
    			conn = ConnectionFactory.createConnection(conf);// open the connection
    			Table tb = conn.getTable(TableName.valueOf(tableName));// obtain the Table handle
    			ResultScanner rs = tb.getScanner(new Scan());// scan the whole table
    			for (Result row : rs) {// iterate over every row
    				// row key
    				String key = Bytes.toString(row.getRow());
    				Integer value = null;
    				// read the value from the COUNT column family
    				for (Cell cell : row.listCells()) {
    					if (Bytes.toString(CellUtil.cloneFamily(cell)).equals("COUNT")) {
    						value = Bytes.toInt(CellUtil.cloneValue(cell));
    					}
    				}
    				map.put(key, value);
    			}
    			rs.close();// release the scanner
    			tb.close();// release the table handle
    		} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
    			try {
    				if (conn != null) {
    					conn.close();
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    		return map;
    	}
    
    	/**
    	 * Create an HBase table, replacing it if it already exists.
    	 * 
    	 * @param tableName    the table name
    	 * @param columnFamily the column family name
    	 * @throws IOException on connection or DDL failure
    	 */
    	public static void createTable(String tableName, String columnFamily) throws IOException {
    		Configuration conf = HBaseConfiguration.create();
    		// open the HBase connection
    		Connection conn = ConnectionFactory.createConnection(conf);
    		// obtain the HBase Admin object
    		HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
    		if (admin.tableExists(tableName)) {
    			admin.disableTable(tableName);// disable the table first
    			admin.deleteTable(tableName);// then delete it
    		}
    		// build a new table descriptor
    		HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
    		// add the column family to the descriptor
    		tableDesc.addFamily(new HColumnDescriptor(columnFamily));
    		// create the table from the descriptor
    		admin.createTable(tableDesc);
    		conn.close();
    	}
    
    }
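
    A quick way to sanity-check the utility class is to drive it from a standalone main method. Below is a minimal sketch; HBaseUtilDemo is a hypothetical driver class, and it assumes an hbase-site.xml pointing at the cluster is on the classpath:

    package com.etc.util;
    
    import java.util.Map;
    
    /** Hypothetical smoke test for HBaseUtil (not part of the original project). */
    public class HBaseUtilDemo {
    	public static void main(String[] args) throws Exception {
    		// (re)create the table with a COUNT column family
    		HBaseUtil.createTable("MyMusic", "COUNT");
    		// scan it back; immediately after creation the map should be empty
    		Map<String, Integer> map = HBaseUtil.scan("MyMusic");
    		for (Map.Entry<String, Integer> e : map.entrySet()) {
    			System.out.println(e.getKey() + " -> " + e.getValue());
    		}
    	}
    }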
    

      

    Integrating the Reducer with HBase

    Modify the MapReduce job so that the Reducer writes its output to HBase.
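
    The mapper splits each input line on a tab and keeps only the first field, the song title, so the music1.txt–music4.txt input files are expected to hold tab-separated lines shaped roughly like this (hypothetical rows; the real file contents are not shown in the original):

    SongA	user01	2019-07-08
    SongB	user02	2019-07-08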

    package com.etc.mc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
    import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
    import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import com.etc.util.HBaseUtil;
    
    /** Song play-count statistics job */
    public class MusicCountJob {
    
    	
    	public static class MusicMapper extends Mapper<Object, Text, Text, IntWritable> {
    
    		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    
    			IntWritable valueOut = new IntWritable(1);
    			String keyInStr = value.toString();
    			String[] keyInStrArr = keyInStr.split("\t");// split the input line on tabs
    			String keyOut = keyInStrArr[0];// the first field is the song title
    			context.write(new Text(keyOut), valueOut);
    		}
    	}
    	
    	
    	/** Write the aggregated counts to HBase */
    	public static class MusicReducer extends
    			TableReducer<Text, IntWritable, Text> {
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values,
    				Context context) throws IOException, InterruptedException {
    			int total = 0;// running play total for this song
    			for (IntWritable item : values) {
    				total += item.get();
    			}
    			// the song title becomes the row key
    			Put put = new Put(Bytes.toBytes(key.toString()));
    			// store the total in the COUNT column family (empty qualifier)
    			put.addColumn(Bytes.toBytes("COUNT"), Bytes.toBytes(""),
    					Bytes.toBytes(total));
    			// write the Put to HBase
    			context.write(key, put);
    		}
    	}
    
    	
    	public static void main(String args[]) throws Exception {
    		
    		Configuration conf = HBaseConfiguration.create();
    		// register the HBase serializers so Put/Result/KeyValue objects can pass through MapReduce
    		conf.setStrings("io.serializations", conf.get("io.serializations"),
    				MutationSerialization.class.getName(),
    				ResultSerialization.class.getName(),
    				KeyValueSerialization.class.getName());
    		// load the project configuration files
    		conf.addResource("mapred-site.xml");
    		conf.addResource("core-site.xml");
    		conf.addResource("hbase-site.xml");
    		Job job = Job.getInstance(conf, "MusicCount_JOB");// create the job
    		job.setJarByClass(MusicCountJob.class);// jar that carries this job
    		job.setNumReduceTasks(1);// use a single ReduceTask
    		
    		job.setMapperClass(MusicMapper.class);// Mapper class
    		job.setMapOutputKeyClass(Text.class);// Mapper output key type
    		job.setMapOutputValueClass(IntWritable.class);// Mapper output value type
    		// input file paths
    		FileInputFormat.addInputPath(job, new Path("hdfs://centos:9000/music/music1.txt"));
    		FileInputFormat.addInputPath(job, new Path("hdfs://centos:9000/music/music2.txt"));
    		FileInputFormat.addInputPath(job, new Path("hdfs://centos:9000/music/music3.txt"));
    		FileInputFormat.addInputPath(job, new Path("hdfs://centos:9000/music/music4.txt"));
    		
    		// (re)create the output table
    		HBaseUtil.createTable("MyMusic", "COUNT");
    		job.setReducerClass(MusicReducer.class);// Reducer class
    		job.setOutputFormatClass(TableOutputFormat.class);// write the output through TableOutputFormat
    		job.setOutputKeyClass(ImmutableBytesWritable.class);// key type (the key is ignored by TableOutputFormat)
    		job.setOutputValueClass(Put.class);
    		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "MyMusic");// target table name
    		
    		job.waitForCompletion(true);
    	}
    }
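
    For reference, HBase ships a TableMapReduceUtil helper that performs the same wiring (output format, target table, serializations) in a single call. A sketch of the equivalent reducer setup, assuming the same job object and table name as above:

    // Sketch: inside main(), this call could replace the manual io.serializations,
    // setOutputFormatClass and OUTPUT_TABLE wiring shown above.
    // Requires: import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    TableMapReduceUtil.initTableReducerJob(
    		"MyMusic",           // output table name
    		MusicReducer.class,  // the TableReducer to run
    		job);                // the Job instance configured above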
    

      

    Modifying the Servlet

    Change the data-access code so that the Servlet fetches its data from HBase.

    package com.etc.action;
    
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.util.HashMap;
    import java.util.Map;
    
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import com.alibaba.fastjson.JSON;
    import com.etc.util.HBaseUtil;
    
    /** Serves JSON data to the client */
    @WebServlet("/CountServlet")
    public class CountServlet extends HttpServlet {
    	private static final long serialVersionUID = 1L;
    	
    
    	protected void doGet(HttpServletRequest request, HttpServletResponse response)
    			throws ServletException, IOException {
    		// avoid garbled characters in request parameters
    		request.setCharacterEncoding("utf-8");
    		// the response body is JSON
    		response.setContentType("application/json");
    		// response encoding
    		response.setCharacterEncoding("utf-8");
    		// writer for the response body
    		PrintWriter out = response.getWriter();
    		// assemble the JSON data
    		Map<String, Integer> map = null;
    		
    		try {
    			map = HBaseUtil.scan("MyMusic");
    		} catch (Exception e) {
    			System.out.println("Failed to read data from HBase");
    		}
    		
    		// build a map that serializes to the nested JSON the page expects
    		HashMap<String, Object> jsonmap = new HashMap<String, Object>();
    		jsonmap.put("mytitle", "Song play statistics");
    		jsonmap.put("mylegend", "Plays");
    		jsonmap.put("prolist", map);
    		
    		String str = JSON.toJSONString(jsonmap);
    
    		out.print(str);
    		out.flush();
    		out.close();
    	}
    
    	protected void doPost(HttpServletRequest request, HttpServletResponse response)
    			throws ServletException, IOException {
    		doGet(request, response);
    	}
    
    }
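
    The chart page then receives JSON of the following shape (the song titles and counts here are hypothetical; prolist carries whatever HBaseUtil.scan returned):

    {"mytitle":"Song play statistics","mylegend":"Plays","prolist":{"SongA":120,"SongB":87}}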
    

      
