    public abstract class InputFormat<K, V> {
      public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; // 获取Map阶段的数据分片集合信息
      public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // 创建具体的数据读取对象



    public abstract class InputSplit {  
      public abstract long getLength() throws IOException, InterruptedException; // 获取当前分片的长度大小  
      public abstract String[] getLocations() throws IOException, InterruptedException; // 获取当前分片的位置信息  



    public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
      public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // 初始化,如果在构造函数中初始化了,那么该方法可以为空
      public abstract boolean nextKeyValue() throws IOException, InterruptedException; //是否存在下一个key/value,如果存在返回true。否则返回false。
      public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;  // 获取当然key
      public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;  // 获取当然value
      public abstract float getProgress() throws IOException, InterruptedException;  // 获取进度信息
      public abstract void close() throws IOException; // 关闭资源



    public abstract class OutputFormat<K, V> { 
      public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException; // 获取具体的数据写出对象
      public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException; // 检查输出配置信息是否正确
      public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException; // 获取输出job的提交者对象



    public abstract class RecordWriter<K, V> {  
      public abstract void write(K key, V value) throws IOException, InterruptedException;  // 具体的写数据的方法
      public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException; // 关闭资源



      1 package com.gerry.mongo.hadoop2x.mr.mongodb.lib;
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 import java.util.ArrayList;
      7 import java.util.List;
      8 import java.util.Map;
     10 import org.apache.hadoop.conf.Configurable;
     11 import org.apache.hadoop.conf.Configuration;
     12 import org.apache.hadoop.io.LongWritable;
     13 import org.apache.hadoop.io.Writable;
     14 import org.apache.hadoop.mapreduce.InputFormat;
     15 import org.apache.hadoop.mapreduce.InputSplit;
     16 import org.apache.hadoop.mapreduce.JobContext;
     17 import org.apache.hadoop.mapreduce.MRJobConfig;
     18 import org.apache.hadoop.mapreduce.RecordReader;
     19 import org.apache.hadoop.mapreduce.TaskAttemptContext;
     20 import org.apache.log4j.Logger;
     22 import com.mongodb.BasicDBObject;
     23 import com.mongodb.BasicDBObjectBuilder;
     24 import com.mongodb.DB;
     25 import com.mongodb.DBCollection;
     26 import com.mongodb.DBObject;
     27 import com.mongodb.Mongo;
     28 import com.mongodb.MongoException;
     30 public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
     31     private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);
     33     /**
     34      * 空的对象,主要作用是不进行任何操作,类似于NullWritable
     35      */
     36     public static class NullMongoDBWritable implements MongoDBWritable, Writable {
     37         @Override
     38         public void write(DBCollection collection) throws MongoException {
     39             // TODO Auto-generated method stub
     40         }
     42         @Override
     43         public void readFields(DBObject object) throws MongoException {
     44             // TODO Auto-generated method stub
     45         }
     47         @Override
     48         public void write(DataOutput out) throws IOException {
     49             // TODO Auto-generated method stub
     50         }
     52         @Override
     53         public void readFields(DataInput in) throws IOException {
     54             // TODO Auto-generated method stub
     55         }
     57         @Override
     58         public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
     59             // TODO Auto-generated method stub
     60             return old;
     61         }
     63     }
     65     /**
     66      * MongoDB的input split类
     67      */
     68     public static class MongoDBInputSplit extends InputSplit implements Writable {
     69         private long end = 0;
     70         private long start = 0;
     72         /**
     73          * 默认构造方法
     74          */
     75         public MongoDBInputSplit() {
     76         }
     78         /**
     79          * 便利的构造方法
     80          * 
     81          * @param start
     82          *            集合中查询的文档开始行号
     83          * @param end
     84          *            集合中查询的文档结束行号
     85          */
     86         public MongoDBInputSplit(long start, long end) {
     87             this.start = start;
     88             this.end = end;
     89         }
     91         public long getEnd() {
     92             return end;
     93         }
     95         public long getStart() {
     96             return start;
     97         }
     99         @Override
    100         public void write(DataOutput out) throws IOException {
    101             out.writeLong(this.start);
    102             out.writeLong(this.end);
    103         }
    105         @Override
    106         public void readFields(DataInput in) throws IOException {
    107             this.start = in.readLong();
    108             this.end = in.readLong();
    109         }
    111         @Override
    112         public long getLength() throws IOException, InterruptedException {
    113             // 分片大小
    114             return this.end - this.start;
    115         }
    117         @Override
    118         public String[] getLocations() throws IOException, InterruptedException {
    119             // TODO 返回一个空的数组,表示不进行数据本地化的优化,那么map执行节点随机选择。
    120             return new String[] {};
    121         }
    123     }
    125     protected MongoDBConfiguration mongoConfiguration; // mongo相关配置信息
    126     protected Mongo mongo; // mongo连接
    127     protected String databaseName; // 连接的数据库名称
    128     protected String collectionName; // 连接的集合名称
    129     protected DBObject conditionQuery; // 选择条件
    130     protected DBObject fieldQuery; // 需要的字段条件
    132     @Override
    133     public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    134         DBCollection dbCollection = null;
    135         try {
    136             dbCollection = this.getDBCollection();
    137             // 获取数量大小
    138             long count = dbCollection.count(this.getConditionQuery());
    139             int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    140             long chunkSize = (count / chunks); // 分片数量
    142             // 开始分片,只是简单的分配每个分片的数据量
    143             List<InputSplit> splits = new ArrayList<InputSplit>();
    144             for (int i = 0; i < chunks; i++) {
    145                 MongoDBInputSplit split = null;
    146                 if ((i + 1) == chunks) {
    147                     split = new MongoDBInputSplit(i * chunkSize, count);
    148                 } else {
    149                     split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
    150                 }
    151                 splits.add(split);
    152             }
    153             return splits;
    154         } catch (Exception e) {
    155             throw new IOException(e);
    156         } finally {
    157             dbCollection = null;
    158             closeConnection(); // 关闭资源的连接
    159         }
    160     }
    162     @Override
    163     public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    164         return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    165     }
    167     protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
    168         // 获取从mongodb中读取数据需要转换成的value class,默认为NullMongoDBWritable
    169         Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
    170         return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    171     }
    173     @Override
    174     public void setConf(Configuration conf) {
    175         mongoConfiguration = new MongoDBConfiguration(conf);
    176         databaseName = this.mongoConfiguration.getInputDatabaseName(); // 输入数据的数据库
    177         collectionName = this.mongoConfiguration.getInputCollectionName(); // 输入数据的集合
    178         getMongo(); // 初始化
    179         getConditionQuery(); // 初始化
    180         getFieldQuery(); // 初始化
    181     }
    183     @Override
    184     public Configuration getConf() {
    185         return this.mongoConfiguration.getConfiguration();
    186     }
    188     public Mongo getMongo() {
    189         try {
    190             if (null == this.mongo) {
    191                 this.mongo = this.mongoConfiguration.getMongoConnection();
    192             }
    193         } catch (Exception e) {
    194             throw new RuntimeException(e);
    195         }
    196         return mongo;
    197     }
    199     public DBObject getConditionQuery() {
    200         if (null == this.conditionQuery) {
    201             Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
    202             BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
    203             for (Map.Entry<String, String> entry : conditions.entrySet()) {
    204                 if (entry.getValue() != null) {
    205                     builder.append(entry.getKey(), entry.getValue());
    206                 } else {
    207                     builder.push(entry.getKey());
    208                 }
    209             }
    210             if (builder.isEmpty()) {
    211                 this.conditionQuery = new BasicDBObject();
    212             } else {
    213                 this.conditionQuery = builder.get();
    214             }
    215         }
    216         return this.conditionQuery;
    217     }
    219     public DBObject getFieldQuery() {
    220         if (fieldQuery == null) {
    221             String[] fields = this.mongoConfiguration.getInputFieldNames();
    222             if (fields != null && fields.length > 0) {
    223                 BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
    224                 for (String field : fields) {
    225                     builder.push(field);
    226                 }
    227                 fieldQuery = builder.get();
    228             } else {
    229                 fieldQuery = new BasicDBObject();
    230             }
    231         }
    232         return fieldQuery;
    233     }
    235     protected DBCollection getDBCollection() {
    236         DB db = getMongo().getDB(this.databaseName);
    237         if (this.mongoConfiguration.isEnableAuth()) {
    238             String username = this.mongoConfiguration.getUsername();
    239             String password = this.mongoConfiguration.getPassword();
    240             if (!db.authenticate(username, password.toCharArray())) {
    241                 throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
    242             }
    243         }
    244         return db.getCollection(collectionName);
    245     }
    247     protected void closeConnection() {
    248         try {
    249             if (null != this.mongo) {
    250                 this.mongo.close();
    251                 this.mongo = null;
    252             }
    253         } catch (Exception e) {
    254             LOG.debug("Exception on close", e);
    255         }
    256     }
    257 }


    package com.gerry.mongo.hadoop2x.mr.mongodb.lib;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.util.ReflectionUtils;
    import com.mongodb.DBCollection;
    import com.mongodb.DBCursor;
    import com.mongodb.DBObject;
    public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
        private Class<? extends MongoDBWritable> valueClass;
        private LongWritable key;
        private T value;
        private long pos;
        private Configuration conf;
        private MongoDBInputFormat.MongoDBInputSplit split;
        private DBCollection collection;
        private DBObject conditionQuery;
        private DBObject fieldQuery;
        private DBCursor cursor;
        public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,
                DBObject fieldQuery) {
            this.split = split;
            this.valueClass = valueClass;
            this.collection = collection;
            this.conditionQuery = conditionQuery;
            this.fieldQuery = fieldQuery;
            this.conf = conf;
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // do nothing
        public boolean nextKeyValue() throws IOException, InterruptedException {
            try {
                if (key == null) {
                    key = new LongWritable();
                if (value == null) {
                    value = (T) ReflectionUtils.newInstance(valueClass, conf);
                if (null == cursor) {
                    cursor = executeQuery();
                if (!cursor.hasNext()) {
                    return false;
                key.set(pos + split.getStart()); // 设置key
                value.readFields(cursor.next()); // 设置value
            } catch (Exception e) {
                throw new IOException("Exception in nextKeyValue", e);
            return true;
        protected DBCursor executeQuery() {
            try {
                return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return this.key;
        public T getCurrentValue() throws IOException, InterruptedException {
            return this.value;
        public float getProgress() throws IOException, InterruptedException {
            return pos;
        public void close() throws IOException {
            if (collection != null) {


    package com.gerry.mongo.hadoop2x.mr.mongodb.lib;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.log4j.Logger;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.Mongo;
    public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
        private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);
         * A RecordWriter that writes the reduce output to a MongoDB collection
         * @param <K>
         * @param <T>
        public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
            private Mongo mongo;
            private String databaseName;
            private String collectionName;
            private MongoDBConfiguration dbConf;
            private DBCollection dbCollection;
            private DBObject dbObject;
            private boolean enableFetchMethod;
            public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
                this.mongo = mongo;
                this.databaseName = databaseName;
                this.collectionName = collectionName;
                this.dbConf = dbConf;
                this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
                getDbCollection();// 创建连接
            protected DBCollection getDbCollection() {
                if (null == this.dbCollection) {
                    DB db = this.mongo.getDB(this.databaseName);
                    if (this.dbConf.isEnableAuth()) {
                        String username = this.dbConf.getUsername();
                        String password = this.dbConf.getPassword();
                        if (!db.authenticate(username, password.toCharArray())) {
                            throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
                    this.dbCollection = db.getCollection(this.collectionName);
                return this.dbCollection;
            public void write(K key, V value) throws IOException, InterruptedException {
                if (this.enableFetchMethod) {
                    this.dbObject = key.fetchWriteDBObject(null);
                    this.dbObject = value.fetchWriteDBObject(this.dbObject);
                    // 写数据
                    this.dbCollection.insert(this.dbObject);// 在这里可以做一个缓存,一起提交,如果数据量大的情况下。
                    this.dbObject = null;
                } else {
                    // 直接调用写方法
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                if (this.mongo != null) {
                    this.dbCollection = null;
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            try {
                MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
                String databaseName = dbConf.getOutputDatabaseName();
                String collectionName = dbConf.getOutputCollectionName();
                Mongo mongo = dbConf.getMongoConnection();
                return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
            } catch (Exception e) {
                LOG.error("Create the record writer occur exception.", e);
                throw new IOException(e);
        public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
            // 不进行检测
        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
            // 由于outputcommitter主要作用是提交jar,分配jar的功能。所以我们这里直接使用FileOutputCommitter
            return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
         * 设置output属性
         * @param job
         * @param databaseName
         * @param collectionName
        public static void setOutput(Job job, String databaseName, String collectionName) {
            MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
         * 静止使用fetch方法
         * @param conf
        public static void disableFetchMethod(Configuration conf) {
            conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);


    package com.gerry.mongo.hadoop2x.mr.mongodb.lib;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
    import com.mongodb.Mongo;
    import com.mongodb.ServerAddress;
    public class MongoDBConfiguration {
        public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
        public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
        public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
        public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
        public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
        public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";
        public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
        public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
        public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
        public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
        public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";
        public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
        public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
        // 在recordwriter中到底是否调用fetch方法,默认调用。如果设置为不调用,那么就直接使用writer方法
        public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";
        private Configuration conf;
        public MongoDBConfiguration(Configuration conf) {
            this.conf = conf;
         * 获取Configuration对象
         * @return
        public Configuration getConfiguration() {
            return this.conf;
         * 设置连接信息
         * @param host
         * @param port
         * @return
        public MongoDBConfiguration configureDB(String host, int port) {
            return this.configureDB(host, port, false, null, null);
         * 设置连接信息
         * @param host
         * @param port
         * @param enableAuth
         * @param username
         * @param password
         * @return
        public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
            this.conf.set(BIND_HOST_PROPERTY, host);
            this.conf.setInt(BIND_PORT_PROPERTY, port);
            if (enableAuth) {
                this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
                this.conf.set(USERNAME_PROPERTY, username);
                this.conf.set(PASSWORD_PROPERTY, password);
            return this;
         * 获取MongoDB的连接对象Connection对象
         * @return
         * @throws UnknownHostException
        public Mongo getMongoConnection() throws UnknownHostException {
            return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
         * 获取设置的host
         * @return
        public String getBindHost() {
            return this.conf.get(BIND_HOST_PROPERTY, "localhost");
         * 获取设置的port
         * @return
        public int getBindPort() {
            return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
         * 获取是否开启安全验证,默认的Mongodb是不开启的。
         * @return
        public boolean isEnableAuth() {
            return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
         * 获取完全验证所需要的用户名
         * @return
        public String getUsername() {
            return this.conf.get(USERNAME_PROPERTY);
         * 获取安全验证所需要的密码
         * @return
        public String getPassword() {
            return this.conf.get(PASSWORD_PROPERTY);
        public String getPartition() {
            return conf.get(PARTITION_PROPERTY, "|");
        public MongoDBConfiguration setPartition(String partition) {
            conf.set(PARTITION_PROPERTY, partition);
            return this;
        public String getInputDatabaseName() {
            return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
        public MongoDBConfiguration setInputDatabaseName(String databaseName) {
            conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
            return this;
        public String getInputCollectionName() {
            return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
        public void setInputCollectionName(String tableName) {
            conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
        public String[] getInputFieldNames() {
            return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
        public void setInputFieldNames(String... fieldNames) {
            conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
        public Map<String, String> getInputConditions() {
            Map<String, String> result = new HashMap<String, String>();
            String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
            if (conditions != null && conditions.length > 0) {
                String partition = this.getPartition();
                String[] values = null;
                for (String condition : conditions) {
                    values = condition.split(partition);
                    if (values != null && values.length == 2) {
                        result.put(values[0], values[1]);
                    } else {
                        result.put(condition, null);
            return result;
        public void setInputConditions(Map<String, String> conditions) {
            if (conditions != null && conditions.size() > 0) {
                String[] values = new String[conditions.size()];
                String partition = this.getPartition();
                int k = 0;
                for (Map.Entry<String, String> entry : conditions.entrySet()) {
                    if (entry.getValue() != null) {
                        values[k++] = entry.getKey() + partition + entry.getValue();
                    } else {
                        values[k++] = entry.getKey();
                conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        public Class<? extends MongoDBWritable> getValueClass() {
            return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
        public void setInputClass(Class<? extends DBWritable> inputClass) {
            conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
        public String getOutputDatabaseName() {
            return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
        public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
            conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
            return this;
        public String getOutputCollectionName() {
            return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
        public void setOutputCollectionName(String tableName) {
            conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
        public boolean isEnableUseFetchMethod() {
            return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
        public void setOutputUseFetchMethod(boolean useFetchMethod) {
            conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    package com.gerry.mongo.hadoop2x.mr.mongodb.lib;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MongoException;
    public interface MongoDBWritable {
         * 往mongodb的集合中写数据
         * @param collection
         * @throws MongoException
        public void write(DBCollection collection) throws MongoException;
         * 获取要写的mongoDB对象
         * @param old
         * @return
         * @throws MongoException
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException;
         * 从mongodb的集合中读数据
         * @param collection
         * @throws MongoException
        public void readFields(DBObject object) throws MongoException;
    package com.gerry.mongo.hadoop2x.mr.mongodb.nw;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
    import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
    import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
    import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
    import com.mongodb.BasicDBObject;
    import com.mongodb.BasicDBObjectBuilder;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MongoException;
    public class Demo {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            // 设置输入的mongodb的数据库和集合,以及对应的输入对象value,这里的数据库和集合要求存在,否则是没有数据的,当然没有数据不会出问题
            conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
            conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
            conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);
            Job job = Job.getInstance(conf, "mongodb-demo");
            MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // 这个可以不存在
        public static class DemoOutputValue implements Writable, MongoDBWritable {
            private Date clientTime;
            private long count;
            public void write(DBCollection collection) throws MongoException {
                throw new UnsupportedOperationException();
            public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
                BasicDBObjectBuilder builder = null;
                Set<String> keys = new HashSet<String>();
                if (old != null) {
                    keys = old.keySet();
                    builder = BasicDBObjectBuilder.start(old.toMap());
                } else {
                    builder = new BasicDBObjectBuilder();
                // 添加当前对象的value值,如果存在同样的key,那么加序号
                builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
                return builder.get();
            public void readFields(DBObject object) throws MongoException {
                throw new UnsupportedOperationException();
            public void write(DataOutput out) throws IOException {
            public void readFields(DataInput in) throws IOException {
                this.clientTime = new Date(in.readLong());
                this.count = in.readLong();
            public Date getClientTime() {
                return clientTime;
            public void setClientTime(Date clientTime) {
                this.clientTime = clientTime;
            public long getCount() {
                return count;
            public void setCount(long count) {
                this.count = count;
        public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
            private String name;
            private Integer age;
            private String sex;
            public void write(DataOutput out) throws IOException {
                if (this.name == null) {
                } else {
                if (this.age == null) {
                } else {
                if (this.sex == null) {
                } else {
            public void readFields(DataInput in) throws IOException {
                this.name = in.readBoolean() ? in.readUTF() : null;
                this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
                this.sex = in.readBoolean() ? in.readUTF() : null;
            public void write(DBCollection collection) throws MongoException {
                DBObject object = new BasicDBObject();
                object.put("name", this.name);
                object.put("age", this.age.intValue());
                object.put("sex", this.sex);
            public void readFields(DBObject object) throws MongoException {
                this.name = (String) object.get("name");
                this.age = (Integer) object.get("age");
                this.sex = (String) object.get("sex");
            public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
                BasicDBObjectBuilder builder = null;
                Set<String> keys = new HashSet<String>();
                if (old != null) {
                    keys = old.keySet();
                    builder = BasicDBObjectBuilder.start(old.toMap());
                } else {
                    builder = new BasicDBObjectBuilder();
                // 添加当前对象的value值,如果存在同样的key,那么加序号
                if (this.name != null) {
                    builder.append(getKey(keys, "name", 0), this.name);
                if (this.age != null) {
                    builder.append(getKey(keys, "age", 0), this.age.intValue());
                if (this.sex != null) {
                    builder.append(getKey(keys, "sex", 0), this.sex);
                return builder.get();
            public String toString() {
                return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
            public int compareTo(DemoInputValueAndOutputKey o) {
                int tmp;
                if (this.name == null) {
                    if (o.name != null) {
                        return -1;
                } else if (o.name == null) {
                    return 1;
                } else {
                    tmp = this.name.compareTo(o.name);
                    if (tmp != 0) {
                        return tmp;
                if (this.age == null) {
                    if (o.age != null) {
                        return -1;
                } else if (o.age == null) {
                    return 1;
                } else {
                    tmp = this.age - o.age;
                    if (tmp != 0) {
                        return tmp;
                if (this.sex == null) {
                    if (o.sex != null) {
                        return -1;
                } else if (o.sex == null) {
                    return 1;
                } else {
                    return this.sex.compareTo(o.sex);
                return 0;
         * 直接输出
         * @author jsliuming
        public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
            protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
         * 写出数据,只做一个统计操作
         * @author jsliuming
        public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
            private DemoOutputValue outputValue = new DemoOutputValue();
            protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                long sum = 0;
                for (@SuppressWarnings("unused")
                NullWritable value : values) {
                outputValue.setClientTime(new Date());
                context.write(key, outputValue);
         * 转换key,作用是当key存在keys集合中的时候,在key后面添加序号
         * @param keys
         * @param key
         * @param index
         * @return
        public static String getKey(Set<String> keys, String key, int index) {
            while (keys.contains(key)) {
                key = key + (index++);
            return key;


