一、背景
有一些时候,多个团队需要共同完成一个任务,比如,A团队将Hadoop集群计算的结果交给B团队继续计算,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下
去,直到最后一部分完成。在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB(企业服务股总线)服务器上部署自己的服务,然后通过消息中间件完成调度任务。对亍分步式的多个
Hadoop集群系统的协作,同样可以用这种架构来做只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。
本文楼主将使用zookeeper做为分步式消息中间件构造一个大型超市的部分数据计算模型来完成各个区域利润计算的业务需求。
由于采购和销售分别是由不同厂商进行的软件开发和维护,而且业务往来也在不同的城市和地区。 所以在每月底结算时,工作量都特别大。 比如,计算利润表: 当月利润 = 当月销售金额 - 当月采购
额 - 当月其他支出(楼主只是粗略计算)。如果采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统, 如何能让多个系统,配合起来完成该需求?
二、系统构思
楼主基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。排除了ESB的部分,只保留zookeeper进行实现。
- 采购数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
- 销售数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
- 其他费用支出:为少量数据,基于文件或数据库存储和分析
我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售 (sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润, 幵创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次 。
Hadoop mapreduce1,Hadoop mapreduce2 是2个独立的Hadoop集群应用。 Java App 是2个独立的Java应用 。ZooKeeper集群的有3个节点 。
- /queue,是znode的队列目录,假设队列长度为3
- /queue/purchase,是znode队列中,1号排对者,由Hadoop mapreduce1提交,用于统计采购金额
- /queue/sell,是znode队列中,2号排对者,由Hadoop mapreduce2提交,用于统计销售金额
- /queue/other,是znode队列中,3号排对者,由Java App提交,用于统计其他费用支出金额
- /queue/profit,当znode队列中满了,触发创建利润节点。
当/qeueu/profit被创建后,利润java app被启动,所有zookeeper的连接通知同步程序(红色线),队列已完成,所有程序结束。
三、环境准备
1)hadoop集群。楼主用的6个节点的hadoop2.7.3集群,各位同学可以根据自己的实际情况进行搭建,但至少需要1台伪分布式的。(参考http://www.cnblogs.com/qq503665965/p/6790580.html)
2)zookeeper集群。至少三个节点。安装参考楼主这篇文章(http://www.cnblogs.com/qq503665965/p/6790580.html)
3)java开发环境。
四、mapreduce及java app程序
计算采购金额:
1 package zkqueue; 2 import java.io.IOException; 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.regex.Pattern; 6 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapred.JobConf; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 20 21 /** 22 * 采购金额计算 23 * @author Jon_China 24 * 25 */ 26 public class Purchase { 27 28 public static final String HDFS = "hdfs://192.168.8.101:9000"; 29 public static final Pattern DELIMITER = Pattern.compile("[ ,]"); 30 31 public static class PurchaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 32 33 private String month = "2017-01"; 34 private Text k = new Text(month); 35 private IntWritable v = new IntWritable(); 36 private int money = 0; 37 38 public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { 39 System.out.println(values.toString()); 40 String[] tokens = DELIMITER.split(values.toString());//拆分源数据 41 if (tokens[3].startsWith(month)) {// 过滤1月份数据 42 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//计算 43 v.set(money); 44 context.write(k, v); 45 } 46 } 47 } 48 49 public static class PurchaseReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 50 private IntWritable v = new IntWritable(); 51 private int money = 0; 52 53 @Override 54 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 55 for (IntWritable line : values) { 56 money += line.get(); 57 } 58 v.set(money); 59 context.write(null, v); 60 System.out.println("Output:" + key + "," + money); 61 } 62 63 } 64 65 public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException { 66 JobConf conf = config(); 67 String local_data = path.get("purchase"); 68 String input = path.get("input"); 69 String output = path.get("output"); 70 71 72 HdfsDAO hdfs = new HdfsDAO(HDFS, conf); 73 hdfs.rmr(input); 74 hdfs.mkdirs(input); 75 hdfs.copyFile(local_data, input); 76 77 Job job = Job.getInstance(conf); 78 job.setJarByClass(Purchase.class); 79 80 job.setOutputKeyClass(Text.class); 81 job.setOutputValueClass(IntWritable.class); 82 83 job.setMapperClass(PurchaseMapper.class); 84 job.setReducerClass(PurchaseReducer.class); 85 86 job.setInputFormatClass(TextInputFormat.class); 87 job.setOutputFormatClass(TextOutputFormat.class); 88 89 FileInputFormat.setInputPaths(job, new Path(input)); 90 FileOutputFormat.setOutputPath(job, new Path(output)); 91 92 job.waitForCompletion(true); 93 } 94 95 public static JobConf config() { 96 JobConf conf = new JobConf(Purchase.class); 97 conf.setJobName("purchase"); 98 conf.addResource("classpath:/hadoop/core-site.xml"); 99 conf.addResource("classpath:/hadoop/hdfs-site.xml"); 100 conf.addResource("classpath:/hadoop/mapred-site.xml"); 101 conf.addResource("classpath:/hadoop/yarn-site.xml"); 102 return conf; 103 } 104 105 public static Map<String,String> path(){ 106 Map<String, String> path = new HashMap<String, String>(); 107 path.put("purchase", Purchase.class.getClassLoader().getResource("logfile/biz/purchase.csv").getPath());// 源文件数据 108 path.put("input", HDFS + "/user/hdfs/biz/purchase");//hdfs存储路径 109 path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); //hdfs输出路径 110 return path; 111 } 112 113 public static void main(String[] args) throws Exception { 114 run(path()); 115 } 116 117 }
销售数据计算:
1 package zkqueue; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.regex.Pattern; 7 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.IntWritable; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapred.JobConf; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 21 /** 22 * 销售数据计算 23 * @author Jon_China 24 * 25 */ 26 public class Sell { 27 28 public static final String HDFS = "hdfs://192.168.8.101:9000"; 29 public static final Pattern DELIMITER = Pattern.compile("[ ,]"); 30 31 public static class SellMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 32 33 private String month = "2013-01"; 34 private Text k = new Text(month); 35 private IntWritable v = new IntWritable(); 36 private int money = 0; 37 38 public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { 39 System.out.println(values.toString()); 40 String[] tokens = DELIMITER.split(values.toString()); 41 if (tokens[3].startsWith(month)) {// 1月的数据 42 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量 43 v.set(money); 44 context.write(k, v); 45 } 46 } 47 } 48 49 public static class SellReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 50 private IntWritable v = new IntWritable(); 51 private int money = 0; 52 53 @Override 54 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 55 for (IntWritable line : values) { 56 money += line.get(); 57 } 58 v.set(money); 59 context.write(null, v); 60 System.out.println("Output:" + key + "," + money); 61 } 62 63 } 64 65 public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException { 66 JobConf conf = config(); 67 String local_data = path.get("sell"); 68 String input = path.get("input"); 69 String output = path.get("output"); 70 71 // 初始化sell 72 HdfsDAO hdfs = new HdfsDAO(HDFS, conf); 73 hdfs.rmr(input); 74 hdfs.mkdirs(input); 75 hdfs.copyFile(local_data, input); 76 77 Job job = Job.getInstance(conf); 78 job.setJarByClass(Sell.class); 79 80 job.setOutputKeyClass(Text.class); 81 job.setOutputValueClass(IntWritable.class); 82 83 job.setMapperClass(SellMapper.class); 84 job.setReducerClass(SellReducer.class); 85 86 job.setInputFormatClass(TextInputFormat.class); 87 job.setOutputFormatClass(TextOutputFormat.class); 88 89 FileInputFormat.setInputPaths(job, new Path(input)); 90 FileOutputFormat.setOutputPath(job, new Path(output)); 91 92 job.waitForCompletion(true); 93 } 94 95 public static JobConf config() {// Hadoop集群的远程配置信息 96 JobConf conf = new JobConf(Purchase.class); 97 conf.setJobName("purchase"); 98 conf.addResource("classpath:/hadoop/core-site.xml"); 99 conf.addResource("classpath:/hadoop/hdfs-site.xml"); 100 conf.addResource("classpath:/hadoop/mapred-site.xml"); 101 conf.addResource("classpath:/hadoop/yarn-site.xml"); 102 return conf; 103 } 104 105 public static Map<String,String> path(){ 106 Map<String, String> path = new HashMap<String, String>(); 107 path.put("sell", Sell.class.getClassLoader().getResource("logfile/biz/sell.csv").getPath());// 本地的数据文件 108 path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录 109 path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录 110 return path; 111 } 112 113 public static void main(String[] args) throws Exception { 114 run(path()); 115 } 116 117 }
其他金额计算:
1 package zkqueue; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileReader; 6 import java.io.IOException; 7 import java.util.regex.Pattern; 8 9 public class Other { 10 11 public static String file = "/logfile/biz/other.csv"; 12 public static final Pattern DELIMITER = Pattern.compile("[ ,]"); 13 private static String month = "2017-01"; 14 15 public static void main(String[] args) throws IOException { 16 calcOther(file); 17 } 18 19 public static int calcOther(String file) throws IOException { 20 int money = 0; 21 BufferedReader br = new BufferedReader(new FileReader(new File(file))); 22 23 String s = null; 24 while ((s = br.readLine()) != null) { 25 String[] tokens = DELIMITER.split(s); 26 if (tokens[0].startsWith(month)) {// 1月的数据 27 money += Integer.parseInt(tokens[1]); 28 } 29 } 30 br.close(); 31 System.out.println("Output:" + month + "," + money); 32 return money; 33 } 34 }
计算利润:
1 package zkqueue; 2 3 import java.io.IOException; 4 5 6 /** 7 * 利润计算 8 * @author Jon_China 9 * 10 */ 11 public class Profit { 12 13 public static void main(String[] args) throws Exception { 14 profit(); 15 } 16 17 public static void profit() throws Exception { 18 int sell = getSell(); 19 int purchase = getPurchase(); 20 int other = getOther(); 21 int profit = sell - purchase - other; 22 System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d ", sell, purchase, other, profit); 23 } 24 25 public static int getPurchase() throws Exception { 26 HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config()); 27 return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim()); 28 } 29 30 public static int getSell() throws Exception { 31 HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config()); 32 return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim()); 33 } 34 35 public static int getOther() throws IOException { 36 return Other.calcOther(Other.file); 37 } 38 39 }
zookeeper任务调度:
1 package zkqueue; 2 3 import java.io.IOException; 4 import java.util.List; 5 6 import org.apache.zookeeper.CreateMode; 7 import org.apache.zookeeper.KeeperException; 8 import org.apache.zookeeper.WatchedEvent; 9 import org.apache.zookeeper.Watcher; 10 import org.apache.zookeeper.ZooDefs.Ids; 11 import org.apache.zookeeper.ZooKeeper; 12 /** 13 * 分布式队列zookeeper调度 14 * @author Jon_China 15 * 16 */ 17 public class QueueZookeeper { 18 //设置队列目录树 19 final public static String QUEUE = "/queue"; 20 final public static String PROFIT = "/queue/profit"; 21 final public static String PURCHASE = "/queue/purchase"; 22 final public static String SELL = "/queue/sell"; 23 final public static String OTHER = "/queue/other"; 24 25 public static void main(String[] args) throws Exception { 26 if (args.length == 0) { 27 System.out.println("Please start a task:"); 28 } else { 29 doAction(Integer.parseInt(args[0])); 30 } 31 } 32 public static void doAction(int client) throws Exception { 33 //zookeeper地址 34 String host1 = "192.168.8.104:2181"; 35 String host2 = "192.168.8.105:2181"; 36 String host3 = "192.168.8.106:2181"; 37 38 ZooKeeper zk = null; 39 switch (client) {//1,2,3分别将不同任务加入队列 40 case 1: 41 zk = connection(host1); 42 initQueue(zk); 43 doPurchase(zk); 44 break; 45 case 2: 46 zk = connection(host2); 47 initQueue(zk); 48 doSell(zk); 49 break; 50 case 3: 51 zk = connection(host3); 52 initQueue(zk); 53 doOther(zk); 54 break; 55 } 56 } 57 58 // 创建一个与服务器的连接 59 public static ZooKeeper connection(String host) throws IOException { 60 ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() { 61 // 监控所有被触发的事件 62 public void process(WatchedEvent event) { 63 if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) { 64 System.out.println("Queue has Completed!!!"); 65 } 66 } 67 }); 68 return zk; 69 } 70 /** 71 * 初始化队列 72 * @param zk 73 * @throws KeeperException 74 * @throws InterruptedException 75 */ 76 public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException { 77 System.out.println("WATCH => " + PROFIT); 78 zk.exists(PROFIT, true); 79 80 if (zk.exists(QUEUE, false) == null) { 81 System.out.println("create " + QUEUE); 82 zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 83 } else { 84 System.out.println(QUEUE + " is exist!"); 85 } 86 } 87 /** 88 * 采购任务 89 * @param zk 90 * @throws Exception 91 */ 92 public static void doPurchase(ZooKeeper zk) throws Exception { 93 if (zk.exists(PURCHASE, false) == null) { 94 95 Purchase.run(Purchase.path()); 96 97 System.out.println("create " + PURCHASE); 98 zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 99 } else { 100 System.out.println(PURCHASE + " is exist!"); 101 } 102 isCompleted(zk); 103 } 104 /** 105 * 销售任务 106 * @param zk 107 * @throws Exception 108 */ 109 public static void doSell(ZooKeeper zk) throws Exception { 110 if (zk.exists(SELL, false) == null) { 111 112 Sell.run(Sell.path()); 113 114 System.out.println("create " + SELL); 115 zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 116 } else { 117 System.out.println(SELL + " is exist!"); 118 } 119 isCompleted(zk); 120 } 121 /** 122 * 其他计算任务 123 * @param zk 124 * @throws Exception 125 */ 126 public static void doOther(ZooKeeper zk) throws Exception { 127 if (zk.exists(OTHER, false) == null) { 128 129 Other.calcOther(Other.file); 130 131 System.out.println("create " + OTHER); 132 zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 133 } else { 134 System.out.println(OTHER + " is exist!"); 135 } 136 isCompleted(zk); 137 } 138 /** 139 * 检测完成情况 140 * @param zk 141 * @throws Exception 142 */ 143 public static void isCompleted(ZooKeeper zk) throws Exception { 144 int size = 3; 145 List<String> children = zk.getChildren(QUEUE, true); 146 int length = children.size(); 147 148 System.out.println("Queue Complete:" + length + "/" + size); 149 if (length >= size) { 150 System.out.println("create " + PROFIT); 151 Profit.profit(); 152 zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 153 154 for (String child : children) {// 清空节点 155 zk.delete(QUEUE + "/" + child, -1); 156 } 157 158 } 159 } 160 }
四、运行结果
在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求 。然后,通过同步的分步式队列自动启动了计算利润的程序,幵在日志中打印了2017 年1月的利润为-6693765。
示例代码地址:https://github.com/LJunChina/hadoop/tree/master/distributed_mq