1 package p3.hadoop.mapred;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.io.BytesWritable;
7 import p3.common.lib.BinaryUtils;
8 import p3.common.lib.Bytes;
9
10 public class PcapLineReader
11 {
12 private static final int DEFAULT_BUFFER_SIZE = 2048;
13 private int bufferSize;
14 private static final int PCAP_FILE_HEADER_LENGTH = 24;
15 private static final int PCAP_PACKET_HEADER_LENGTH = 16;
16 private static final int PCAP_PACKET_HEADER_CAPLEN_POS = 8;
17 private static final int PCAP_PACKET_HEADER_WIREDLEN_POS = 12;
18 private static final int PCAP_PACKET_HEADER_CAPLEN_LEN = 4;
19 private static final int PCAP_PACKET_HEADER_TIMESTAMP_LEN = 4;
20 private static final int PCAP_PACKET_MIN_LEN = 53;
21 private static final int PCAP_PACKET_MAX_LEN = 1519;
22 private static final int MAGIC_NUMBER = -725372255;
23 private static final int MIN_PKT_SIZE = 42;
24 private long min_captime;
25 private long max_captime;
26 private InputStream in;
27 private byte[] buffer;
28 byte[] pcap_header;
29 private int bufferLength;
30 int consumed;
31
32 public PcapLineReader(InputStream in, int bufferSize, long min_captime, long max_captime)
33 {
34 this.bufferSize = 2048;
35
36 this.bufferLength = 0;
37 this.consumed = 0;
38
39 this.in = in;
40 this.bufferSize = bufferSize;
41 this.buffer = new byte[this.bufferSize];
42 this.min_captime = min_captime;
43 this.max_captime = max_captime;
44 }
45
46 public PcapLineReader(InputStream in, Configuration conf)
47 throws IOException
48 {
49 this(in, 2048,
50 conf.getLong("pcap.file.captime.min", 1309412600L),
51 conf.getLong("pcap.file.captime.max", conf.getLong("pcap.file.captime.max", 1309412600L) + 172800L));
52 }
53
54 public void close()
55 throws IOException
56 {
57 this.in.close();
58 }
59
60 int skipPartialRecord(int fraction)
61 throws IOException
62 {
63 int pos = 0;
64 byte[] captured = new byte[fraction];
65 byte[] tmpTimestamp1 = new byte[4];
66 byte[] tmpTimestamp2 = new byte[4];
67 byte[] tmpCapturedLen1 = new byte[4];
68 byte[] tmpWiredLen1 = new byte[4];
69 byte[] tmpCapturedLen2 = new byte[4];
70 byte[] tmpWiredLen2 = new byte[4];
71 int caplen1 = 0;
72 int wiredlen1 = 0;
73 int caplen2 = 0;
74 int wiredlen2 = 0;
75 long timestamp2 = 0L;
76
77 int size = 0;
78 long endureTime = 100L;
79
80 if ((size = this.in.read(captured)) < 42) return 0;
81
82 do
83 {
84 if ((size - pos < 32) || (size - pos < 53)) {
85 pos = size;
86 break;
87 }
88
89 System.arraycopy(captured, pos, tmpTimestamp1, 0, 4);
90 long timestamp1 = Bytes.toLong(BinaryUtils.flipBO(tmpTimestamp1, 4));
91
92 System.arraycopy(captured, pos + 8, tmpCapturedLen1, 0, 4);
93 caplen1 = Bytes.toInt(BinaryUtils.flipBO(tmpCapturedLen1, 4));
94
95 System.arraycopy(captured, pos + 12, tmpWiredLen1, 0, 4);
96 wiredlen1 = Bytes.toInt(BinaryUtils.flipBO(tmpWiredLen1, 4));
97
98 if ((caplen1 > 53) && (caplen1 < 1519) && (size - pos - 32 - caplen1 > 0))
99 {
100 System.arraycopy(captured, pos + 16 + caplen1 + 8, tmpCapturedLen2, 0, 4);
101 caplen2 = Bytes.toInt(BinaryUtils.flipBO(tmpCapturedLen2, 4));
102
103 System.arraycopy(captured, pos + 16 + caplen1 + 12, tmpWiredLen2, 0, 4);
104 wiredlen2 = Bytes.toInt(BinaryUtils.flipBO(tmpWiredLen2, 4));
105
106 System.arraycopy(captured, pos + 16 + caplen1, tmpTimestamp2, 0, 4);
107 timestamp2 = Bytes.toLong(BinaryUtils.flipBO(tmpTimestamp2, 4));
108
109 if ((timestamp1 >= this.min_captime) && (timestamp1 < this.max_captime) && (this.min_captime <= timestamp2) && (timestamp2 < this.max_captime) &&
110 (wiredlen1 > 53) && (wiredlen1 < 1519) && (wiredlen2 > 53) && (wiredlen2 < 1519) &&
111 (caplen1 > 0) && (caplen1 <= wiredlen1) && (caplen2 > 0) && (caplen2 <= wiredlen2) &&
112 (timestamp2 >= timestamp1) && (timestamp2 - timestamp1 < endureTime)) {
113 return pos;
114 }
115
116 }
117
118 ++pos;
119 }
120 while (pos < size);
121
122 return pos;
123 }
124
125 int readPacket(int packetLen)
126 throws IOException
127 {
128 int bufferPosn = 16;
129 byte[] tmp_buffer = new byte[packetLen];
130
131 if ((this.bufferLength = this.in.read(tmp_buffer)) < packetLen) {
132 System.arraycopy(tmp_buffer, 0, this.buffer, bufferPosn, this.bufferLength);
133 bufferPosn += this.bufferLength;
134
135 byte[] newpacket = new byte[packetLen - this.bufferLength];
136
137 if ((this.bufferLength = this.in.read(newpacket)) < 0) return bufferPosn;
138 System.arraycopy(newpacket, 0, this.buffer, bufferPosn, this.bufferLength);
139 }
140 else
141 {
142 System.arraycopy(tmp_buffer, 0, this.buffer, bufferPosn, this.bufferLength);
143 }
144 bufferPosn += this.bufferLength;
145
146 return bufferPosn;
147 }
148
149 int readPacketHeader()
150 {
151 int headerLength = 0;
152 int headerPosn = 0;
153 this.pcap_header = new byte[16];
154
155 byte[] tmp_header = new byte[16];
156 BytesWritable capturedLen = new BytesWritable();
157 try
158 {
159 if ((headerLength = this.in.read(this.pcap_header)) < 16)
160 {
161 if (headerLength == -1) return 0;
162 headerPosn += headerLength;
163
164 byte[] newheader = new byte[16 - headerLength];
165
166 if ((headerLength = this.in.read(newheader)) < 0) {
167 this.consumed = headerPosn;
168 return -1;
169 }
170 System.arraycopy(newheader, 0, this.pcap_header, headerPosn, headerLength);
171 }
172 capturedLen.set(this.pcap_header, 8, 4);
173 System.arraycopy(this.pcap_header, 0, this.buffer, 0, 16);
174 headerPosn = 0;
175 }
176 catch (IOException e)
177 {
178 e.printStackTrace();
179 }
180 return Bytes.toInt(BinaryUtils.flipBO(capturedLen.getBytes(), 4));
181 }
182
183 public int readFileHeader()
184 {
185 try {
186 byte[] magic = new byte[4];
187 this.bufferLength = this.in.read(this.buffer, 0, 24);
188 System.arraycopy(this.buffer, 0, magic, 0, magic.length);
189
190 if (Bytes.toInt(magic) == -725372255) break label50;
191 return 0;
192 }
193 catch (IOException e) {
194 e.printStackTrace();
195 }
196 label50: return this.bufferLength;
197 }
198
199 public int readLine(BytesWritable bytes, int maxLineLength, int maxBytesToConsume)
200 throws IOException
201 {
202 bytes.set(new BytesWritable());
203 boolean hitEndOfFile = false;
204 long bytesConsumed = 0L;
205
206 int caplen = readPacketHeader();
207
208 if (caplen == 0) {
209 bytesConsumed = 0L;
210 } else if (caplen == -1) {
211 bytesConsumed += this.consumed;
212 }
213 else if ((caplen > 0) && (caplen < 1519)) {
214 if ((this.bufferLength = readPacket(caplen)) < caplen + 16) {
215 hitEndOfFile = true;
216 }
217 bytesConsumed += this.bufferLength;
218
219 if (!(hitEndOfFile)) {
220 bytes.set(this.buffer, 0, caplen + 16);
221 }
222 }
223
224 return (int)Math.min(bytesConsumed, 2147483647L);
225 }
226
227 public int readLine(BytesWritable str, int maxLineLength)
228 throws IOException
229 {
230 return readLine(str, maxLineLength, 2147483647);
231 }
232
233 public int readLine(BytesWritable str)
234 throws IOException
235 {
236 return readLine(str, 2147483647, 2147483647);
237 }
238 }