Project

General

Profile

Statistics
| Revision:

root / project / src / main / java / service / PacketHandler.java @ 1

History | View | Annotate | Download (13.2 KB)

1
package main.java.service;
2

    
3

    
4
import main.java.file.FileChunk;
5
import main.java.file.FileChunkID;
6
import main.java.file.FileID;
7
import main.java.listeners.Broker;
8
import main.java.peer.Peer;
9
import main.java.protocols.Backup;
10
import main.java.protocols.BackupChunk;
11
import main.java.utils.Constants.*;
12

    
13
import java.io.*;
14
import java.net.DatagramPacket;
15
import java.net.InetAddress;
16
import java.util.ArrayList;
17
import java.util.Arrays;
18
import java.util.List;
19

    
20
import static main.java.utils.Constants.*;
21
import static main.java.utils.Utilities.getLocalAddress;
22
import static main.java.utils.Utilities.sha256;
23

    
24
public class PacketHandler implements Runnable {
25

    
26
    private DatagramPacket packetToHandle;
27
    private String packet_header;
28
    private byte[] packet_body;
29
    private String subprotocol;
30
    private FileID fileID;
31
    private float protocolVersion;
32
    private int replicationDegree;
33
    private int chunkNo;
34
    private int senderID;
35
    private int bodyStartingIndex;
36
    private String[] header_splitted;
37

    
38
    private InetAddress senderIP;
39

    
40
    private boolean listeningChunk;
41
    private boolean receivedChunk;
42
    private boolean badMessage;
43

    
44

    
45
    public PacketHandler(DatagramPacket packetToHandle) {
46
        this.packetToHandle = packetToHandle;
47
        packet_header = null;
48
        packet_body = null;
49
        header_splitted = null;
50
        senderIP = packetToHandle.getAddress();
51
        listeningChunk = false;
52
        receivedChunk = false;
53
        badMessage = false;
54
    }
55

    
56
    @Override
57
    public void run() {
58

    
59
        parseSubProtocol();
60
        //parseHeader();
61

    
62

    
63
        if(Integer.parseInt(header_splitted[2]) != Peer.getID()){
64

    
65
            switch (subprotocol) {
66
                case PUTCHUNK:
67
                    parsePUTCHUNK();
68
                    PUTCHUNKHandler();
69
                    break;
70
                case STORED:
71
                    parseSTORED();
72
                    STOREDHandler();
73
                    break;
74
                case GETCHUNK:
75
                    parseGETCHUNK();
76
                    GETCHUNKHandler();
77
                    break;
78
                case CHUNK:
79
                    parseCHUNK();
80
                    CHUNKHandler();
81
                    break;
82
                case DELETE:
83
                    parseDELETE();
84
                    DELETEHandler();
85
                    break;
86
                case REMOVED:
87
                    parseREMOVED();
88
                    REMOVEDHandler();
89
                    break;
90

    
91
                default: System.out.println("Unknown protocol. Ignoring message... " + subprotocol);
92
                    badMessage = true;
93
                    break;
94

    
95
            }
96

    
97
            if(!badMessage)
98
                System.out.println("\t Sender ID: " + packetToHandle.getAddress() + " \n" +
99
                        "\t PEER ID : " + senderID + "\n");
100

    
101

    
102
        }
103

    
104

    
105

    
106
    }
107

    
108
    private void parseSubProtocol() {
109

    
110
        ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData());
111
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
112

    
113
        try {
114
            packet_header = reader.readLine();
115
            header_splitted = packet_header.split(" ");
116
            subprotocol = header_splitted[0];
117
        } catch (IOException e) {
118
            e.printStackTrace();
119
        }
120
    }
121

    
122

    
123
    private void parseREMOVED() {
124

    
125
        protocolVersion = Float.parseFloat(header_splitted[1]);
126
        senderID = Integer.parseInt(header_splitted[2]);
127
        fileID = new FileID(sha256(header_splitted[3]), -1);
128
        chunkNo = Integer.parseInt(header_splitted[4]);
129

    
130

    
131

    
132
    }
133

    
134
    private void REMOVEDHandler() {
135

    
136

    
137
        if(Peer.getDb().isFileStored(fileID)){
138
            FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
139
            Peer.getDb().decreasePerceivedRepDeg(chunkID , senderID);
140

    
141
            System.out.println("HANDLER REMOVED: "+ Peer.getDb().getPerceivedRepDeg(chunkID) + "/" +
142
                    Peer.getDb().getDesiredRepDeg(chunkID));
143

    
144

    
145
            if(Peer.getDb().getPerceivedRepDeg(chunkID) < Peer.getDb().getDesiredRepDeg(chunkID)){
146
                System.out.println("\tPerceived rep degree dropped below desired rep degree.");
147

    
148
                Peer.getMCListener().startCountingPutChunks(chunkID);
149

    
150
                try{
151
                    System.out.println("\tWaiting for Putchunk messages...");
152
                    Thread.sleep((long)(Math.random() * MAX_WAITING_TIME));
153
                } catch (InterruptedException ie){
154
                    ie.printStackTrace();
155
                }
156

    
157
                int putChunksReceived = Peer.getMCListener().getCountPutChunks(chunkID);
158

    
159
                Peer.getMCListener().stopSavingPutChunks(chunkID);
160

    
161
                if (putChunksReceived == 0){
162

    
163
                    try {
164
                        File cFile = new File("peer"+Peer.getID()+"/Backup/"+
165
                                fileID.toString().split("\\.")[0]+ "/"+chunkID.toString());
166

    
167

    
168

    
169

    
170
                        byte[] data = Backup.loadFileData(cFile);
171

    
172
                        //FileChunk(int replicationDegree, int chunkNo, FileID fileID, byte[] chunkData)
173
                        fileID = new FileID(fileID.toString(), Peer.getDb().getDesiredRepDeg(chunkID));
174
                        FileChunk fChunk = new FileChunk(Peer.getDb().getDesiredRepDeg(chunkID), chunkNo,
175
                                fileID, data);
176

    
177
                        new Thread(new BackupChunk(fChunk)).start();
178
                    } catch (FileNotFoundException e) {
179
                        e.printStackTrace();
180
                    }
181

    
182

    
183
                }
184

    
185

    
186
            }
187

    
188
        }
189

    
190

    
191

    
192

    
193

    
194

    
195
    }
196

    
197
    private void parseDELETE() {
198

    
199

    
200
        protocolVersion = Float.parseFloat(header_splitted[1]);
201
        senderID = Integer.parseInt(header_splitted[2]);
202
        fileID = new FileID(sha256(header_splitted[3]), -1);
203

    
204
    }
205

    
206
    private void DELETEHandler() {
207

    
208

    
209

    
210
        final File folder = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString()+
211
                "/");
212

    
213
        System.out.println("FOLDER: " + folder.getPath());
214

    
215
        final File[] files = folder.listFiles( new FilenameFilter() {
216
            @Override
217
            public boolean accept( final File dir,
218
                                   final String name ) {
219
                return name.matches( fileID.toString() + "-.*" );
220
            }
221
        } );
222
        for ( final File file : files ) {
223
            if ( !file.delete() ) {
224
                System.err.println( "Can't remove " + file.getAbsolutePath() );
225
            }
226
            else {
227
                Peer.getDisk().removeFile(file.length());
228
                Peer.getDb().removeChunkInfo(new FileChunkID(fileID.toString(),
229
                        Integer.parseInt(file.getName().split("-")[1])));
230

    
231
                Peer.getDb().removeFile(fileID);
232
            }
233
        }
234

    
235

    
236

    
237

    
238

    
239
    }
240

    
241
    private void parseCHUNK() {
242

    
243
        //CHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF><Body>
244
        if(listeningChunk)
245
            receivedChunk=true;
246

    
247
        protocolVersion = Float.parseFloat(header_splitted[1]);
248
        senderID = Integer.parseInt(header_splitted[2]);
249
        fileID = new FileID(header_splitted[3], -1);
250
        chunkNo = Integer.parseInt(header_splitted[4]);
251
        parseBody();
252

    
253
    }
254

    
255
    private void CHUNKHandler() {
256

    
257

    
258
        if(Peer.restoring){
259
            FileChunk chunk = new FileChunk(-1, chunkNo, fileID, packet_body);
260

    
261

    
262
            Peer.getMDRListener().queueChunk(chunk);
263
            //System.out.println("\t CHUNKS RECEIVED : ");
264
            //Peer.getMDRListener().printChunksReceived();
265
        }
266

    
267

    
268
    }
269

    
270
    private void parseGETCHUNK() {
271

    
272
        /* GETCHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF> */
273

    
274
        protocolVersion = Float.parseFloat(header_splitted[1]);
275
        senderID = Integer.parseInt(header_splitted[2]);
276
        fileID = new FileID(header_splitted[3], -1);
277
        chunkNo = Integer.parseInt(header_splitted[4]);
278

    
279

    
280
    }
281

    
282
    private void GETCHUNKHandler() {
283

    
284

    
285
        FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
286
        FileInputStream is = null;
287
        File chunkFile = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/" + chunkID);
288
        try {
289
            is  = new FileInputStream(chunkFile);
290
        } catch (FileNotFoundException e) {
291
            e.printStackTrace();
292
        }
293

    
294
        byte[] chunkData = new byte[(int) chunkFile.length()];
295

    
296
        try {
297
            assert is != null;
298
            is.read(chunkData);
299
        } catch (IOException e) {
300
            e.printStackTrace();
301
        }
302
        try {
303
            is.close();
304
        } catch (IOException e) {
305
            e.printStackTrace();
306
        }
307

    
308

    
309
        FileChunk chunk = new FileChunk(-1, chunkID.getChunkNumber(), fileID, chunkData);
310

    
311
        /*
312

313
        To avoid flooding the host with CHUNK messages, each peer shall wait for a random time
314
        uniformly distributed between 0 and 400 ms, before sending the CHUNK message. If it receives
315
        a CHUNK message before that time expires, it will not send the CHUNK message.
316

317
         */
318

    
319
        try {
320
            listeningChunk = true;
321
            Thread.sleep((long)(Math.random() * MAX_WAITING_TIME));
322
        } catch (InterruptedException e) {
323
            e.printStackTrace();
324
        }
325

    
326
        if(!receivedChunk)
327
            Broker.sendCHUNK(chunk, chunkID);
328

    
329
        receivedChunk = false;
330
        listeningChunk = false;
331

    
332

    
333
    }
334

    
335
    private void parseSTORED() {
336
        //STORED <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
337

    
338
        protocolVersion = Float.parseFloat(header_splitted[1]);
339
        senderID = Integer.parseInt(header_splitted[2]);
340
        fileID = new FileID(header_splitted[3], -1);
341
        chunkNo = Integer.parseInt(header_splitted[4]);
342

    
343
    }
344

    
345
    private void STOREDHandler() {
346

    
347
        FileChunkID fcID = new FileChunkID(fileID.toString(), chunkNo);
348
        Peer.getMCListener().countStored(fcID, String.valueOf(senderID));
349
        Peer.getDb().increasePerceivedRepDeg(new FileChunkID(fileID.toString(), chunkNo), senderID);
350

    
351
    }
352

    
353

    
354
    private void parsePUTCHUNK() {
355

    
356

    
357
        replicationDegree = Integer.parseInt(header_splitted[5]);
358
        protocolVersion = Float.parseFloat(header_splitted[1]);
359
        senderID = Integer.parseInt(header_splitted[2]);
360
        fileID = new FileID(header_splitted[3], replicationDegree);
361
        chunkNo = Integer.parseInt(header_splitted[4]);
362
        parseBody();
363

    
364
    }
365

    
366
    private void PUTCHUNKHandler() {
367

    
368

    
369

    
370
        File dir = new File("peer"+Peer.getID()+"/Backup/"+fileID.toString().split("\\.")[0]+ "/");
371

    
372
        if(!dir.exists()){
373

    
374
            System.out.println("creating directory: " + dir.getName());
375

    
376
            try{
377
                dir.mkdirs();
378
            }
379
            catch(SecurityException se){
380
                se.printStackTrace();
381
            }
382

    
383
        }
384

    
385

    
386

    
387
        FileChunkID chunkID = new FileChunkID(fileID.toString(), chunkNo);
388
        System.out.println("\t FILEID : " + fileID.toString() + "\n"
389
                + "\t CHUNK NO : " + chunkNo+ "\n");
390
        System.out.println("\t PUT CHUNK: " + fileID.toString() + " with Replication Degree : " + replicationDegree  + " in " + dir.getPath());
391
        System.out.println("\t Chunk ID: " + chunkID.toString());
392

    
393

    
394

    
395

    
396

    
397
        File chunkfile = new File(dir.getPath()+"/"+ chunkID.toString());
398

    
399

    
400

    
401

    
402

    
403
        Peer.getMDBListener().countPutChunk(chunkID, String.valueOf(senderID));
404

    
405
        if(chunkfile.exists()) {
406
            System.out.println("\n\t I already have this chunk, sending STORED...");
407
            Broker.sendSTORED(chunkID);
408
        } else {
409

    
410
            if (Peer.getDisk().saveFile(packet_body.length)){
411

    
412
                try {
413
                    FileOutputStream out = new FileOutputStream(dir.getPath()+"/" + chunkID.toString());
414
                    System.out.println("\n\t Saving Chunk...\n");
415
                    out.write(packet_body);
416
                    out.close();
417

    
418
                    Peer.getDb().insertFile(fileID);
419
                    Peer.getDb().insertChunkInfo(fileID, replicationDegree, chunkNo, Peer.getID());
420

    
421
                    try{
422
                        System.out.println("\tSending STORED response...");
423
                        Thread.sleep((long)(Math.random() * MAX_WAITING_TIME));
424
                    } catch (InterruptedException ie){
425
                        ie.printStackTrace();
426
                    }
427

    
428
                    Broker.sendSTORED(chunkID);
429

    
430
                } catch (IOException e) {
431
                    e.printStackTrace();
432
                }
433

    
434
            }
435

    
436

    
437
        }
438

    
439

    
440

    
441

    
442

    
443
    }
444

    
445
    private void parseBody() {
446

    
447
        ByteArrayInputStream stream = new ByteArrayInputStream(packetToHandle.getData());
448
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
449

    
450
        String line = null;
451
        int totalHeaderLinesLength = 0;
452
        int totalLines = 0;
453

    
454
        do {
455
            try {
456
                line = reader.readLine();
457
                totalHeaderLinesLength += line.length();
458
                totalLines++;
459
            } catch (IOException e) {
460
                e.printStackTrace();
461
            }
462
        } while (!(line != null && line.isEmpty()));
463

    
464
        bodyStartingIndex = totalHeaderLinesLength + (totalLines * CRLF.getBytes().length);
465

    
466
        packet_body = Arrays.copyOfRange(packetToHandle.getData(), bodyStartingIndex,
467
                packetToHandle.getLength());
468

    
469
        //System.out.println("STARTING INDEX:"  + bodyStartingIndex);
470
    }
471

    
472

    
473

    
474

    
475

    
476
}