Project

General

Profile

Statistics
| Revision:

root / src / Peer.java

History | View | Annotate | Download (18 KB)

1 1 up20150487
2
import java.io.ByteArrayOutputStream;
3
import java.io.File;
4
import java.io.FileInputStream;
5
import java.io.FileOutputStream;
6
import java.io.IOException;
7
import java.io.ObjectInputStream;
8
import java.io.ObjectOutputStream;
9
import java.net.InetAddress;
10
import java.nio.file.*;
11
import java.rmi.AlreadyBoundException;
12
import java.rmi.RemoteException;
13
import java.rmi.registry.LocateRegistry;
14
import java.rmi.registry.Registry;
15
import java.rmi.server.UnicastRemoteObject;
16
import java.util.concurrent.ConcurrentHashMap;
17
18
import javax.print.DocFlavor.STRING;
19
20
import java.net.MulticastSocket;
21
import java.util.ArrayList;
22
23
public class Peer implements RMInterface {
24
25
    private static MC mc;
26
    private static MDB mdb;
27
    private static MDR mdr;
28
    private static int peerId;
29
    private static String protocolVersion;
30
31
    private static ChannelsInfo cInfo;
32
    private static String accessPoint;
33
34
    private static PeersInfo peersInfo;
35
36
    public Peer() {
37
38
        try {
39
            FileInputStream fileIn = new FileInputStream("./peer" + peerId + "/memory.ser");
40
            ObjectInputStream in = new ObjectInputStream(fileIn);
41
            peersInfo = (PeersInfo) in.readObject();
42
            in.close();
43
            fileIn.close();
44
45
        } catch (Exception e) {
46
            System.out.println("Create new PeersInfo");
47
            peersInfo = new PeersInfo(peerId);
48
        }
49
50
    }
51
52
    public static void main(String[] args) {
53
        try {
54
            accessPoint = args[2];
55
            protocolVersion = args[0];
56
            try {
57
                peerId = Integer.parseInt(args[1]);
58
59
            } catch (NumberFormatException e) {
60
                peerId = 0;
61
            }
62
            setRMI();
63
            parseArgs(args);
64
            System.err.println("Peer ready");
65
        } catch (Exception e) {
66
            System.err.println("Peer exception: " + e.toString());
67
            e.printStackTrace();
68
        }
69
    }
70
71
    public static void parseArgs(String[] args) {
72
        try {
73
            cInfo = new ChannelsInfo();
74
            cInfo.MCa = InetAddress.getByName(args[3]);
75
            cInfo.MCp = Integer.parseInt(args[4]);
76
77
            cInfo.MDBa = InetAddress.getByName(args[5]);
78
            cInfo.MDBp = Integer.parseInt(args[6]);
79
80
            cInfo.MDRa = InetAddress.getByName(args[7]);
81
            cInfo.MDRp = Integer.parseInt(args[8]);
82
83
            cInfo.socketMC = new MulticastSocket(cInfo.MCp);
84
            cInfo.socketMDB = new MulticastSocket(cInfo.MDBp);
85
            cInfo.socketMDR = new MulticastSocket(cInfo.MDRp);
86
87
            cInfo.socketMC.joinGroup(cInfo.MCa);
88
            cInfo.socketMDB.joinGroup(cInfo.MDBa);
89
            cInfo.socketMDR.joinGroup(cInfo.MDRa);
90
91
            mc = new MC(peersInfo, cInfo, peerId, cInfo.MCa, cInfo.MCp);
92
            mdb = new MDB(peersInfo, cInfo, peerId, cInfo.MDBa, cInfo.MDBp);
93
            mdr = new MDR(peersInfo, cInfo, peerId, cInfo.MDRa, cInfo.MDRp);
94
95
            mc.start();
96
            mdb.start();
97
            mdr.start();
98
        } catch (Exception e) {
99
            e.printStackTrace();
100
        }
101
102
    }
103
104
    private static void setRMI() {
105
        try {
106
            Peer peer = new Peer();
107
            RMInterface stub = (RMInterface) UnicastRemoteObject.exportObject(peer, 0);
108
            try {
109
                Registry registry = LocateRegistry.createRegistry(1099);
110
                registry.rebind(accessPoint, stub);
111
            } catch (Exception e) {
112
                Registry registry = LocateRegistry.getRegistry();
113
                registry.rebind(accessPoint, stub);
114
            }
115
        } catch (RemoteException e) {
116
            e.printStackTrace();
117
        }
118
    }
119
120
    public byte[] createMessage(String protocol, String version, int senderId, String fileId, String[] args,
121
            byte[] body) {
122
123
        String msg = "";
124
        msg = protocol + " " + version + " " + senderId + " " + fileId + " ";
125
        for (String arg : args) {
126
            msg += arg + " ";
127
        }
128
        msg += "\r\n\r\n";
129
        ByteArrayOutputStream append = new ByteArrayOutputStream();
130
131
        try {
132
            append.write(msg.getBytes());
133
            if (body != null)
134
                append.write(body);
135
        } catch (IOException e) {
136
            e.printStackTrace();
137
        }
138
        return append.toByteArray();
139
140
    }
141
142
    public static void serialize_storage(PeersInfo info, int peerid) {
143
        try {
144
            String path = "./peer" + peerid + "/";
145
            File file = new File(path);
146
            file.mkdirs();
147
            file = new File(path + "memory.ser");
148
            FileOutputStream fileIn = new FileOutputStream(file);
149
            ObjectOutputStream in = new ObjectOutputStream(fileIn);
150
            in.writeObject(info);
151
            in.close();
152
            fileIn.close();
153
154
        } catch (Exception e) {
155
            e.printStackTrace();
156
157
        }
158
    }
159
160
    public Integer getUsedStorage() {
161
        if (peersInfo.spaceUsedOnDisk.get(peerId) == null)
162
            return 0;
163
        else
164
            return peersInfo.spaceUsedOnDisk.get(peerId);
165
    }
166
167
    public Integer getMaxStorage() {
168
        return peersInfo.getMaxStorage();
169
    }
170
171
    private boolean hasFreeSpace(Integer size) {
172
        return getUsedStorage() + size > getMaxStorage();
173
    }
174
175
    @Override
176
    public void backupFile(String path, int repDegree) throws Exception {
177
178
        System.out.println("Entra no Backup");
179
        File fileRead = new File(path);
180
181
        if (!fileRead.exists()) {
182
            throw new Exception("[ERROR] Couldn't locate file");
183
        }
184
185
        FilesInfo file = new FilesInfo(peerId, path, repDegree);
186
        String key = file.getFileID() + ":" + path;
187
        peersInfo.files.put(key, file.getChunks().size());
188
189
        for (int i = 0; i < file.getChunks().size(); i++) {
190
            Chunk currChunk = file.getChunks().get(i);
191
            System.out.println("CURRRRORIGINAL->" + currChunk.getReplicationDegree());
192
            String id = "";
193
            id = currChunk.getFileID() + ":" + currChunk.getChunkNo();
194
            peersInfo.storage.put(id, currChunk);
195
196
            String[] args = new String[2];
197
            args[0] = String.valueOf(currChunk.getChunkNo());
198
            args[1] = String.valueOf(repDegree);
199
            System.out.println("CURR PASSADAAOS peers->" + repDegree);
200
            byte[] msg = createMessage("PUTCHUNK", protocolVersion, peerId, file.getFileID(), args,
201
                    currChunk.getData());
202
203
            try {
204
                int time = 1000;
205
                int counter = 1;
206
                Message mensagem = new Message(msg, msg.length);
207
208
                cInfo.send(mensagem, cInfo.socketMDB, cInfo.MDBa, cInfo.MDBp);
209
210
                int NoStored = peersInfo.getStorage().get(id).getCurrReplicationDegree();
211
                int chunkReplicationDegree = peersInfo.getStorage().get(id).getReplicationDegree();
212
                System.out.println("Chunk" + currChunk.getChunkNo());
213
                System.out
214
                        .println("CurrReplicationDegree->" + peersInfo.getStorage().get(id).getCurrReplicationDegree());
215
                System.out.println("ReplicationDegree->" + peersInfo.getStorage().get(id).getReplicationDegree());
216
217
                while (counter < 5) {
218
                    Thread.sleep(time);
219
                    NoStored = peersInfo.getStorage().get(id).getCurrReplicationDegree();
220
                    chunkReplicationDegree = peersInfo.getStorage().get(id).getReplicationDegree();
221
                    System.out.println(
222
                            "CurrReplicationDegree->" + peersInfo.getStorage().get(id).getCurrReplicationDegree());
223
                    System.out.println("ReplicationDegree->" + peersInfo.getStorage().get(id).getReplicationDegree());
224
225
                    if (NoStored < chunkReplicationDegree) {
226
                        cInfo.send(mensagem, cInfo.socketMDB, cInfo.MDBa, cInfo.MDBp);
227
                        time = 2 * time;
228
                        counter++;
229
                    } else {
230
                        counter = 5;
231
                    }
232
                }
233
234
            } catch (Exception e) {
235
236
            }
237
        }
238
    }
239
240
    @Override
241
    public void backupEnhFile(String path, int repDegree) throws Exception {
242
243
        System.out.println("Entra no Backup");
244
        File fileRead = new File(path);
245
246
        if (!fileRead.exists()) {
247
            throw new Exception("[ERROR] Couldn't locate file");
248
        }
249
250
        FilesInfo file = new FilesInfo(peerId, path, repDegree);
251
        String key = file.getFileID() + ":" + path;
252
        peersInfo.files.put(key, file.getChunks().size());
253
        for (int i = 0; i < file.getChunks().size(); i++) {
254
            Chunk currChunk = file.getChunks().get(i);
255
            System.out.println("CURRRRORIGINAL->" + currChunk.getReplicationDegree());
256
            // por no hashmap
257
            String id = "";
258
            id = currChunk.getFileID() + ":" + currChunk.getChunkNo();
259
            peersInfo.storage.put(id, currChunk);
260
            String[] args = new String[2];
261
            args[0] = String.valueOf(currChunk.getChunkNo());
262
            args[1] = String.valueOf(repDegree);
263
            System.out.println("CURRPASSADAAOS peers->" + repDegree);
264
            byte[] msg = createMessage("PUTCHUNK", protocolVersion, peerId, file.getFileID(), args,
265
                    currChunk.getData());
266
267
            try {
268
                int time = 1000;
269
                int counter = 1;
270
                Message mensagem = new Message(msg, msg.length);
271
272
                cInfo.send(mensagem, cInfo.socketMDB, cInfo.MDBa, cInfo.MDBp);
273
274
                int NoStored = peersInfo.getStorage().get(id).getCurrReplicationDegree();
275
                int chunkReplicationDegree = peersInfo.getStorage().get(id).getReplicationDegree();
276
                System.out.println("Chunk" + currChunk.getChunkNo());
277
                System.out
278
                        .println("CurrReplicationDegree->" + peersInfo.getStorage().get(id).getCurrReplicationDegree());
279
                System.out.println("ReplicationDegree" + peersInfo.getStorage().get(id).getReplicationDegree());
280
281
                while (counter < 5) {
282
                    Thread.sleep(time);
283
                    NoStored = peersInfo.getStorage().get(id).getCurrReplicationDegree();
284
                    chunkReplicationDegree = peersInfo.getStorage().get(id).getReplicationDegree();
285
                    System.out.println(
286
                            "CurrReplicationDegree->" + peersInfo.getStorage().get(id).getCurrReplicationDegree());
287
                    System.out.println("ReplicationDegree->" + peersInfo.getStorage().get(id).getReplicationDegree());
288
289
                    if (NoStored < chunkReplicationDegree) {
290
                        cInfo.send(mensagem, cInfo.socketMDB, cInfo.MDBa, cInfo.MDBp);
291
                        time = 2 * time;
292
                        counter++;
293
                    } else {
294
                        counter = 5;
295
                    }
296
                }
297
298
            } catch (Exception e) {
299
300
            }
301
        }
302
    }
303
304
    @Override
305
    public void restoreFile(String path) throws RemoteException {
306
307
        System.out.println("Entra no restore");
308
        for (ConcurrentHashMap.Entry<String, Chunk> entry : peersInfo.storage.entrySet()) {
309
            String key = entry.getKey().toString();
310
            Chunk value = entry.getValue();
311
312
            System.out.println("File id e No: " + key);
313
        }
314
315
        File fileRead = new File(path);
316
317
        FilesInfo file = new FilesInfo(peerId, path, 2);
318
319
        for (int i = 0; i < file.getChunks().size(); i++) {
320
            Chunk currChunk = file.getChunks().get(i);
321
322
            String id = "";
323
            id = currChunk.getFileID() + ":" + currChunk.getChunkNo();
324
325
            String[] args = new String[1];
326
            args[0] = String.valueOf(currChunk.getChunkNo());
327
328
            byte[] msg = createMessage("GETCHUNK", protocolVersion, peerId, file.getFileID(), args, null);
329
330
            try {
331
                int time = 1000;
332
                int counter = 1;
333
                Message mensagem = new Message(msg, msg.length);
334
                mensagem.setMessageType(Message.MessageType.GETCHUNK);
335
                mensagem.setProtocolVersion(protocolVersion);
336
                mensagem.setSenderId(peerId);
337
                mensagem.setFileID(file.getFileID());
338
339
                cInfo.send(mensagem, cInfo.socketMC, cInfo.MCa, cInfo.MCp);
340
                Thread.sleep(400);
341
342
            } catch (Exception e) {
343
                System.out.println("[ERROR] Exception in sending <GETCHUNK> message");
344
                e.printStackTrace();
345
            }
346
        }
347
348
    }
349
350
    @Override
351
    public void deleteFile(String path) throws RemoteException {
352
        System.out.println("entra no delete");
353
354
        FilesInfo fileInfo = new FilesInfo(peerId, path, 1);
355
        System.out.println("[FILEID]: " + fileInfo.getFileID());
356
        System.out.println("[PATH]: " + path);
357
358
        String[] args = new String[0]; // no args
359
360
        byte[] msg = createMessage("DELETE", protocolVersion, peerId, fileInfo.getFileID(), args, null);
361
362
        try {
363
            Message mensagem = new Message(msg, msg.length);
364
            cInfo.send(mensagem, cInfo.socketMC, cInfo.MCa, cInfo.MCp);
365
366
        } catch (Exception e) {
367
            System.out.println("[ERROR] Exception in sending <DELETE> message");
368
            e.printStackTrace();
369
        }
370
371
        try {
372
            Files.deleteIfExists(fileInfo.getFile().toPath());
373
            System.out.println("[INFO] File deleted successfully");
374
        } catch (NoSuchFileException x) {
375
            System.err.format("%s: no such" + " file or directory%n", fileInfo.getFile().toPath());
376
        } catch (DirectoryNotEmptyException x) {
377
            System.err.format("%s not empty%n", fileInfo.getFile().toPath());
378
        } catch (IOException x) {
379
            System.err.println(x);
380
        }
381
    }
382
383
    @Override
384
    public void reclaim(int size) throws RemoteException {
385
        // Ir a storage
386
        System.out.println("______________________");
387
        System.out.println("Entra no Reclaim");
388
        ArrayList<Chunk> chunksOnDisk = new ArrayList<Chunk>();
389
390
        for (ConcurrentHashMap.Entry<String, Chunk> entry : peersInfo.storage.entrySet()) {
391
            String key = entry.getKey().toString();
392
            Chunk currChunk = entry.getValue();
393
            if (currChunk.getPeersId().contains(peerId)) {
394
                chunksOnDisk.add(currChunk);
395
                System.out.println("File id e No: " + key);
396
                // significa que o peer a guardou no sistema
397
            }
398
        }
399
        System.out.println("Espaço usado até agora->" + Integer.toString(peersInfo.getSpaceUsedOnDisk().get(peerId)));
400
        int i = 0;
401
402
        while (peersInfo.getSpaceUsedOnDisk().get(peerId) > size) {
403
            if (i < chunksOnDisk.size()) {
404
                Chunk chunkToDelete = chunksOnDisk.get(i);
405
                int space = peersInfo.getSpaceUsedOnDisk().get(peerId);
406
                peersInfo.spaceUsedOnDisk.replace(peerId, space - chunkToDelete.getData().length);
407
408
                deleteFiles(chunkToDelete);
409
                String[] args = new String[1]; // no args
410
                args[0] = Integer.toString(chunkToDelete.getChunkNo());
411
412
                byte[] msg = createMessage("REMOVED", protocolVersion, peerId, chunkToDelete.getFileID(), args, null);
413
414
                try {
415
                    Message mensagem = new Message(msg, msg.length);
416
                    cInfo.send(mensagem, cInfo.socketMC, cInfo.MCa, cInfo.MCp);
417
418
                    Thread.sleep(2000);
419
420
                } catch (Exception e) {
421
                    System.out.println("[ERROR] Exception in sending <REMOVED> message");
422
                    e.printStackTrace();
423
                }
424
425
                i++;
426
427
            }
428
        }
429
430
    }
431
432
    public void deleteFiles(Chunk chunktoDelete) throws RemoteException {
433
434
        // removes each chunk
435
        Path path = Paths.get("./peer" + peerId + "/" + "backup/" + chunktoDelete.getFileID() + "/chk"
436
                + Integer.toString(chunktoDelete.getChunkNo()) + ".chunk");
437
438
        try {
439
            Files.deleteIfExists(path);
440
        } catch (IOException e) {
441
            System.err.println("[ERROR] Could not delete chunk folder");
442
            e.printStackTrace();
443
        }
444
        // removes directory from FileSystem
445
        Path pathfileID = Paths.get("./peer" + peerId + "/" + "backup/" + chunktoDelete.getFileID() + "/chk"
446
                + Integer.toString(chunktoDelete.getChunkNo()) + ".chunk");
447
        try {
448
            Files.deleteIfExists(pathfileID);
449
        } catch (IOException e) {
450
            System.err.println("[ERROR] Could not delete chunk");
451
            e.printStackTrace();
452
        }
453
    }
454
455
    @Override
456
    public String state() throws RemoteException {
457
        StringBuilder stateString = new StringBuilder();
458
459
        stateString.append("\n============ CURRENT PEER STATE ============\n\n");
460
461
        stateString.append("Files whose backup was initiated by this peer:\n");
462
463
        for (ConcurrentHashMap.Entry<String, Integer> entry : peersInfo.getFiles().entrySet()) {
464
465
            Integer numberOfChunks = entry.getValue();
466
467
            String[] entrySplit = entry.getKey().split(":");
468
469
            String fileID = entrySplit[0];
470
            String fileName = entrySplit[1];
471
472
            stateString.append("\t\tFile Pathname = " + fileName + " | fileID = " + fileID + "\n");
473
        }
474
475
        stateString.append("\nChunks stored in this peer:\n");
476
        for (ConcurrentHashMap.Entry<String, Chunk> entry : peersInfo.storage.entrySet()) {
477
            String key = entry.getKey().toString();
478
            Chunk chunk = entry.getValue();
479
            ArrayList<Integer> chunkSavedOnPeers = chunk.getPeersId();
480
481
            for (Integer savedOnPeer : chunkSavedOnPeers) {
482
                if (savedOnPeer == peerId) {
483
                    stateString.append("\t\tChunk id: " + chunk.getChunkNo() + " | Chunk size (KB): "
484
                            + chunk.getData().length / 1000 + " | Perceived replication degree:\n");
485
                }
486
            }
487
488
        }
489
490
        stateString.append("\nMax storage capacity (KB): " + getMaxStorage() / 1000 + "\n");
491
        stateString.append("Current amount of storage used (KB): " + getUsedStorage() / 1000 + "\n");
492
        return stateString.toString();
493
    }
494
}