Data source type: array list
[{field:value}, {field:value}, {field:value}, {field:value}]
1. Define the HTTP data source link
package com.etl.datalink;

import java.util.Map;

public class LinkHttp {

    private String url;
    private Map<String, Object> params;

    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public Map<String, Object> getParams() {
        return params;
    }
    public void setParams(Map<String, Object> params) {
        this.params = params;
    }
}
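A quick usage sketch of LinkHttp (the endpoint URL and date values below are placeholders, not taken from the original):

// Hypothetical usage of LinkHttp; URL and parameter values are placeholders.
// Requires java.util.HashMap / java.util.Map.
LinkHttp http = new LinkHttp();
http.setUrl("http://api.example.com/member/list");

Map<String, Object> params = new HashMap<String, Object>();
params.put("start_time", "2018-10-01 00:00:00");
params.put("end_time", "2018-10-02 00:00:00");
http.setParams(params);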
2. Define the HDFS connection configuration
package com.etl.datalink;

import org.apache.hadoop.conf.Configuration;

public class LinkHdfs {

    private Configuration conf = new Configuration();
    private String fsName = "fs.defaultFS";
    private String fsURI;

    public LinkHdfs(String fsName, String fsURI) {
        this.fsName = fsName;
        this.fsURI = fsURI;
        conf.set(this.fsName, this.fsURI);
    }

    public LinkHdfs(String fsURI) {
        this.fsURI = fsURI;
        conf.set(this.fsName, this.fsURI);
    }

    public String getFsName() {
        return fsName;
    }
    public void setFsName(String fsName) {
        this.fsName = fsName;
    }
    public String getFsURI() {
        return fsURI;
    }
    public void setFsURI(String fsURI) {
        this.fsURI = fsURI;
    }
    public Configuration getConf() {
        return conf;
    }
    public void setConf(Configuration conf) {
        this.conf = conf;
    }
}
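For reference, the Configuration wrapped by LinkHdfs is later turned into a FileSystem handle; a minimal sketch, with a placeholder namenode URI and target path (the transfer class below makes the same FileSystem.get call):

// Hypothetical usage of LinkHdfs; URI and path are placeholders.
// Requires org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path;
// FileSystem.get throws IOException, so call it inside try/catch or a throwing method.
LinkHdfs hdfs = new LinkHdfs("hdfs://namenode:8020");
FileSystem fs = FileSystem.get(hdfs.getConf());
Path dstPath = new Path("/user/etl/member/2018-10-15");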
3. Define a generic class to transfer the HTTP content to HDFS
There is one small catch here: because the JSON payload is an array list, each record has to be pulled out individually and written to HDFS with a newline appended. Only then will a Hive query return multiple records; otherwise the whole payload is treated as a single record.
/**
 * Generic extraction of HTTP API data into an HDFS file.
 * @author KingWang
 * @date 2018-10-15
 * @description
 */
public class Api2Hdfs {

    private static Logger log = Logger.getLogger(Api2Hdfs.class);

    public static <T> void run(String[] args, Class<T> clazz) {
        // http arguments
        String url = args[0];
        String method = args[1];
        String startTime = args[2];
        String endTime = args[3];

        // hdfs arguments
        String fsName = args[4];
        String fsURI = args[5];
        String targetFilePath = args[6];

        // http config
        Map<String, Object> params = new HashMap<String, Object>();
        // ... some parameters omitted
        params.put("timestamp", System.currentTimeMillis() / 1000L);
        params.put("start_time", startTime);
        params.put("end_time", endTime);

        LinkHttp http = new LinkHttp();
        http.setUrl(url);
        http.setParams(params);

        // hdfs config
        LinkHdfs hdfs = new LinkHdfs(fsName, fsURI);

        try {
            Api2Hdfs.process(http, hdfs, targetFilePath, clazz);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static <T> void process(LinkHttp http, LinkHdfs hdfs, String hdfsFile, Class<T> clazz) throws Exception {
        if (null == http) {
            log.error("the http parameter is not set");
            throw new Exception("the http parameter is not set");
        }
        if (null == hdfs) {
            log.error("the hdfs parameter is not set");
            throw new Exception("the hdfs parameter is not set");
        }

        // build the http request
        String url = http.getUrl();
        Map<String, Object> params = http.getParams();
        OkHttpClient client = new OkHttpClient();

        // add form parameters
        FormBody.Builder bodyParams = new FormBody.Builder();
        if (params != null && params.size() > 0) {
            Iterator<Map.Entry<String, Object>> it = params.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Object> entry = it.next();
                bodyParams.add(entry.getKey(), entry.getValue().toString());
            }
        }

        final Request request = new Request.Builder().url(url).post(bodyParams.build()).build();
        Call call = client.newCall(request);
        call.enqueue(new Callback() {

            // handle network errors
            @Override
            public void onFailure(Call call, IOException e) {
                e.printStackTrace();
                log.error(e.getMessage());
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                FileSystem fs = null;
                try {
                    Path dstPath = new Path(hdfsFile);
                    fs = FileSystem.get(hdfs.getConf());
                    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    if (response.isSuccessful()) {
                        // process the data returned by the backend
                        System.out.println(df.format(LocalDateTime.now()) + " response.code:" + response.code());
                        if (200 == response.code()) {
                            // note: response.body().string() can only be consumed once
                            ResponseInfo info = JSONObject.parseObject(response.body().string(), ResponseInfo.class);
                            // a non-blank error field means the call failed
                            if (StringUtils.isNotBlank(info.getError())) {
                                log.error(info.getError());
                            } else {
                                String rspcode = info.getResult().getRsp();
                                // write to hdfs
                                if (rspcode.equalsIgnoreCase(ResultCode.SUCCESS.getCode())) {
                                    System.out.println(info.getResult().getData());
                                    if (info.getResult().getData().equals("[]")) {
                                        System.out.println(df.format(LocalDateTime.now()) + " " + info.getResult().getMsg());
                                    } else {
                                        List<T> objList = JSON.parseArray(info.getResult().getData(), clazz);
                                        // byte[] bt = info.getResult().getData().getBytes();
                                        FSDataOutputStream outputStream = fs.create(dstPath);
                                        int size = objList.size();
                                        for (int i = 0; i < size; i++) {
                                            // append a newline so each record becomes one line in the file
                                            String orderstr = JSON.toJSONString(objList.get(i)) + '\n';
                                            System.out.println(orderstr);
                                            outputStream.write(orderstr.getBytes());
                                            if (i % 1000 == 0) {
                                                outputStream.flush();
                                            }
                                        }
                                        outputStream.flush();
                                        outputStream.close();
                                        log.info("create file " + hdfsFile + " success!");
                                    }
                                } else {
                                    log.error(info.getResult().getMsg());
                                }
                            }
                        }
                        // handle error codes in the 200~300 range other than 200
                        else {
                            log.error(response.message());
                        }
                        // fs.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error(e.getMessage());
                } finally {
                    // close resources
                    if (fs != null) {
                        fs.close();
                    }
                    if (response.body() != null) {
                        response.body().close();
                    }
                }
                log.info("write hdfs file end: " + hdfsFile);
            }
        });
    }
}
4. Define the beans used for parsing. Since the transfer class is generic, a different bean can be defined for each API, along the lines of the following.
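The bean code itself is not shown here; as a rough sketch, a bean for the Member API might look like this (the field names are hypothetical and would have to match the keys in the JSON returned by the API):

// Hypothetical bean for the Member API; field names are illustrative only.
public class Member {
    private String id;
    private String name;
    private String phone;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
}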
5. Define a main class for each API:
public class MemberApi extends Api2Hdfs {
    public static void main(String[] args) {
        Api2Hdfs.run(args, Member.class);
    }
}

public class OrderApi extends Api2Hdfs {
    public static void main(String[] args) {
        Api2Hdfs.run(args, Order.class);
    }
}
6. Define a shell script for each API and run it.
java -Djava.ext.dirs=lib com.etl.MemberApi ${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt} >> ./logs/${table}.log 2>&1 &
java -Djava.ext.dirs=lib com.etl.OrderApi ${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt} >> ./logs/${table}.log 2>&1 &
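Note that the scripts pass eight positional arguments, while run() above only reads the first seven (args[0] through args[6]); the trailing ${salt} is presumably consumed by the parameter setup that the listing omits ("... some parameters omitted").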