1. Version compatibility:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html
2. Maven integration:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
3. pom configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xiaoniubigdata</artifactId>
        <groupId>com.wenbronk</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hadoop03-es</artifactId>

    <properties>
        <hadoop.version>2.7.2</hadoop.version>
        <hive.version>1.2.1</hive.version>
        <elasticsearch.version>6.3.2</elasticsearch.version>
    </properties>

    <dependencies>
        <!-- the all-in-one artifact; the -mr artifact below is enough for MapReduce only
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>6.3.2</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop-mr</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop-hive</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        -->

        <!-- hadoop (mark these as provided if the cluster already supplies them) -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-annotations</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-archives</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-serde</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>24.1-jre</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- <minimizeJar>true</minimizeJar> -->
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <!-- relocate guava and netty so they do not clash with
                                 the versions bundled with the Hadoop cluster -->
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>shadowing.com.google.common</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.netty</pattern>
                                    <shadedPattern>shadowing.io.netty</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
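The relocations matter because the guava and netty versions that Hadoop 2.7 ships are older than the ones guava 24 and elasticsearch-hadoop expect; shading sidesteps the clash. A typical build-and-submit sequence, assuming the default jar name produced by the module above:

mvn clean package
hadoop jar target/hadoop03-es-1.0.jar com.wenbronk.mr.es.read.ESReadJob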
4. Reading data from ES:
Main job:
package com.wenbronk.mr.es.read;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class ESReadJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-read");
        job.setJarByClass(ESReadJob.class);

        // EsInputFormat feeds each hit to the mapper as (docId: Text, _source: MapWritable)
        job.setInputFormatClass(EsInputFormat.class);
        job.setMapperClass(ESReadMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        job.setReducerClass(ESReaderReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        Path outPath = new Path("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/hadoop03-es/target/out");
        FileSystem fileSystem = FileSystem.get(getConf());
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // prefer the user classpath so the shaded guava/netty win over the cluster's versions
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        conf.set("es.nodes", "10.124.147.22:9200");
        // conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");
        // to read hits back as raw JSON strings (the map input value becomes Text):
        // conf.set("es.output.json", "true");

        // es-hadoop requires speculative execution to be off to avoid duplicate work
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        int result = ToolRunner.run(conf, new ESReadJob(), args);
        System.exit(result);
    }
}
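The job above uses the URI-query form of es.query; es-hadoop also accepts a full query DSL string, or a path to an external resource containing the DSL. A quick sketch of the three forms (index and field names are just the ones from this post):

// URI ("q=...") query, as used above
conf.set("es.query", "?q=me*");

// full query DSL as a JSON string
conf.set("es.query", "{\"query\": {\"match_all\": {}}}");

// or a resource on the classpath / HDFS that contains the query DSL
// conf.set("es.query", "org/mypackage/myquery.json");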
Mapper:
package com.wenbronk.mr.es.read;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ESReadMapper extends Mapper<Text, MapWritable, Text, MapWritable> {

    // key is the document id, value is the _source as a MapWritable; pass both through
    @Override
    protected void map(Text key, MapWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
Reducer:
package com.wenbronk.mr.es.read;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ESReaderReducer extends Reducer<Text, MapWritable, NullWritable, Text> {

    private Text text;
    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.text = new Text();
        this.nullWritable = NullWritable.get();
    }

    // flatten each document's MapWritable into a JSON string, one document per output line
    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
        for (MapWritable value : values) {
            JSONObject jsonObject = new JSONObject();
            // store every field value as a string; nested structures need a
            // recursive conversion (see the sketch below)
            value.entrySet().forEach(entry ->
                    jsonObject.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())));
            text.set(jsonObject.toString());
            context.write(nullWritable, text);
        }
    }
}
If you need a different JSON layout, see:
git@gitlab.com:wenbronk/xiaoniubigdata.git
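For reference, here is a minimal sketch of such a conversion (a hypothetical helper, not necessarily what the repo does): unwrap nested MapWritable / ArrayWritable values into plain Java maps and lists first, then serialize the result.

import org.apache.hadoop.io.*;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// hypothetical helper: unwrap Writables into plain Java objects before JSON serialization
public static Object toJavaObject(Writable w) {
    if (w == null || w instanceof NullWritable) {
        return null;
    }
    if (w instanceof MapWritable) {
        // recurse into nested objects
        Map<String, Object> map = new LinkedHashMap<>();
        ((MapWritable) w).forEach((k, v) -> map.put(k.toString(), toJavaObject(v)));
        return map;
    }
    if (w instanceof ArrayWritable) {
        // recurse into arrays
        List<Object> list = new ArrayList<>();
        for (Writable entry : ((ArrayWritable) w).get()) {
            list.add(toJavaObject(entry));
        }
        return list;
    }
    if (w instanceof IntWritable)     return ((IntWritable) w).get();
    if (w instanceof LongWritable)    return ((LongWritable) w).get();
    if (w instanceof DoubleWritable)  return ((DoubleWritable) w).get();
    if (w instanceof FloatWritable)   return ((FloatWritable) w).get();
    if (w instanceof BooleanWritable) return ((BooleanWritable) w).get();
    return w.toString(); // Text and anything else: fall back to the string form
}

In the reducer above, text.set(JSON.toJSONString(toJavaObject(value))) would then emit properly nested JSON instead of flat string fields.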
5. Writing to ES
Writing in JSON format:
Main job:
package com.wenbronk.mr.es.writeJson;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ESWriteJobWriteJson extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-write");
        job.setJarByClass(ESWriteJobWriteJson.class);

        job.setMapperClass(ESWriterMapperJson.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ESWriterReducerJson.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(EsOutputFormat.class);

        // read back the JSON lines produced by the read job above
        Path inPath = new Path("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/hadoop03-es/target/out/part-r-00000");
        FileInputFormat.setInputPaths(job, inPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        conf.set("es.nodes", "10.124.147.22:9200");
        // conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");
        // the input values are already JSON strings, so index them as-is
        conf.set("es.input.json", "true");

        int result = ToolRunner.run(conf, new ESWriteJobWriteJson(), args);
        System.exit(result);
    }
}
Mapper:
package com.wenbronk.mr.es.writeJson;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ESWriterMapperJson extends Mapper<LongWritable, Text, NullWritable, Text> {

    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
    }

    // each input line is already one JSON document; forward it unchanged
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(nullWritable, value);
    }
}
Reducer:
package com.wenbronk.mr.es.writeJson;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ESWriterReducerJson extends Reducer<NullWritable, Text, NullWritable, Text> {

    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
    }

    // each value is one JSON document; EsOutputFormat sends it to the bulk API unchanged
    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(nullWritable, value);
        }
    }
}
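Note that with es.input.json set to true, every Text value handed to EsOutputFormat must already be one complete JSON document. A line of the part-r-00000 input might look like this (the fields are invented for illustration; the real ones come from macsearch_fileds):

{"mac": "aa:bb:cc:dd:ee:ff", "name": "me-device"}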
6. Reading from one index and writing to another
Main job:
package com.wenbronk.mr.es.rw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ESRWJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-rw");
        job.setJarByClass(ESRWJob.class);

        job.setInputFormatClass(EsInputFormat.class);
        job.setMapperClass(EsRWMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        job.setReducerClass(EsRWReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MapWritable.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setNumReduceTasks(3);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // prefer the user classpath so the shaded dependencies take precedence
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        // any one node of the cluster is enough; the rest are discovered
        conf.set("es.nodes", "10.124.147.22:9200");
        // index/type; es.resource.read and es.resource.write configure the source
        // and the target separately (es.resource sets both at once)
        // conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        int result = ToolRunner.run(conf, new ESRWJob(), args);
        System.exit(result);
    }
}
Mapper:
package com.wenbronk.mr.es.rw;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class EsRWMapper extends Mapper<Text, MapWritable, Text, MapWritable> {

    // identity mapper: forward (docId, _source) pairs untouched
    @Override
    protected void map(Text key, MapWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
Reducer:
package com.wenbronk.mr.es.rw;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class EsRWReducer extends Reducer<Text, MapWritable, NullWritable, MapWritable> {

    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
    }

    // drop the old document id; EsOutputFormat indexes each MapWritable as a new document
    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
        for (MapWritable value : values) {
            context.write(nullWritable, value);
        }
    }
}
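One more option worth knowing when copying between indices: es.resource.write accepts a field pattern, so each document can route itself to an index derived from its own data, resolved per document at write time. A sketch (the "group" field here is assumed to exist in the documents):

// route each document to an index named after its own "group" field,
// e.g. a document with group=abc lands in sink_abc/group
conf.set("es.resource.write", "sink_{group}/group");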