root / project / src / main / java / service / PacketHandler.java @ 1
History | View | Annotate | Download (13.2 KB)
1 | 1 | up20120064 | package main.java.service; |
---|---|---|---|
2 | |||
3 | |||
4 | import main.java.file.FileChunk; |
||
5 | import main.java.file.FileChunkID; |
||
6 | import main.java.file.FileID; |
||
7 | import main.java.listeners.Broker; |
||
8 | import main.java.peer.Peer; |
||
9 | import main.java.protocols.Backup; |
||
10 | import main.java.protocols.BackupChunk; |
||
11 | import main.java.utils.Constants.*; |
||
12 | |||
13 | import java.io.*; |
||
14 | import java.net.DatagramPacket; |
||
15 | import java.net.InetAddress; |
||
16 | import java.util.ArrayList; |
||
17 | import java.util.Arrays; |
||
18 | import java.util.List; |
||
19 | |||
20 | import static main.java.utils.Constants.*; |
||
21 | import static main.java.utils.Utilities.getLocalAddress; |
||
22 | import static main.java.utils.Utilities.sha256; |
||
23 | |||
24 | public class PacketHandler implements Runnable { |
||
25 | |||
26 | private DatagramPacket packetToHandle; |
||
27 | private String packet_header; |
||
28 | private byte[] packet_body; |
||
29 | private String subprotocol; |
||
30 | private FileID fileID;
|
||
31 | private float protocolVersion; |
||
32 | private int replicationDegree; |
||
33 | private int chunkNo; |
||
34 | private int senderID; |
||
35 | private int bodyStartingIndex; |
||
36 | private String[] header_splitted; |
||
37 | |||
38 | private InetAddress senderIP; |
||
39 | |||
40 | private boolean listeningChunk; |
||
41 | private boolean receivedChunk; |
||
42 | private boolean badMessage; |
||
43 | |||
44 | |||
45 | public PacketHandler(DatagramPacket packetToHandle) { |
||
46 | this.packetToHandle = packetToHandle;
|
||
47 | packet_header = null;
|
||
48 | packet_body = null;
|
||
49 | header_splitted = null;
|
||
50 | senderIP = packetToHandle.getAddress(); |
||
51 | listeningChunk = false;
|
||
52 | receivedChunk = false;
|
||
53 | badMessage = false;
|
||
54 | } |
||
55 | |||
56 | @Override
|
||
57 | public void run() { |
||
58 | |||
59 | parseSubProtocol(); |
||
60 | //parseHeader();
|
||
61 | |||
62 | |||
63 | if(Integer.parseInt(header_splitted[2]) != Peer.getID()){ |
||
64 | |||
65 | switch (subprotocol) {
|
||
66 | case PUTCHUNK:
|
||
67 | parsePUTCHUNK(); |
||
68 | PUTCHUNKHandler(); |
||
69 | break;
|
||
70 | case STORED:
|
||
71 | parseSTORED(); |
||
72 | STOREDHandler(); |
||
73 | break;
|
||
74 | case GETCHUNK:
|
||
75 | parseGETCHUNK(); |
||
76 | GETCHUNKHandler(); |
||
77 | break;
|
||
78 | case CHUNK:
|
||
79 | parseCHUNK(); |
||
80 | CHUNKHandler(); |
||
81 | break;
|
||
82 | case DELETE:
|
||
83 | parseDELETE(); |
||
84 | DELETEHandler(); |
||
85 | break;
|
||
86 | case REMOVED:
|
||
87 | parseREMOVED(); |
||
88 | REMOVEDHandler(); |
||
89 | break;
|
||
90 | |||
91 | default: System.out.println("Unknown protocol. Ignoring message... " + subprotocol); |
||
92 | badMessage = true;
|
||
93 | break;
|
||
94 | |||
95 | } |
||
96 | |||
97 | if(!badMessage)
|
||
98 | System.out.println("\t Sender ID: " + packetToHandle.getAddress() + " \n" + |
||
99 | "\t PEER ID : " + senderID + "\n"); |
||
100 | |||
101 | |||
102 | } |
||
103 | |||
104 | |||
105 | |||
106 | } |
||
107 | |||
108 | private void parseSubProtocol() { |
||
109 | |||
110 | ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData()); |
||
111 | BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); |
||
112 | |||
113 | try {
|
||
114 | packet_header = reader.readLine(); |
||
115 | header_splitted = packet_header.split(" ");
|
||
116 | subprotocol = header_splitted[0];
|
||
117 | } catch (IOException e) { |
||
118 | e.printStackTrace(); |
||
119 | } |
||
120 | } |
||
121 | |||
122 | |||
123 | private void parseREMOVED() { |
||
124 | |||
125 | protocolVersion = Float.parseFloat(header_splitted[1]); |
||
126 | senderID = Integer.parseInt(header_splitted[2]); |
||
127 | fileID = new FileID(sha256(header_splitted[3]), -1); |
||
128 | chunkNo = Integer.parseInt(header_splitted[4]); |
||
129 | |||
130 | |||
131 | |||
132 | } |
||
133 | |||
134 | private void REMOVEDHandler() { |
||
135 | |||
136 | |||
137 | if(Peer.getDb().isFileStored(fileID)){
|
||
138 | FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
|
||
139 | Peer.getDb().decreasePerceivedRepDeg(chunkID , senderID); |
||
140 | |||
141 | System.out.println("HANDLER REMOVED: "+ Peer.getDb().getPerceivedRepDeg(chunkID) + "/" + |
||
142 | Peer.getDb().getDesiredRepDeg(chunkID)); |
||
143 | |||
144 | |||
145 | if(Peer.getDb().getPerceivedRepDeg(chunkID) < Peer.getDb().getDesiredRepDeg(chunkID)){
|
||
146 | System.out.println("\tPerceived rep degree dropped below desired rep degree."); |
||
147 | |||
148 | Peer.getMCListener().startCountingPutChunks(chunkID); |
||
149 | |||
150 | try{
|
||
151 | System.out.println("\tWaiting for Putchunk messages..."); |
||
152 | Thread.sleep((long)(Math.random() * MAX_WAITING_TIME)); |
||
153 | } catch (InterruptedException ie){ |
||
154 | ie.printStackTrace(); |
||
155 | } |
||
156 | |||
157 | int putChunksReceived = Peer.getMCListener().getCountPutChunks(chunkID);
|
||
158 | |||
159 | Peer.getMCListener().stopSavingPutChunks(chunkID); |
||
160 | |||
161 | if (putChunksReceived == 0){ |
||
162 | |||
163 | try {
|
||
164 | File cFile = new File("peer"+Peer.getID()+"/Backup/"+ |
||
165 | fileID.toString().split("\\.")[0]+ "/"+chunkID.toString()); |
||
166 | |||
167 | |||
168 | |||
169 | |||
170 | byte[] data = Backup.loadFileData(cFile); |
||
171 | |||
172 | //FileChunk(int replicationDegree, int chunkNo, FileID fileID, byte[] chunkData)
|
||
173 | fileID = new FileID(fileID.toString(), Peer.getDb().getDesiredRepDeg(chunkID));
|
||
174 | FileChunk fChunk = new FileChunk(Peer.getDb().getDesiredRepDeg(chunkID), chunkNo,
|
||
175 | fileID, data); |
||
176 | |||
177 | new Thread(new BackupChunk(fChunk)).start(); |
||
178 | } catch (FileNotFoundException e) { |
||
179 | e.printStackTrace(); |
||
180 | } |
||
181 | |||
182 | |||
183 | } |
||
184 | |||
185 | |||
186 | } |
||
187 | |||
188 | } |
||
189 | |||
190 | |||
191 | |||
192 | |||
193 | |||
194 | |||
195 | } |
||
196 | |||
197 | private void parseDELETE() { |
||
198 | |||
199 | |||
200 | protocolVersion = Float.parseFloat(header_splitted[1]); |
||
201 | senderID = Integer.parseInt(header_splitted[2]); |
||
202 | fileID = new FileID(sha256(header_splitted[3]), -1); |
||
203 | |||
204 | } |
||
205 | |||
206 | private void DELETEHandler() { |
||
207 | |||
208 | |||
209 | |||
210 | final File folder = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString()+ |
||
211 | "/");
|
||
212 | |||
213 | System.out.println("FOLDER: " + folder.getPath()); |
||
214 | |||
215 | final File[] files = folder.listFiles( new FilenameFilter() { |
||
216 | @Override
|
||
217 | public boolean accept( final File dir, |
||
218 | final String name ) { |
||
219 | return name.matches( fileID.toString() + "-.*" ); |
||
220 | } |
||
221 | } ); |
||
222 | for ( final File file : files ) { |
||
223 | if ( !file.delete() ) {
|
||
224 | System.err.println( "Can't remove " + file.getAbsolutePath() ); |
||
225 | } |
||
226 | else {
|
||
227 | Peer.getDisk().removeFile(file.length()); |
||
228 | Peer.getDb().removeChunkInfo(new FileChunkID(fileID.toString(),
|
||
229 | Integer.parseInt(file.getName().split("-")[1]))); |
||
230 | |||
231 | Peer.getDb().removeFile(fileID); |
||
232 | } |
||
233 | } |
||
234 | |||
235 | |||
236 | |||
237 | |||
238 | |||
239 | } |
||
240 | |||
241 | private void parseCHUNK() { |
||
242 | |||
243 | //CHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF><Body>
|
||
244 | if(listeningChunk)
|
||
245 | receivedChunk=true;
|
||
246 | |||
247 | protocolVersion = Float.parseFloat(header_splitted[1]); |
||
248 | senderID = Integer.parseInt(header_splitted[2]); |
||
249 | fileID = new FileID(header_splitted[3], -1); |
||
250 | chunkNo = Integer.parseInt(header_splitted[4]); |
||
251 | parseBody(); |
||
252 | |||
253 | } |
||
254 | |||
255 | private void CHUNKHandler() { |
||
256 | |||
257 | |||
258 | if(Peer.restoring){
|
||
259 | FileChunk chunk = new FileChunk(-1, chunkNo, fileID, packet_body); |
||
260 | |||
261 | |||
262 | Peer.getMDRListener().queueChunk(chunk); |
||
263 | //System.out.println("\t CHUNKS RECEIVED : ");
|
||
264 | //Peer.getMDRListener().printChunksReceived();
|
||
265 | } |
||
266 | |||
267 | |||
268 | } |
||
269 | |||
270 | private void parseGETCHUNK() { |
||
271 | |||
272 | /* GETCHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF> */
|
||
273 | |||
274 | protocolVersion = Float.parseFloat(header_splitted[1]); |
||
275 | senderID = Integer.parseInt(header_splitted[2]); |
||
276 | fileID = new FileID(header_splitted[3], -1); |
||
277 | chunkNo = Integer.parseInt(header_splitted[4]); |
||
278 | |||
279 | |||
280 | } |
||
281 | |||
282 | private void GETCHUNKHandler() { |
||
283 | |||
284 | |||
285 | FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
|
||
286 | FileInputStream is = null; |
||
287 | File chunkFile = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/" + chunkID); |
||
288 | try {
|
||
289 | is = new FileInputStream(chunkFile); |
||
290 | } catch (FileNotFoundException e) { |
||
291 | e.printStackTrace(); |
||
292 | } |
||
293 | |||
294 | byte[] chunkData = new byte[(int) chunkFile.length()]; |
||
295 | |||
296 | try {
|
||
297 | assert is != null; |
||
298 | is.read(chunkData); |
||
299 | } catch (IOException e) { |
||
300 | e.printStackTrace(); |
||
301 | } |
||
302 | try {
|
||
303 | is.close(); |
||
304 | } catch (IOException e) { |
||
305 | e.printStackTrace(); |
||
306 | } |
||
307 | |||
308 | |||
309 | FileChunk chunk = new FileChunk(-1, chunkID.getChunkNumber(), fileID, chunkData); |
||
310 | |||
311 | /*
|
||
312 | |||
313 | To avoid flooding the host with CHUNK messages, each peer shall wait for a random time
|
||
314 | uniformly distributed between 0 and 400 ms, before sending the CHUNK message. If it receives
|
||
315 | a CHUNK message before that time expires, it will not send the CHUNK message.
|
||
316 | |||
317 | */
|
||
318 | |||
319 | try {
|
||
320 | listeningChunk = true;
|
||
321 | Thread.sleep((long)(Math.random() * MAX_WAITING_TIME)); |
||
322 | } catch (InterruptedException e) { |
||
323 | e.printStackTrace(); |
||
324 | } |
||
325 | |||
326 | if(!receivedChunk)
|
||
327 | Broker.sendCHUNK(chunk, chunkID); |
||
328 | |||
329 | receivedChunk = false;
|
||
330 | listeningChunk = false;
|
||
331 | |||
332 | |||
333 | } |
||
334 | |||
335 | private void parseSTORED() { |
||
336 | //STORED <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
|
||
337 | |||
338 | protocolVersion = Float.parseFloat(header_splitted[1]); |
||
339 | senderID = Integer.parseInt(header_splitted[2]); |
||
340 | fileID = new FileID(header_splitted[3], -1); |
||
341 | chunkNo = Integer.parseInt(header_splitted[4]); |
||
342 | |||
343 | } |
||
344 | |||
345 | private void STOREDHandler() { |
||
346 | |||
347 | FileChunkID fcID = new FileChunkID(fileID.toString(), chunkNo);
|
||
348 | Peer.getMCListener().countStored(fcID, String.valueOf(senderID));
|
||
349 | Peer.getDb().increasePerceivedRepDeg(new FileChunkID(fileID.toString(), chunkNo), senderID);
|
||
350 | |||
351 | } |
||
352 | |||
353 | |||
354 | private void parsePUTCHUNK() { |
||
355 | |||
356 | |||
357 | replicationDegree = Integer.parseInt(header_splitted[5]); |
||
358 | protocolVersion = Float.parseFloat(header_splitted[1]); |
||
359 | senderID = Integer.parseInt(header_splitted[2]); |
||
360 | fileID = new FileID(header_splitted[3], replicationDegree); |
||
361 | chunkNo = Integer.parseInt(header_splitted[4]); |
||
362 | parseBody(); |
||
363 | |||
364 | } |
||
365 | |||
366 | private void PUTCHUNKHandler() { |
||
367 | |||
368 | |||
369 | |||
370 | File dir = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/"); |
||
371 | |||
372 | if(!dir.exists()){
|
||
373 | |||
374 | System.out.println("creating directory: " + dir.getName()); |
||
375 | |||
376 | try{
|
||
377 | dir.mkdirs(); |
||
378 | } |
||
379 | catch(SecurityException se){ |
||
380 | se.printStackTrace(); |
||
381 | } |
||
382 | |||
383 | } |
||
384 | |||
385 | |||
386 | |||
387 | FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
|
||
388 | System.out.println("\t FILEID : " + fileID.toString() + "\n" |
||
389 | + "\t CHUNK NO : " + chunkNo+ "\n"); |
||
390 | System.out.println("\t PUT CHUNK: " + fileID.toString() + " with Replication Degree : " + replicationDegree + " in " + dir.getPath()); |
||
391 | System.out.println("\t Chunk ID: " + chunkID.toString()); |
||
392 | |||
393 | |||
394 | |||
395 | |||
396 | |||
397 | File chunkfile = new File(dir.getPath()+"/"+ chunkID.toString()); |
||
398 | |||
399 | |||
400 | |||
401 | |||
402 | |||
403 | Peer.getMDBListener().countPutChunk(chunkID, String.valueOf(senderID));
|
||
404 | |||
405 | if(chunkfile.exists()) {
|
||
406 | System.out.println("\n\t I already have this chunk, sending STORED..."); |
||
407 | Broker.sendSTORED(chunkID); |
||
408 | } else {
|
||
409 | |||
410 | if (Peer.getDisk().saveFile(packet_body.length)){
|
||
411 | |||
412 | try {
|
||
413 | FileOutputStream out = new FileOutputStream(dir.getPath()+"/" + chunkID.toString()); |
||
414 | System.out.println("\n\t Saving Chunk...\n"); |
||
415 | out.write(packet_body); |
||
416 | out.close(); |
||
417 | |||
418 | Peer.getDb().insertFile(fileID); |
||
419 | Peer.getDb().insertChunkInfo(fileID, replicationDegree, chunkNo, Peer.getID()); |
||
420 | |||
421 | try{
|
||
422 | System.out.println("\tSending STORED response..."); |
||
423 | Thread.sleep((long)(Math.random() * MAX_WAITING_TIME)); |
||
424 | } catch (InterruptedException ie){ |
||
425 | ie.printStackTrace(); |
||
426 | } |
||
427 | |||
428 | Broker.sendSTORED(chunkID); |
||
429 | |||
430 | } catch (IOException e) { |
||
431 | e.printStackTrace(); |
||
432 | } |
||
433 | |||
434 | } |
||
435 | |||
436 | |||
437 | } |
||
438 | |||
439 | |||
440 | |||
441 | |||
442 | |||
443 | } |
||
444 | |||
445 | private void parseBody() { |
||
446 | |||
447 | ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData()); |
||
448 | BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); |
||
449 | |||
450 | String line = null; |
||
451 | int totalHeaderLinesLength = 0; |
||
452 | int totalLines = 0; |
||
453 | |||
454 | do {
|
||
455 | try {
|
||
456 | line = reader.readLine(); |
||
457 | totalHeaderLinesLength += line.length(); |
||
458 | totalLines++; |
||
459 | } catch (IOException e) { |
||
460 | e.printStackTrace(); |
||
461 | } |
||
462 | } while (!(line != null && line.isEmpty())); |
||
463 | |||
464 | bodyStartingIndex = totalHeaderLinesLength + (totalLines * CRLF.getBytes().length); |
||
465 | |||
466 | packet_body = Arrays.copyOfRange(packetToHandle.getData(), bodyStartingIndex,
|
||
467 | packetToHandle.getLength()); |
||
468 | |||
469 | //System.out.println("STARTING INDEX:" + bodyStartingIndex);
|
||
470 | } |
||
471 | |||
472 | |||
473 | |||
474 | |||
475 | |||
476 | } |