package com.laiwang.algo.antispam.event.job;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Created by gray on 14-8-3.
*/
public class GetVersion extends Configured implements Tool {
public static class GetVersionMap extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context)
throws IOException ,InterruptedException {
List<String> requests = new ArrayList<String>();
String[] parts = value.toString().split("1",-1);
String url = parts[2] + parts[6];
if(!url.contains("/v2/"))
return;
String[] tmps = url.split("\?",-1);
if(tmps.length == 1)
return;
int len = tmps[1].length();
String temp = "";
boolean falg = true;
for(int i = 0; i < len; i++) {
if(tmps[1].charAt(i) == '=') {
requests.add(temp);
temp = "";
falg = false;
}
if(falg)
temp += tmps[1].charAt(i);
if(tmps[1].charAt(i) == '&') {
falg = true;
}
}
int state = 0;
for(int i = 0; i < requests.size(); i++) {
if(requests.get(i).equals("_s_"))
state ^= 1;
if(requests.get(i).equals("_v_"))
state ^= 2;
if(requests.get(i).equals("_c_"))
state ^= 4;
if(requests.get(i).equals("_t_"))
state ^= 8;
}
if(state == 15) {
int index = parts[16].indexOf("(");
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, NullWritable> {
@Override
public void reduce(Text key,Iterable<Text> values, Context context)
throws IOException, InterruptedException {
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job();
job.setJobName("GetVersion");
job.setJarByClass(GetVersion.class);
FileInputFormat.addInputPath(job, new Path(conf.get("")));
FileOutputFormat.setOutputPath(job, new Path(conf.get("")));
job.setMapperClass(GetVersionMap.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new AggregateUidBySession(), args);
}
}
import java.io.*;
import java.util.Random;
/**
 * Created by gray on 14-8-4.
 *
 * Copies a series keyed by an integer timestamp whose last two digits run
 * from 00 to 59, inserting a "&lt;time&gt; 0" line for every timestamp in
 * [7300000, 7302400) that the input does not contain.
 */
public class ReadFiles {

    private static final String DEFAULT_IN = "G:\\homeG\\data\\readfile\\in.txt";
    private static final String DEFAULT_OUT = "G:\\homeG\\data\\readfile\\out.txt";

    /** First timestamp written. */
    private static final int START = 7300000;
    /** Exclusive upper bound of the timestamps written. */
    private static final int END = 7302400;

    /**
     * Reads the input file line by line and writes the gap-filled series.
     * Input lines whose timestamp is behind the current position are dropped;
     * empty lines are skipped.
     *
     * @param args optional: {@code args[0]} input path, {@code args[1]} output
     *             path; the built-in default paths are used when omitted
     * @throws IOException if either file cannot be read or written
     */
    public static void main(String[] args) throws IOException {
        String in = (args != null && args.length > 0) ? args[0] : DEFAULT_IN;
        String out = (args != null && args.length > 1) ? args[1] : DEFAULT_OUT;

        BufferedReader br = null;
        BufferedWriter pr = null;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(in)), "UTF-8"));
            pr = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(out)), "UTF-8"));

            int time = START;
            String line;
            while ((line = br.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;
                }
                String[] parts = line.split(" ", -1);
                int now = Integer.parseInt(parts[0]);
                // Pad every timestamp missing before this record.
                while (now > time) {
                    pr.append(time + " 0\n");
                    time = nextMinute(time);
                }
                if (now == time) {
                    pr.append(line + "\n");
                    time = nextMinute(time);
                }
            }
            // Pad the tail up to (but excluding) END.
            while (END > time) {
                pr.append(time + " 0\n");
                time = nextMinute(time);
            }
        } finally {
            // Close both streams even if reading or writing failed.
            try {
                if (br != null) {
                    br.close();
                }
            } finally {
                if (pr != null) {
                    pr.close();
                }
            }
        }
    }

    /** Advances a timestamp by one step, rolling ..59 over to the next ..00. */
    private static int nextMinute(int time) {
        return (time % 100 == 59) ? time - 59 + 100 : time + 1;
    }
}
/**
 * Re-emits every value of the group under its key, then emits one final
 * record pairing the key with the number of values seen.
 */
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    int count = 0;
    for (Text current : values) {
        context.write(key, current);
        count++;
    }
    Text groupKey = new Text(key);
    Text total = new Text(Integer.toString(count));
    context.write(groupKey, total);
}
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.conf.Configured; 3 import org.apache.hadoop.fs.FSDataInputStream; 4 import org.apache.hadoop.fs.FileStatus; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 15 import org.apache.hadoop.util.Tool; 16 import org.apache.hadoop.util.ToolRunner; 17 18 import java.io.BufferedReader; 19 import java.io.IOException; 20 import java.io.InputStreamReader; 21 import java.util.HashSet; 22 import java.util.Set; 23 24 /** 25 * Created by gray on 14-8-5. 26 */ 27 public class UnnormalUid extends Configured implements Tool { 28 29 public static class UnnormalUidMap extends Mapper<Object, Text, Text, Text> { 30 31 private Set<String> uids = new HashSet<String>(); 32 33 @Override 34 public void setup(Context context) { 35 try { 36 Configuration conf = context.getConfiguration(); 37 FileSystem fs = FileSystem.get(conf); 38 Path path = new Path(conf.get("")); 39 FileStatus[] status = fs.listStatus(path); 40 for (FileStatus file : status) { 41 Path filePath = new Path(path.toString() 42 + "//" + file.getPath().getName()); 43 FSDataInputStream fin = fs.open(filePath); 44 BufferedReader br = new BufferedReader( 45 new InputStreamReader(fin, "utf-8")); 46 String line = br.readLine(); 47 while (line != null) { 48 String[] tmps = line.split(" ",-1); 49 uids.add(tmps[0]); 50 line = br.readLine(); 51 } 52 br.close(); 53 fin.close(); 54 } 55 } catch (IOException e) { 56 e.printStackTrace(); 57 } 58 } 59 60 @Override 61 public void map(Object key, Text value, 
Context context) 62 throws IOException,InterruptedException { 63 String[] parts = value.toString().split("1",-1); 64 String uid = parts[26]; 65 if(!uids.contains(uid)) 66 return; 67 String url = parts[2] + parts[6]; 68 String httpstate = parts[8]; 69 String restate = parts[5]; 70 String time = parts[4]; 71 String refer = parts[14]; 72 String agent = parts[16];//n-5 n-4 73 context.write(new Text(uid + " " + httpstate),new Text(url + " " + refer + " " + time + " " + restate + " " + agent)); 74 } 75 } 76 77 public static class Reduce extends Reducer<Text, Text, Text, Text> { 78 79 @Override 80 public void reduce(Text key,Iterable<Text> values, Context context) 81 throws IOException, InterruptedException { 82 for(Text value : values) { 83 context.write(key,value); 84 } 85 } 86 } 87 88 @Override 89 public int run(String[] args) throws Exception { 90 Configuration conf = getConf(); 91 Job job = new Job(); 92 job.setJobName("UnnormalUid"); 93 job.setJarByClass(UnnormalUid.class); 94 95 FileInputFormat.addInputPath(job, new Path(conf.get(""))); 96 FileOutputFormat.setOutputPath(job, new Path(conf.get(""))); 97 98 job.setMapperClass(UnnormalUidMap.class); 99 job.setReducerClass(Reduce.class); 100 101 job.setInputFormatClass(SequenceFileInputFormat.class); 102 job.setOutputFormatClass(TextOutputFormat.class); 103 104 job.setMapOutputKeyClass(Text.class); 105 job.setMapOutputValueClass(Text.class); 106 job.setOutputKeyClass(Text.class); 107 job.setOutputValueClass(Text.class); 108 109 job.waitForCompletion(true); 110 111 return 0; 112 } 113 114 115 public static void main(String[] args) throws Exception { 116 ToolRunner.run(new UnnormalUid(), args); 117 } 118 119 }
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.conf.Configured; 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Job; 6 import org.apache.hadoop.mapreduce.Mapper; 7 import org.apache.hadoop.mapreduce.Reducer; 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 9 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 12 import org.apache.hadoop.util.Tool; 13 import org.apache.hadoop.util.ToolRunner; 14 15 import java.io.IOException; 16 import java.util.ArrayList; 17 import java.util.List; 18 19 20 /** 21 * Created by gray on 14-8-3. 22 */ 23 public class GetVersion extends Configured implements Tool { 24 25 public static class GetVersionMap extends Mapper<Object, Text, Text, Text> { 26 27 @Override 28 public void map(Object key, Text value, Context context) 29 throws IOException ,InterruptedException { 30 List<String> requests = new ArrayList<String>(); 31 String[] parts = value.toString().split("1",-1); 32 String url = parts[2] + parts[6]; 33 String httpstate = parts[4]; 34 String uid = parts[26]; 35 if(!url.contains("/v2/") || httpstate.equals("POST")) { 36 //can't do with post 37 return; 38 } 39 String[] tmps = url.split("\?",-1); 40 if(tmps.length == 1) { 41 context.write(new Text(uid), new Text(url)); 42 return; 43 } 44 int len = tmps[1].length(); 45 String temp = ""; 46 boolean falg = true; 47 for(int i = 0; i < len; i++) { 48 if(tmps[1].charAt(i) == '=') { 49 requests.add(temp); 50 temp = ""; 51 falg = false; 52 } 53 if(falg) 54 temp += tmps[1].charAt(i); 55 if(tmps[1].charAt(i) == '&') { 56 falg = true; 57 } 58 } 59 int state = 0; 60 for(int i = 0; i < requests.size(); i++) { 61 if(requests.get(i).equals("_s_")) 62 state ^= 1; 63 if(requests.get(i).equals("_v_")) 64 state ^= 2; 65 if(requests.get(i).equals("_c_")) 
66 state ^= 4; 67 if(requests.get(i).equals("_t_")) 68 state ^= 8; 69 } 70 if((state & 8) != 0) { 71 context.write(new Text(uid), new Text(tmps[0] + requests.toString())); 72 } else if(state < 7) { 73 context.write(new Text(uid), new Text(tmps[0] + requests.toString())); 74 } 75 } 76 } 77 78 public static class Reduce extends Reducer<Text, Text, Text, Text> { 79 80 @Override 81 public void reduce(Text key,Iterable<Text> values, Context context) 82 throws IOException, InterruptedException { 83 String out = ""; 84 for(Text value : values) { 85 //context.write(key,value); 86 out += value.toString(); 87 } 88 context.write(key,new Text(out)); 89 } 90 } 91 92 @Override 93 public int run(String[] args) throws Exception { 94 Configuration conf = getConf(); 95 Job job = new Job(); 96 job.setJobName("GetVersion"); 97 job.setJarByClass(GetVersion.class); 98 99 FileInputFormat.addInputPath(job, new Path(conf.get(""))); 100 FileOutputFormat.setOutputPath(job, new Path(conf.get(""))); 101 102 job.setMapperClass(GetVersionMap.class); 103 job.setReducerClass(Reduce.class); 104 105 job.setInputFormatClass(SequenceFileInputFormat.class); 106 job.setOutputFormatClass(TextOutputFormat.class); 107 108 job.setMapOutputKeyClass(Text.class); 109 job.setMapOutputValueClass(Text.class); 110 job.setOutputKeyClass(Text.class); 111 job.setOutputValueClass(Text.class); 112 113 job.waitForCompletion(true); 114 115 return 0; 116 } 117 118 119 public static void main(String[] args) throws Exception { 120 ToolRunner.run(new GetVersion(), args); 121 } 122 123 }