root / project / src / main / java / service / PacketHandler.java @ 1
History | View | Annotate | Download (13.2 KB)
1 |
package main.java.service; |
---|---|
2 |
|
3 |
|
4 |
import main.java.file.FileChunk; |
5 |
import main.java.file.FileChunkID; |
6 |
import main.java.file.FileID; |
7 |
import main.java.listeners.Broker; |
8 |
import main.java.peer.Peer; |
9 |
import main.java.protocols.Backup; |
10 |
import main.java.protocols.BackupChunk; |
11 |
import main.java.utils.Constants.*; |
12 |
|
13 |
import java.io.*; |
14 |
import java.net.DatagramPacket; |
15 |
import java.net.InetAddress; |
16 |
import java.util.ArrayList; |
17 |
import java.util.Arrays; |
18 |
import java.util.List; |
19 |
|
20 |
import static main.java.utils.Constants.*; |
21 |
import static main.java.utils.Utilities.getLocalAddress; |
22 |
import static main.java.utils.Utilities.sha256; |
23 |
|
24 |
public class PacketHandler implements Runnable { |
25 |
|
26 |
private DatagramPacket packetToHandle; |
27 |
private String packet_header; |
28 |
private byte[] packet_body; |
29 |
private String subprotocol; |
30 |
private FileID fileID;
|
31 |
private float protocolVersion; |
32 |
private int replicationDegree; |
33 |
private int chunkNo; |
34 |
private int senderID; |
35 |
private int bodyStartingIndex; |
36 |
private String[] header_splitted; |
37 |
|
38 |
private InetAddress senderIP; |
39 |
|
40 |
private boolean listeningChunk; |
41 |
private boolean receivedChunk; |
42 |
private boolean badMessage; |
43 |
|
44 |
|
45 |
public PacketHandler(DatagramPacket packetToHandle) { |
46 |
this.packetToHandle = packetToHandle;
|
47 |
packet_header = null;
|
48 |
packet_body = null;
|
49 |
header_splitted = null;
|
50 |
senderIP = packetToHandle.getAddress(); |
51 |
listeningChunk = false;
|
52 |
receivedChunk = false;
|
53 |
badMessage = false;
|
54 |
} |
55 |
|
56 |
@Override
|
57 |
public void run() { |
58 |
|
59 |
parseSubProtocol(); |
60 |
//parseHeader();
|
61 |
|
62 |
|
63 |
if(Integer.parseInt(header_splitted[2]) != Peer.getID()){ |
64 |
|
65 |
switch (subprotocol) {
|
66 |
case PUTCHUNK:
|
67 |
parsePUTCHUNK(); |
68 |
PUTCHUNKHandler(); |
69 |
break;
|
70 |
case STORED:
|
71 |
parseSTORED(); |
72 |
STOREDHandler(); |
73 |
break;
|
74 |
case GETCHUNK:
|
75 |
parseGETCHUNK(); |
76 |
GETCHUNKHandler(); |
77 |
break;
|
78 |
case CHUNK:
|
79 |
parseCHUNK(); |
80 |
CHUNKHandler(); |
81 |
break;
|
82 |
case DELETE:
|
83 |
parseDELETE(); |
84 |
DELETEHandler(); |
85 |
break;
|
86 |
case REMOVED:
|
87 |
parseREMOVED(); |
88 |
REMOVEDHandler(); |
89 |
break;
|
90 |
|
91 |
default: System.out.println("Unknown protocol. Ignoring message... " + subprotocol); |
92 |
badMessage = true;
|
93 |
break;
|
94 |
|
95 |
} |
96 |
|
97 |
if(!badMessage)
|
98 |
System.out.println("\t Sender ID: " + packetToHandle.getAddress() + " \n" + |
99 |
"\t PEER ID : " + senderID + "\n"); |
100 |
|
101 |
|
102 |
} |
103 |
|
104 |
|
105 |
|
106 |
} |
107 |
|
108 |
private void parseSubProtocol() { |
109 |
|
110 |
ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData()); |
111 |
BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); |
112 |
|
113 |
try {
|
114 |
packet_header = reader.readLine(); |
115 |
header_splitted = packet_header.split(" ");
|
116 |
subprotocol = header_splitted[0];
|
117 |
} catch (IOException e) { |
118 |
e.printStackTrace(); |
119 |
} |
120 |
} |
121 |
|
122 |
|
123 |
private void parseREMOVED() { |
124 |
|
125 |
protocolVersion = Float.parseFloat(header_splitted[1]); |
126 |
senderID = Integer.parseInt(header_splitted[2]); |
127 |
fileID = new FileID(sha256(header_splitted[3]), -1); |
128 |
chunkNo = Integer.parseInt(header_splitted[4]); |
129 |
|
130 |
|
131 |
|
132 |
} |
133 |
|
134 |
private void REMOVEDHandler() { |
135 |
|
136 |
|
137 |
if(Peer.getDb().isFileStored(fileID)){
|
138 |
FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
|
139 |
Peer.getDb().decreasePerceivedRepDeg(chunkID , senderID); |
140 |
|
141 |
System.out.println("HANDLER REMOVED: "+ Peer.getDb().getPerceivedRepDeg(chunkID) + "/" + |
142 |
Peer.getDb().getDesiredRepDeg(chunkID)); |
143 |
|
144 |
|
145 |
if(Peer.getDb().getPerceivedRepDeg(chunkID) < Peer.getDb().getDesiredRepDeg(chunkID)){
|
146 |
System.out.println("\tPerceived rep degree dropped below desired rep degree."); |
147 |
|
148 |
Peer.getMCListener().startCountingPutChunks(chunkID); |
149 |
|
150 |
try{
|
151 |
System.out.println("\tWaiting for Putchunk messages..."); |
152 |
Thread.sleep((long)(Math.random() * MAX_WAITING_TIME)); |
153 |
} catch (InterruptedException ie){ |
154 |
ie.printStackTrace(); |
155 |
} |
156 |
|
157 |
int putChunksReceived = Peer.getMCListener().getCountPutChunks(chunkID);
|
158 |
|
159 |
Peer.getMCListener().stopSavingPutChunks(chunkID); |
160 |
|
161 |
if (putChunksReceived == 0){ |
162 |
|
163 |
try {
|
164 |
File cFile = new File("peer"+Peer.getID()+"/Backup/"+ |
165 |
fileID.toString().split("\\.")[0]+ "/"+chunkID.toString()); |
166 |
|
167 |
|
168 |
|
169 |
|
170 |
byte[] data = Backup.loadFileData(cFile); |
171 |
|
172 |
//FileChunk(int replicationDegree, int chunkNo, FileID fileID, byte[] chunkData)
|
173 |
fileID = new FileID(fileID.toString(), Peer.getDb().getDesiredRepDeg(chunkID));
|
174 |
FileChunk fChunk = new FileChunk(Peer.getDb().getDesiredRepDeg(chunkID), chunkNo,
|
175 |
fileID, data); |
176 |
|
177 |
new Thread(new BackupChunk(fChunk)).start(); |
178 |
} catch (FileNotFoundException e) { |
179 |
e.printStackTrace(); |
180 |
} |
181 |
|
182 |
|
183 |
} |
184 |
|
185 |
|
186 |
} |
187 |
|
188 |
} |
189 |
|
190 |
|
191 |
|
192 |
|
193 |
|
194 |
|
195 |
} |
196 |
|
197 |
private void parseDELETE() { |
198 |
|
199 |
|
200 |
protocolVersion = Float.parseFloat(header_splitted[1]); |
201 |
senderID = Integer.parseInt(header_splitted[2]); |
202 |
fileID = new FileID(sha256(header_splitted[3]), -1); |
203 |
|
204 |
} |
205 |
|
206 |
private void DELETEHandler() { |
207 |
|
208 |
|
209 |
|
210 |
final File folder = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString()+ |
211 |
"/");
|
212 |
|
213 |
System.out.println("FOLDER: " + folder.getPath()); |
214 |
|
215 |
final File[] files = folder.listFiles( new FilenameFilter() { |
216 |
@Override
|
217 |
public boolean accept( final File dir, |
218 |
final String name ) { |
219 |
return name.matches( fileID.toString() + "-.*" ); |
220 |
} |
221 |
} ); |
222 |
for ( final File file : files ) { |
223 |
if ( !file.delete() ) {
|
224 |
System.err.println( "Can't remove " + file.getAbsolutePath() ); |
225 |
} |
226 |
else {
|
227 |
Peer.getDisk().removeFile(file.length()); |
228 |
Peer.getDb().removeChunkInfo(new FileChunkID(fileID.toString(),
|
229 |
Integer.parseInt(file.getName().split("-")[1]))); |
230 |
|
231 |
Peer.getDb().removeFile(fileID); |
232 |
} |
233 |
} |
234 |
|
235 |
|
236 |
|
237 |
|
238 |
|
239 |
} |
240 |
|
241 |
private void parseCHUNK() { |
242 |
|
243 |
//CHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF><Body>
|
244 |
if(listeningChunk)
|
245 |
receivedChunk=true;
|
246 |
|
247 |
protocolVersion = Float.parseFloat(header_splitted[1]); |
248 |
senderID = Integer.parseInt(header_splitted[2]); |
249 |
fileID = new FileID(header_splitted[3], -1); |
250 |
chunkNo = Integer.parseInt(header_splitted[4]); |
251 |
parseBody(); |
252 |
|
253 |
} |
254 |
|
255 |
private void CHUNKHandler() { |
256 |
|
257 |
|
258 |
if(Peer.restoring){
|
259 |
FileChunk chunk = new FileChunk(-1, chunkNo, fileID, packet_body); |
260 |
|
261 |
|
262 |
Peer.getMDRListener().queueChunk(chunk); |
263 |
//System.out.println("\t CHUNKS RECEIVED : ");
|
264 |
//Peer.getMDRListener().printChunksReceived();
|
265 |
} |
266 |
|
267 |
|
268 |
} |
269 |
|
270 |
private void parseGETCHUNK() { |
271 |
|
272 |
/* GETCHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF> */
|
273 |
|
274 |
protocolVersion = Float.parseFloat(header_splitted[1]); |
275 |
senderID = Integer.parseInt(header_splitted[2]); |
276 |
fileID = new FileID(header_splitted[3], -1); |
277 |
chunkNo = Integer.parseInt(header_splitted[4]); |
278 |
|
279 |
|
280 |
} |
281 |
|
282 |
private void GETCHUNKHandler() { |
283 |
|
284 |
|
285 |
FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
|
286 |
FileInputStream is = null; |
287 |
File chunkFile = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/" + chunkID); |
288 |
try {
|
289 |
is = new FileInputStream(chunkFile); |
290 |
} catch (FileNotFoundException e) { |
291 |
e.printStackTrace(); |
292 |
} |
293 |
|
294 |
byte[] chunkData = new byte[(int) chunkFile.length()]; |
295 |
|
296 |
try {
|
297 |
assert is != null; |
298 |
is.read(chunkData); |
299 |
} catch (IOException e) { |
300 |
e.printStackTrace(); |
301 |
} |
302 |
try {
|
303 |
is.close(); |
304 |
} catch (IOException e) { |
305 |
e.printStackTrace(); |
306 |
} |
307 |
|
308 |
|
309 |
FileChunk chunk = new FileChunk(-1, chunkID.getChunkNumber(), fileID, chunkData); |
310 |
|
311 |
/*
|
312 |
|
313 |
To avoid flooding the host with CHUNK messages, each peer shall wait for a random time
|
314 |
uniformly distributed between 0 and 400 ms, before sending the CHUNK message. If it receives
|
315 |
a CHUNK message before that time expires, it will not send the CHUNK message.
|
316 |
|
317 |
*/
|
318 |
|
319 |
try {
|
320 |
listeningChunk = true;
|
321 |
Thread.sleep((long)(Math.random() * MAX_WAITING_TIME)); |
322 |
} catch (InterruptedException e) { |
323 |
e.printStackTrace(); |
324 |
} |
325 |
|
326 |
if(!receivedChunk)
|
327 |
Broker.sendCHUNK(chunk, chunkID); |
328 |
|
329 |
receivedChunk = false;
|
330 |
listeningChunk = false;
|
331 |
|
332 |
|
333 |
} |
334 |
|
335 |
private void parseSTORED() { |
336 |
//STORED <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
|
337 |
|
338 |
protocolVersion = Float.parseFloat(header_splitted[1]); |
339 |
senderID = Integer.parseInt(header_splitted[2]); |
340 |
fileID = new FileID(header_splitted[3], -1); |
341 |
chunkNo = Integer.parseInt(header_splitted[4]); |
342 |
|
343 |
} |
344 |
|
345 |
private void STOREDHandler() { |
346 |
|
347 |
FileChunkID fcID = new FileChunkID(fileID.toString(), chunkNo);
|
348 |
Peer.getMCListener().countStored(fcID, String.valueOf(senderID));
|
349 |
Peer.getDb().increasePerceivedRepDeg(new FileChunkID(fileID.toString(), chunkNo), senderID);
|
350 |
|
351 |
} |
352 |
|
353 |
|
354 |
private void parsePUTCHUNK() { |
355 |
|
356 |
|
357 |
replicationDegree = Integer.parseInt(header_splitted[5]); |
358 |
protocolVersion = Float.parseFloat(header_splitted[1]); |
359 |
senderID = Integer.parseInt(header_splitted[2]); |
360 |
fileID = new FileID(header_splitted[3], replicationDegree); |
361 |
chunkNo = Integer.parseInt(header_splitted[4]); |
362 |
parseBody(); |
363 |
|
364 |
} |
365 |
|
366 |
private void PUTCHUNKHandler() { |
367 |
|
368 |
|
369 |
|
370 |
File dir = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/"); |
371 |
|
372 |
if(!dir.exists()){
|
373 |
|
374 |
System.out.println("creating directory: " + dir.getName()); |
375 |
|
376 |
try{
|
377 |
dir.mkdirs(); |
378 |
} |
379 |
catch(SecurityException se){ |
380 |
se.printStackTrace(); |
381 |
} |
382 |
|
383 |
} |
384 |
|
385 |
|
386 |
|
387 |
FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
|
388 |
System.out.println("\t FILEID : " + fileID.toString() + "\n" |
389 |
+ "\t CHUNK NO : " + chunkNo+ "\n"); |
390 |
System.out.println("\t PUT CHUNK: " + fileID.toString() + " with Replication Degree : " + replicationDegree + " in " + dir.getPath()); |
391 |
System.out.println("\t Chunk ID: " + chunkID.toString()); |
392 |
|
393 |
|
394 |
|
395 |
|
396 |
|
397 |
File chunkfile = new File(dir.getPath()+"/"+ chunkID.toString()); |
398 |
|
399 |
|
400 |
|
401 |
|
402 |
|
403 |
Peer.getMDBListener().countPutChunk(chunkID, String.valueOf(senderID));
|
404 |
|
405 |
if(chunkfile.exists()) {
|
406 |
System.out.println("\n\t I already have this chunk, sending STORED..."); |
407 |
Broker.sendSTORED(chunkID); |
408 |
} else {
|
409 |
|
410 |
if (Peer.getDisk().saveFile(packet_body.length)){
|
411 |
|
412 |
try {
|
413 |
FileOutputStream out = new FileOutputStream(dir.getPath()+"/" + chunkID.toString()); |
414 |
System.out.println("\n\t Saving Chunk...\n"); |
415 |
out.write(packet_body); |
416 |
out.close(); |
417 |
|
418 |
Peer.getDb().insertFile(fileID); |
419 |
Peer.getDb().insertChunkInfo(fileID, replicationDegree, chunkNo, Peer.getID()); |
420 |
|
421 |
try{
|
422 |
System.out.println("\tSending STORED response..."); |
423 |
Thread.sleep((long)(Math.random() * MAX_WAITING_TIME)); |
424 |
} catch (InterruptedException ie){ |
425 |
ie.printStackTrace(); |
426 |
} |
427 |
|
428 |
Broker.sendSTORED(chunkID); |
429 |
|
430 |
} catch (IOException e) { |
431 |
e.printStackTrace(); |
432 |
} |
433 |
|
434 |
} |
435 |
|
436 |
|
437 |
} |
438 |
|
439 |
|
440 |
|
441 |
|
442 |
|
443 |
} |
444 |
|
445 |
private void parseBody() { |
446 |
|
447 |
ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData()); |
448 |
BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); |
449 |
|
450 |
String line = null; |
451 |
int totalHeaderLinesLength = 0; |
452 |
int totalLines = 0; |
453 |
|
454 |
do {
|
455 |
try {
|
456 |
line = reader.readLine(); |
457 |
totalHeaderLinesLength += line.length(); |
458 |
totalLines++; |
459 |
} catch (IOException e) { |
460 |
e.printStackTrace(); |
461 |
} |
462 |
} while (!(line != null && line.isEmpty())); |
463 |
|
464 |
bodyStartingIndex = totalHeaderLinesLength + (totalLines * CRLF.getBytes().length); |
465 |
|
466 |
packet_body = Arrays.copyOfRange(packetToHandle.getData(), bodyStartingIndex,
|
467 |
packetToHandle.getLength()); |
468 |
|
469 |
//System.out.println("STARTING INDEX:" + bodyStartingIndex);
|
470 |
} |
471 |
|
472 |
|
473 |
|
474 |
|
475 |
|
476 |
} |