zoukankan      html  css  js  c++  java
  • hadoop 分布式缓存

    Hadoop 分布式缓存实现目的是在所有的MapReduce调用一个统一的配置文件,首先将缓存文件放置在HDFS中,然后程序在执行的过程中会可以通过设定将文件下载到本地具体设定如下:

    public static void main(String[] arge) throws IOException, ClassNotFoundException, InterruptedException{
        
            Configuration conf=new Configuration();
            conf.set("fs.default.name", "hdfs://192.168.1.45:9000");
            FileSystem fs=FileSystem.get(conf);
            fs.delete(new Path("CASICJNJP/gongda/Test_gd20140104"));
            
            conf.set("mapred.job.tracker", "192.168.1.45:9001");
            conf.set("mapred.jar", "/home/hadoop/workspace/jar/OBDDataSelectWithImeiTxt.jar");
            Job job=new Job(conf,"myTaxiAnalyze");
            
            
            DistributedCache.createSymlink(job.getConfiguration());//
            try {
                DistributedCache.addCacheFile(new URI("/user/hadoop/CASICJNJP/DistributeFiles/imei.txt"), job.getConfiguration());
            } catch (URISyntaxException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }            
            job.setMapperClass(OBDDataSelectMaper.class);
            job.setReducerClass(OBDDataSelectReducer.class);
            //job.setNumReduceTasks(10);
            //job.setCombinerClass(IntSumReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            FileInputFormat.addInputPath(job, new Path("/user/hadoop/CASICJNJP/SortedData/20140104"));
            FileOutputFormat.setOutputPath(job, new Path("CASICJNJP/gongda/SelectedData"));
            
            System.exit(job.waitForCompletion(true)?0:1);
            
        }

        代码中标红的为将HDFS中的/user/hadoop/CASICJNJP/DistributeFiles/imei.txt作为分布式缓存

    public class OBDDataSelectMaper extends Mapper<Object, Text, Text, Text> {
        String[] strs;
        String[] ImeiTimes;
        String timei;
        String time;
        private java.util.List<Integer> ImeiList = new java.util.ArrayList<Integer>();

        protected void setup(Context context) throws IOException,
                InterruptedException {

          
     try {
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
                        .getConfiguration());
                if (cacheFiles != null && cacheFiles.length > 0) {
                    String line;
                    BufferedReader br = new BufferedReader(new FileReader(
                            cacheFiles[0].toString()));
                    try {
                        line = br.readLine();
                        while ((line = br.readLine()) != null) {
                            ImeiList.add(Integer.parseInt(line));
                        }
                    } finally {
                        br.close();
                    }
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            try {
                strs = value.toString().split(" ");
                ImeiTimes = strs[0].split("_");
                timei = ImeiTimes[0];
                if (ImeiList.contains(Integer.parseInt(timei))) {
                    context.write(new Text(strs[0]), value);
                }
            } catch (Exception ex) {

            }
        }
    }

    上述标红代码中在Map的setup函数中加载分布式缓存。

  • 相关阅读:
    利用DTrace实时检测MySQl
    改进MySQL Order By Rand()的低效率
    RDS for MySQL查询缓存 (Query Cache) 的设置和使用
    RDS For MySQL 字符集相关说明
    RDS for MySQL 通过 mysqlbinlog 查看 binlog 乱码
    RDS for MySQL Mysqldump 常见问题和处理
    RDS for MySQL Online DDL 使用
    RDS MySQL 表上 Metadata lock 的产生和处理
    RDS for MySQL 如何使用 Percona Toolkit
    北京已成为投融资诈骗重灾区:存好骗子公司黑名单,谨防上当!
  • 原文地址:https://www.cnblogs.com/oftenlin/p/3592005.html
Copyright © 2011-2022 走看看