Project

General

Profile

Statistics
| Revision:

root / project / src / main / java / service / PacketHandler.java @ 1

History | View | Annotate | Download (13.2 KB)

1 1 up20120064
package main.java.service;
2
3
4
import main.java.file.FileChunk;
5
import main.java.file.FileChunkID;
6
import main.java.file.FileID;
7
import main.java.listeners.Broker;
8
import main.java.peer.Peer;
9
import main.java.protocols.Backup;
10
import main.java.protocols.BackupChunk;
11
import main.java.utils.Constants.*;
12
13
import java.io.*;
14
import java.net.DatagramPacket;
15
import java.net.InetAddress;
16
import java.util.ArrayList;
17
import java.util.Arrays;
18
import java.util.List;
19
20
import static main.java.utils.Constants.*;
21
import static main.java.utils.Utilities.getLocalAddress;
22
import static main.java.utils.Utilities.sha256;
23
24
public class PacketHandler implements Runnable {
25
26
    private DatagramPacket packetToHandle;
27
    private String packet_header;
28
    private byte[] packet_body;
29
    private String subprotocol;
30
    private FileID fileID;
31
    private float protocolVersion;
32
    private int replicationDegree;
33
    private int chunkNo;
34
    private int senderID;
35
    private int bodyStartingIndex;
36
    private String[] header_splitted;
37
38
    private InetAddress senderIP;
39
40
    private boolean listeningChunk;
41
    private boolean receivedChunk;
42
    private boolean badMessage;
43
44
45
    public PacketHandler(DatagramPacket packetToHandle) {
46
        this.packetToHandle = packetToHandle;
47
        packet_header = null;
48
        packet_body = null;
49
        header_splitted = null;
50
        senderIP = packetToHandle.getAddress();
51
        listeningChunk = false;
52
        receivedChunk = false;
53
        badMessage = false;
54
    }
55
56
    @Override
57
    public void run() {
58
59
        parseSubProtocol();
60
        //parseHeader();
61
62
63
        if(Integer.parseInt(header_splitted[2]) != Peer.getID()){
64
65
            switch (subprotocol) {
66
                case PUTCHUNK:
67
                    parsePUTCHUNK();
68
                    PUTCHUNKHandler();
69
                    break;
70
                case STORED:
71
                    parseSTORED();
72
                    STOREDHandler();
73
                    break;
74
                case GETCHUNK:
75
                    parseGETCHUNK();
76
                    GETCHUNKHandler();
77
                    break;
78
                case CHUNK:
79
                    parseCHUNK();
80
                    CHUNKHandler();
81
                    break;
82
                case DELETE:
83
                    parseDELETE();
84
                    DELETEHandler();
85
                    break;
86
                case REMOVED:
87
                    parseREMOVED();
88
                    REMOVEDHandler();
89
                    break;
90
91
                default: System.out.println("Unknown protocol. Ignoring message... " + subprotocol);
92
                    badMessage = true;
93
                    break;
94
95
            }
96
97
            if(!badMessage)
98
                System.out.println("\t Sender ID: " + packetToHandle.getAddress() + " \n" +
99
                        "\t PEER ID : " + senderID + "\n");
100
101
102
        }
103
104
105
106
    }
107
108
    private void parseSubProtocol() {
109
110
        ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData());
111
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
112
113
        try {
114
            packet_header = reader.readLine();
115
            header_splitted = packet_header.split(" ");
116
            subprotocol = header_splitted[0];
117
        } catch (IOException e) {
118
            e.printStackTrace();
119
        }
120
    }
121
122
123
    private void parseREMOVED() {
124
125
        protocolVersion = Float.parseFloat(header_splitted[1]);
126
        senderID = Integer.parseInt(header_splitted[2]);
127
        fileID = new FileID(sha256(header_splitted[3]), -1);
128
        chunkNo = Integer.parseInt(header_splitted[4]);
129
130
131
132
    }
133
134
    private void REMOVEDHandler() {
135
136
137
        if(Peer.getDb().isFileStored(fileID)){
138
            FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
139
            Peer.getDb().decreasePerceivedRepDeg(chunkID , senderID);
140
141
            System.out.println("HANDLER REMOVED: "+ Peer.getDb().getPerceivedRepDeg(chunkID) + "/" +
142
                    Peer.getDb().getDesiredRepDeg(chunkID));
143
144
145
            if(Peer.getDb().getPerceivedRepDeg(chunkID) < Peer.getDb().getDesiredRepDeg(chunkID)){
146
                System.out.println("\tPerceived rep degree dropped below desired rep degree.");
147
148
                Peer.getMCListener().startCountingPutChunks(chunkID);
149
150
                try{
151
                    System.out.println("\tWaiting for Putchunk messages...");
152
                    Thread.sleep((long)(Math.random() * MAX_WAITING_TIME));
153
                } catch (InterruptedException ie){
154
                    ie.printStackTrace();
155
                }
156
157
                int putChunksReceived = Peer.getMCListener().getCountPutChunks(chunkID);
158
159
                Peer.getMCListener().stopSavingPutChunks(chunkID);
160
161
                if (putChunksReceived == 0){
162
163
                    try {
164
                        File cFile = new File("peer"+Peer.getID()+"/Backup/"+
165
                                fileID.toString().split("\\.")[0]+ "/"+chunkID.toString());
166
167
168
169
170
                        byte[] data = Backup.loadFileData(cFile);
171
172
                        //FileChunk(int replicationDegree, int chunkNo, FileID fileID, byte[] chunkData)
173
                        fileID = new FileID(fileID.toString(), Peer.getDb().getDesiredRepDeg(chunkID));
174
                        FileChunk fChunk = new FileChunk(Peer.getDb().getDesiredRepDeg(chunkID), chunkNo,
175
                                fileID, data);
176
177
                        new Thread(new BackupChunk(fChunk)).start();
178
                    } catch (FileNotFoundException e) {
179
                        e.printStackTrace();
180
                    }
181
182
183
                }
184
185
186
            }
187
188
        }
189
190
191
192
193
194
195
    }
196
197
    private void parseDELETE() {
198
199
200
        protocolVersion = Float.parseFloat(header_splitted[1]);
201
        senderID = Integer.parseInt(header_splitted[2]);
202
        fileID = new FileID(sha256(header_splitted[3]), -1);
203
204
    }
205
206
    private void DELETEHandler() {
207
208
209
210
        final File folder = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString()+
211
                "/");
212
213
        System.out.println("FOLDER: " + folder.getPath());
214
215
        final File[] files = folder.listFiles( new FilenameFilter() {
216
            @Override
217
            public boolean accept( final File dir,
218
                                   final String name ) {
219
                return name.matches( fileID.toString() + "-.*" );
220
            }
221
        } );
222
        for ( final File file : files ) {
223
            if ( !file.delete() ) {
224
                System.err.println( "Can't remove " + file.getAbsolutePath() );
225
            }
226
            else {
227
                Peer.getDisk().removeFile(file.length());
228
                Peer.getDb().removeChunkInfo(new FileChunkID(fileID.toString(),
229
                        Integer.parseInt(file.getName().split("-")[1])));
230
231
                Peer.getDb().removeFile(fileID);
232
            }
233
        }
234
235
236
237
238
239
    }
240
241
    private void parseCHUNK() {
242
243
        //CHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF><Body>
244
        if(listeningChunk)
245
            receivedChunk=true;
246
247
        protocolVersion = Float.parseFloat(header_splitted[1]);
248
        senderID = Integer.parseInt(header_splitted[2]);
249
        fileID = new FileID(header_splitted[3], -1);
250
        chunkNo = Integer.parseInt(header_splitted[4]);
251
        parseBody();
252
253
    }
254
255
    private void CHUNKHandler() {
256
257
258
        if(Peer.restoring){
259
            FileChunk chunk = new FileChunk(-1, chunkNo, fileID, packet_body);
260
261
262
            Peer.getMDRListener().queueChunk(chunk);
263
            //System.out.println("\t CHUNKS RECEIVED : ");
264
            //Peer.getMDRListener().printChunksReceived();
265
        }
266
267
268
    }
269
270
    private void parseGETCHUNK() {
271
272
        /* GETCHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF> */
273
274
        protocolVersion = Float.parseFloat(header_splitted[1]);
275
        senderID = Integer.parseInt(header_splitted[2]);
276
        fileID = new FileID(header_splitted[3], -1);
277
        chunkNo = Integer.parseInt(header_splitted[4]);
278
279
280
    }
281
282
    private void GETCHUNKHandler() {
283
284
285
        FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
286
        FileInputStream is = null;
287
        File chunkFile = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/" + chunkID);
288
        try {
289
            is  = new FileInputStream(chunkFile);
290
        } catch (FileNotFoundException e) {
291
            e.printStackTrace();
292
        }
293
294
        byte[] chunkData = new byte[(int) chunkFile.length()];
295
296
        try {
297
            assert is != null;
298
            is.read(chunkData);
299
        } catch (IOException e) {
300
            e.printStackTrace();
301
        }
302
        try {
303
            is.close();
304
        } catch (IOException e) {
305
            e.printStackTrace();
306
        }
307
308
309
        FileChunk chunk = new FileChunk(-1, chunkID.getChunkNumber(), fileID, chunkData);
310
311
        /*
312

313
        To avoid flooding the host with CHUNK messages, each peer shall wait for a random time
314
        uniformly distributed between 0 and 400 ms, before sending the CHUNK message. If it receives
315
        a CHUNK message before that time expires, it will not send the CHUNK message.
316

317
         */
318
319
        try {
320
            listeningChunk = true;
321
            Thread.sleep((long)(Math.random() * MAX_WAITING_TIME));
322
        } catch (InterruptedException e) {
323
            e.printStackTrace();
324
        }
325
326
        if(!receivedChunk)
327
            Broker.sendCHUNK(chunk, chunkID);
328
329
        receivedChunk = false;
330
        listeningChunk = false;
331
332
333
    }
334
335
    private void parseSTORED() {
336
        //STORED <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
337
338
        protocolVersion = Float.parseFloat(header_splitted[1]);
339
        senderID = Integer.parseInt(header_splitted[2]);
340
        fileID = new FileID(header_splitted[3], -1);
341
        chunkNo = Integer.parseInt(header_splitted[4]);
342
343
    }
344
345
    private void STOREDHandler() {
346
347
        FileChunkID fcID = new FileChunkID(fileID.toString(), chunkNo);
348
        Peer.getMCListener().countStored(fcID, String.valueOf(senderID));
349
        Peer.getDb().increasePerceivedRepDeg(new FileChunkID(fileID.toString(), chunkNo), senderID);
350
351
    }
352
353
354
    private void parsePUTCHUNK() {
355
356
357
        replicationDegree = Integer.parseInt(header_splitted[5]);
358
        protocolVersion = Float.parseFloat(header_splitted[1]);
359
        senderID = Integer.parseInt(header_splitted[2]);
360
        fileID = new FileID(header_splitted[3], replicationDegree);
361
        chunkNo = Integer.parseInt(header_splitted[4]);
362
        parseBody();
363
364
    }
365
366
    private void PUTCHUNKHandler() {
367
368
369
370
        File dir = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/");
371
372
        if(!dir.exists()){
373
374
            System.out.println("creating directory: " + dir.getName());
375
376
            try{
377
                dir.mkdirs();
378
            }
379
            catch(SecurityException se){
380
                se.printStackTrace();
381
            }
382
383
        }
384
385
386
387
        FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
388
        System.out.println("\t FILEID : " + fileID.toString() + "\n"
389
                + "\t CHUNK NO : " + chunkNo+ "\n");
390
        System.out.println("\t PUT CHUNK: " + fileID.toString() + " with Replication Degree : " + replicationDegree  + " in " + dir.getPath());
391
        System.out.println("\t Chunk ID: " + chunkID.toString());
392
393
394
395
396
397
        File chunkfile = new File(dir.getPath()+"/"+ chunkID.toString());
398
399
400
401
402
403
        Peer.getMDBListener().countPutChunk(chunkID, String.valueOf(senderID));
404
405
        if(chunkfile.exists()) {
406
            System.out.println("\n\t I already have this chunk, sending STORED...");
407
            Broker.sendSTORED(chunkID);
408
        } else {
409
410
            if (Peer.getDisk().saveFile(packet_body.length)){
411
412
                try {
413
                    FileOutputStream out = new FileOutputStream(dir.getPath()+"/" + chunkID.toString());
414
                    System.out.println("\n\t Saving Chunk...\n");
415
                    out.write(packet_body);
416
                    out.close();
417
418
                    Peer.getDb().insertFile(fileID);
419
                    Peer.getDb().insertChunkInfo(fileID, replicationDegree, chunkNo, Peer.getID());
420
421
                    try{
422
                        System.out.println("\tSending STORED response...");
423
                        Thread.sleep((long)(Math.random() * MAX_WAITING_TIME));
424
                    } catch (InterruptedException ie){
425
                        ie.printStackTrace();
426
                    }
427
428
                    Broker.sendSTORED(chunkID);
429
430
                } catch (IOException e) {
431
                    e.printStackTrace();
432
                }
433
434
            }
435
436
437
        }
438
439
440
441
442
443
    }
444
445
    private void parseBody() {
446
447
        ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData());
448
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
449
450
        String line = null;
451
        int totalHeaderLinesLength = 0;
452
        int totalLines = 0;
453
454
        do {
455
            try {
456
                line = reader.readLine();
457
                totalHeaderLinesLength += line.length();
458
                totalLines++;
459
            } catch (IOException e) {
460
                e.printStackTrace();
461
            }
462
        } while (!(line != null && line.isEmpty()));
463
464
        bodyStartingIndex = totalHeaderLinesLength + (totalLines * CRLF.getBytes().length);
465
466
        packet_body = Arrays.copyOfRange(packetToHandle.getData(), bodyStartingIndex,
467
                packetToHandle.getLength());
468
469
        //System.out.println("STARTING INDEX:"  + bodyStartingIndex);
470
    }
471
472
473
474
475
476
}