Project

General

Profile

Statistics
| Revision:

root / proj / src / Peer.java @ 2

History | View | Annotate | Download (14.5 KB)

1 1 up20160559
2
import java.io.BufferedInputStream;
3
import java.io.BufferedWriter;
4 2 up20160559
import java.io.ByteArrayInputStream;
5
import java.io.ByteArrayOutputStream;
6 1 up20160559
import java.io.File;
7
import java.io.FileInputStream;
8
import java.io.FileNotFoundException;
9
import java.io.FileOutputStream;
10
import java.io.FileWriter;
11
import java.io.IOException;
12
import java.io.InputStream;
13
import java.io.ObjectOutputStream;
14
import java.io.OutputStream;
15 2 up20160559
import java.io.StringReader;
16 1 up20160559
import java.net.UnknownHostException;
17 2 up20160559
import java.nio.ByteBuffer;
18
import java.nio.channels.FileChannel;
19
import java.nio.charset.Charset;
20 1 up20160559
import java.nio.charset.StandardCharsets;
21
import java.nio.file.Files;
22
import java.nio.file.Paths;
23
import java.rmi.registry.LocateRegistry;
24
import java.rmi.registry.Registry;
25
import java.rmi.server.UnicastRemoteObject;
26
import java.security.MessageDigest;
27
import java.util.ArrayList;
28 2 up20160559
import java.util.Arrays;
29 1 up20160559
import java.util.concurrent.TimeUnit;
30
31
32
33
34
public class Peer implements RMI {
35
36
37
        //final static int PORT = 8888;
38
        private static String version;
39
        private static int server_id;
40
        private static int peerID;
41
        private static String mcIp = "224.0.0.3";
42
        private static int mcPort = 8888;
43
        private static String mdbIp;
44
        private static int mdbPort;
45
        private static String mdrIp;
46
        private static int mdrPort;
47
48
        private static Channel mc;
49
        private static Channel mdb;
50
        private static Channel mdr;
51
        private boolean lastChunk = false;
52
53
54
55
        private static StorageSystem storage;
56
57
        private int chunkIterator = 0;
58
        private Peer(String mcIp, int mcPort, String mdbIp, int mdbPort, String mdrIp, int mdrPort) throws IOException {
59
                mc = new Channel(mcIp, mcPort, this);
60
                mdb = new Channel(mdbIp, mdbPort, this);
61
                mdr = new Channel(mdrIp,mdrPort, this);
62
63
                new Thread(mc).start();
64
                new Thread(mdb).start();
65
                new Thread(mdr).start();
66
        }
67
68
69
        public static void main(String[] args) throws UnknownHostException, InterruptedException {
70
71
                setVersion(args[0]);
72
                server_id = Integer.parseInt(args[1]);
73
                peerID = Integer.parseInt(args[2]);
74
                mcIp = args[3];
75
                mcPort = Integer.parseInt(args[4]);
76
                mdbIp = args[5];
77
                mdbPort = Integer.parseInt(args[6]);
78
                mdrIp = args[7];
79
                mdrPort = Integer.parseInt(args[8]);
80
81
                storage = new StorageSystem(peerID);
82
83
84
                try {
85
86
                        Peer obj = new Peer(mcIp, mcPort, mdbIp, mcPort, mdrIp, mcPort);
87
88
                        RMI stub = (RMI) UnicastRemoteObject.exportObject(obj, 0);
89
90
                        // Binding the remote object (stub) in the registry
91
                        Registry registry = LocateRegistry.getRegistry();
92
                        registry.rebind(args[2], stub);
93
94
95
                        System.err.println("Peer ready");
96
97
98
99
100
                } catch (Exception e) {
101
                        System.err.println("Peer exception: " + e.toString());
102
                        e.printStackTrace();
103
                }
104
        }
105
106
107 2 up20160559
108 1 up20160559
        public void operation(String operation, String file_path, int rep_degree, double space) { //operator is space for reclaim, rep_degree for back up
109
110
111
                if(operation.equals("BACKUP"))
112
                {
113
114
                        try {
115
                                initiateBackup(file_path, rep_degree);
116
                        } catch (InterruptedException e) {
117
                                // TODO Auto-generated catch block
118
                                e.printStackTrace();
119
                        }
120
121
                }
122
                else if(operation.equals("RESTORE"))
123
                {
124
                        try {
125
                                initiateRestore(file_path);
126
                        } catch (IOException e) {
127
                                // TODO Auto-generated catch block
128
                                e.printStackTrace();
129
                        }
130
131
                }
132
                else if(operation.equals("DELETE"))
133
                {
134
                        try {
135
                                initiateDelete(file_path);
136
                        } catch (Exception e) {
137
                                // TODO Auto-generated catch block
138
                                e.printStackTrace();
139
                        }
140
141
                }
142 2 up20160559
                else if(operation.equals("RECLAIM"))
143
                {
144
                        initiateReclaim(space);
145 1 up20160559
                }
146 2 up20160559
                else if(operation.equals("STATE"))
147
                        System.out.print(state());
148
149 1 up20160559
150
151 2 up20160559
152 1 up20160559
        }
153
154
155
156
157
158
private void initiateReclaim(double space) {
159 2 up20160559
                      long before = storage.getUsed_storage();
160
        while(space < storage.getUsed_storage()) {
161
162
                //int size = storage.getChunks().get(0).getsize();
163
                String fileID = storage.getChunks().get(0).getFileID();
164
                int chunkN = storage.getChunks().get(0).getChunkN();
165
166
                storage.deleteChunkByFileIDAndChunkN(fileID, chunkN);
167
168 1 up20160559
169 2 up20160559
        Message msg = new Message("REMOVED", getVersion(), this.getPeerID(),fileID, chunkN, 0 , null);
170
        byte[] msgByte = msg.sendable();
171
172
        try {
173
            mc.sendMessage(msgByte);
174
        } catch (UnknownHostException e) {
175
            // TODO Auto-generated catch block
176 1 up20160559
                            e.printStackTrace();
177
                        }
178 2 up20160559
179 1 up20160559
180 2 up20160559
       String filename = "Peer"+this.getPeerID()+ "/"+ fileID + "/chk" + chunkN;
181
182
         File folder = new File(filename);
183
184
         if (folder.exists()) {
185
                 String[]entries = folder.list();
186
                 for(String s: entries){
187
                     File currentFile = new File(folder.getPath(),s);
188
                     currentFile.delete();
189
                 }
190
191
                    folder.delete();
192
193
                }
194
        }
195
196
        long reclaimed = before-storage.getUsed_storage();
197
         System.out.println("RECLAIMED " +reclaimed+"B of data.");
198
199
200
201 1 up20160559
202
    }
203
204
205
        private void initiateDelete(String file_path) {
206
    String hash = storage.lookUp(file_path);
207
    byte[] msgByte = null;
208
    Message msg;
209
    msg = new Message("DELETE", getVersion(), this.getPeerID(),hash, 0, 0, null);
210
    msgByte = msg.sendable();
211
    System.out.println(msg.messageToStringPrintable());
212
    try {
213
        mdr.sendMessage(msgByte);
214
215
216
        TimeUnit.SECONDS.sleep(1);
217
218
    } catch (Exception e) {
219
        // TODO Auto-generated catch block
220
        e.printStackTrace();
221
    }
222
223
        }
224
225
        public void lastChunk() {
226
                this.lastChunk = true;
227 2 up20160559
228 1 up20160559
        }
229
230 2 up20160559
private void initiateRestore(String file_path) throws IOException { //GETCHUNK <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
231 1 up20160559
232
                String hash = storage.lookUp(file_path);
233
                int chunkn = 0;
234
                byte[] msgByte = null;
235
                Message msg;
236
                        ///getChunksFromFile(hash);
237 2 up20160559
                boolean lastChunkAux = this.lastChunk;
238
                while(!lastChunkAux) {
239 1 up20160559
240
                        chunkn++;
241
242
                        msg = new Message("GETCHUNK", getVersion(), this.getPeerID(),hash,chunkn, 0, null);
243
                        msgByte = msg.sendable();
244
                        System.out.println(msg.messageToStringPrintable());
245
                        try {
246
                                mdr.sendMessage(msgByte);
247
248
249
250
                                TimeUnit.SECONDS.sleep(1);
251
252
                        } catch (Exception e) {
253
                                // TODO Auto-generated catch block
254
                                e.printStackTrace();
255
                        }
256
257 2 up20160559
                        lastChunkAux = this.lastChunk;
258 1 up20160559
                }
259
260
                String newfilename = "Peer"+this.getPeerID() + "/restore/"+file_path;
261
262
        File file = new File(newfilename);
263
264
265
266
        if (!file.exists()) {
267
            file.getParentFile().mkdirs();
268
            file.createNewFile();
269
        }
270
271
        FileOutputStream outputStream = new FileOutputStream(newfilename);
272
273
274
275
            for(int i = 0; i < storage.getChunks().size(); i++) {
276
                       if(storage.getChunks().get(i).getFileID().equals(hash)) {
277
278
                               outputStream.write(storage.getChunks().get(i).getContent());
279
280
                    }
281
            }
282
283
284
285
             outputStream.close();
286
287
            this.lastChunk = false;
288
        }
289
290
291 2 up20160559
public String state() {
292
        String s = "";
293
          s += "\n PEER "+this.getPeerID()+" HAS INITIATED PROTOCOL BACK UP ON THE FOLLOWING FILES: \n";
294
    for(FileInfo file: storage.getFileInfo()) {
295
            s+="File Pathname: "+file.getFilename()+"\n";
296
            s+="File ID: "+file.getFileID()+"\n";
297
            s+="Desired Replication Degree: "+file.getDesiredRepDeg()+"\n ";
298
            s+="Initiated Chunks: \n";
299
            for(BackUpInfo chunk: storage.getBackUps()) {
300
                    s+= "Chunk ID: "+ chunk.getFileID() + "chunk" + chunk.getChunkN()+"\n";
301
                    s+="Perceived Replication Degree: "+ storage.perceivedRepDeg(chunk.getFileID(), chunk.getChunkN())+"\n";
302
            }
303
    s+="\n PEER "+this.getPeerID()+" IS STORING THE FOLLOWING CHUNKS: \n";
304
            for(Chunk storedchunk: storage.getChunks()) {
305
                    s+="Chunk ID: "+ storedchunk.getChunkId()+"\n";
306
                    s+="Size: "+storedchunk.getsize()+"\n";
307
                     s+="Perceived Replication Degree: "+storedchunk.getRepDeg()+"\n";
308
            }
309
          s+="PEER STORAGE CAPACITY: " + storage.getSpace_available()+"\n";
310
311
    }
312
    return s;
313
}
314 1 up20160559
315
316
        private void initiateBackup(String file_path, int rep_degree) throws InterruptedException {
317
318
                File file = new File(file_path);
319
                String fileID = generateFileID(file);
320
                byte[] msgByte = null;
321
                boolean stored = false;
322
323
                try {
324 2 up20160559
                        storage.addFile(file_path, file.lastModified(), fileID, rep_degree);
325 1 up20160559
                        storage.serializeFileInfo();
326 2 up20160559
                        ArrayList<Chunk> chunks = splitIntoChunks(file, fileID, 64000);
327 1 up20160559
328
329 2 up20160559
                                for(int i = 0; i < chunks.size(); i++) {
330 1 up20160559
331 2 up20160559
                                        Message msg = new Message("PUTCHUNK", getVersion(), this.getPeerID(), chunks.get(i).getFileID(),chunks.get(i).getChunkN(), rep_degree,
332
                                                        chunks.get(i).getContent());
333 1 up20160559
                                        msgByte = msg.sendable();
334 2 up20160559
335 1 up20160559
                                        System.out.println(msg.messageToStringPrintable());
336
                                        mdb.sendMessage(msgByte);
337
338 2 up20160559
                                        for(int j = 0; j < 5 && !stored; j++) {
339 1 up20160559
340 2 up20160559
                                        TimeUnit.SECONDS.sleep(1+j);
341 1 up20160559
342 2 up20160559
                                        if(storedCheck(chunks.get(i).getChunkN(), chunks.get(i).getFileID()) >= rep_degree) {
343 1 up20160559
                                                stored = true;
344
                                        }
345
                                        else {
346
                                                mdb.sendMessage(msgByte);
347
                                                System.out.println(msg.messageToStringPrintable());
348
349
                                        }
350
                                        }
351
                                        stored = false;
352
353
354
                                }
355
356
357
358
                } catch (IOException e) {
359
                        System.err.println("IO Exception: " + e.toString());
360
                        e.printStackTrace();
361
                }
362
363
364
}
365
366 2 up20160559
        public int storedCheck(int chunkN, String fileID) {
367 1 up20160559
                int timesSaved = 0;
368
369
                for(BackUpInfo info : storage.getBackUps()) {
370
                        if(chunkN == info.getChunkN() && info.getFileID().equals(fileID))
371
                                timesSaved++;
372
                }
373
                return timesSaved;
374
        }
375
376
377
378
379
380
        public int getPeerID() {
381
                return peerID;
382
        }
383
384
        public Channel getMC() {
385
                return mc;
386
        }
387
388
        public Channel getMDB() {
389
                return mdb;
390
        }
391
392
        public Channel getMDR() {
393
                return mdr;
394
        }
395
396
397
398 2 up20160559
        public void deleteFileFolder(String hash) {
399
                 String foldername = "Peer"+this.getPeerID() + "/backup/"+hash;
400
401
                 File folder = new File(foldername);
402
403
                 if (folder.exists()) {
404
                         String[]entries = folder.list();
405
                         for(String s: entries){
406
                             File currentFile = new File(folder.getPath(),s);
407
                             currentFile.delete();
408
                         }
409
410
                            folder.delete();
411
412
                        }
413
        }
414 1 up20160559
415
        public void saveChunks() throws IOException{
416
417
                for(int i = 0; i < storage.getChunks().size(); i++) {
418
419
420
                         String filename = "Peer"+this.getPeerID() + "/backup/"+storage.getChunks().get(i).getFileID()+"/chk"+storage.getChunks().get(i).getChunkN();
421
422
                 File file = new File(filename);
423
                 if (!file.exists()) {
424
                     file.getParentFile().mkdirs();
425
                     file.createNewFile();
426
                 }
427
428
                 FileOutputStream fileOut = new FileOutputStream(filename);
429
                 ObjectOutputStream out = new ObjectOutputStream(fileOut);
430
                 out.writeObject(storage.getChunks().get(i).getContent());
431
                 out.close();
432
                 fileOut.close();
433
                }
434
435
        }
436
         public static String sha256hashing(String base) {
437
                try{
438
                    MessageDigest md = MessageDigest.getInstance("SHA-256");
439 2 up20160559
                    byte[] hashed = md.digest(base.getBytes("UTF8"));
440 1 up20160559
                    StringBuffer hexString = new StringBuffer();
441
442
                    for (int i = 0; i < hashed.length; i++) {
443
                        String hex = Integer.toHexString(0xff & hashed[i]);
444
                        if(hex.length() == 1) hexString.append('0');
445
                        hexString.append(hex);
446
                    }
447
448
                    return hexString.toString();
449
                } catch(Exception ex){
450
                   throw new RuntimeException(ex);
451
                }
452
            }
453
454
         String generateFileID(File file) {
455
456
                    String aux  =file.getName() + file.lastModified();
457
458
459
                    return sha256hashing(aux);
460
            }
461
462
                public void getChunksFromFile(String hash) throws IOException {
463
464
465
                   try{
466
                        Files.exists(Paths.get("Peer"+this.getPeerID() + "/backup/"+hash+"/"));
467
                   File folder = new File("Peer"+this.getPeerID() + "/backup/"+hash+"/");
468
                   File[] files = folder.listFiles();
469
470
                    for (File file : files)
471
                    {
472
                            chunkIterator++;
473
                            byte[] chunkContent = Files.readAllBytes(file.toPath());
474
                            storage.addChunk(new Chunk(hash,chunkIterator,chunkContent));
475
                    }
476
                    chunkIterator = 0;
477
                   }
478
                   catch(FileNotFoundException ex){
479
                          System.out.println("Chunks not found" + ex.toString());
480
                          ex.printStackTrace();
481
                           }
482
483
484
                }
485
486
487
                public String getVersion() {
488
                        return version;
489
                }
490
491
492
                public static void setVersion(String version) {
493
                        Peer.version = version;
494
                }
495
496
497
                public StorageSystem getStorage() {
498
                        return storage;
499
                }
500
501 2 up20160559
                public ArrayList<Chunk> splitIntoChunks(File file, String fileID, int chunk_size) throws IOException
502 1 up20160559
            {
503
                        ArrayList<Chunk> filechunks = new ArrayList<Chunk>();
504
505
             Boolean lastChunk = false;
506
             File willBeRead = file;
507
             int FILE_SIZE = (int) willBeRead.length();
508
509
510
             System.out.println("Total File Size: "+FILE_SIZE);
511
512
             byte[] temporary = null;
513
514
             try {
515
              InputStream inStream = null;
516
              int totalBytesRead = 0;
517
518
              try {
519
               inStream = new BufferedInputStream ( new FileInputStream( willBeRead ));
520
521
               int chunkCount = 0;
522
               while ( totalBytesRead < FILE_SIZE )
523
               {
524
525
                int bytesRemaining = FILE_SIZE-totalBytesRead;
526
                if ( bytesRemaining < chunk_size )
527
                {
528
                 chunk_size = bytesRemaining;
529
                 lastChunk = true;
530
                }
531
532
                temporary = new byte[chunk_size]; //Temporary Byte Array
533
                int bytesRead = inStream.read(temporary, 0, chunk_size);
534 2 up20160559
535 1 up20160559
536
                if ( bytesRead > 0) // If bytes read is not empty
537
                {
538
                 totalBytesRead += bytesRead;
539
                 chunkCount++;
540
                }
541
542
                filechunks.add(new Chunk(fileID,chunkCount, temporary));
543
544
                if(bytesRemaining == 0 && lastChunk)
545
                        filechunks.add(new Chunk(fileID,chunkCount, null));
546
547
                System.out.println("Total Bytes Read: "+ totalBytesRead);
548
               }
549
550
              }
551
              finally {
552
               inStream.close();
553
              }
554
             }
555
             catch (NullPointerException ex)
556
             {
557
                     System.out.println("File not found"+ex.toString());
558
              ex.printStackTrace();
559
             }
560
             catch (FileNotFoundException ex)
561
             {
562
              ex.printStackTrace();
563
             }
564
             catch (IOException ex)
565
             {
566
              ex.printStackTrace();
567
             }
568
569
             return filechunks;
570
            }
571
572 2 up20160559
573 1 up20160559
}