Project

General

Profile

Statistics
| Revision:

sdis1819-t7g02 / service / Peer.java @ 2

History | View | Annotate | Download (11.5 KB)

1 1 up20150366
package service;
2
3
import java.io.IOException;
4
import java.nio.file.Files;
5
import java.nio.file.Path;
6
import java.nio.file.Paths;
7
import java.rmi.registry.Registry;
8
import java.rmi.registry.LocateRegistry;
9
import java.rmi.server.UnicastRemoteObject;
10
import java.util.ArrayList;
11
import java.util.List;
12
import java.util.concurrent.Callable;
13
import java.util.concurrent.ExecutionException;
14
import java.util.concurrent.ExecutorService;
15
import java.util.concurrent.Executors;
16
import java.util.concurrent.Future;
17
import java.util.concurrent.ScheduledThreadPoolExecutor;
18
import java.util.concurrent.TimeUnit;
19
20
import channels.Message;
21
import channels.MessageHeader;
22
import protocols.BackupChunkProtocol;
23
import protocols.RestoreChunksProtocol;
24
25
public class Peer implements RMI {
26
27
        ScheduledThreadPoolExecutor exec;
28
        private static Cloud cloud;
29
30
        public Peer() {}
31
32
        public static void main(String[] args) {
33
34
                try
35
                {
36
                        Peer peer = new Peer();
37
                        RMI rmi = (RMI) UnicastRemoteObject.exportObject(peer, 0);
38
                        String acess_point = args[2];
39
                        Registry registry = LocateRegistry.getRegistry();
40
                        registry.bind(acess_point, rmi);
41
                        System.err.println("Peer ready");
42
                }
43
                catch (Exception e) {
44
                        System.err.println("Peer exception: " + e.toString());
45
                        return;
46
                }
47
48
                if(args.length != 9) {
49
                        printHelp();
50
                        return;
51
                }
52
53
                try {
54
                        Cloud aux = new Cloud(args[0], args[1]);
55
                        cloud = aux.getPeerTools(args[0], args[1]);
56
                        cloud.createControlRoom(args[3], args[4]);
57
                        cloud.createBackupChannel(args[5], args[6]);
58
                        cloud.createRestoreChannel(args[7], args[8]);
59
60
                        String path = Constants.DATABASE_DIR;
61
                        Path p = Paths.get(path);
62
                        if (Files.exists(p))
63
                        {
64
                                try {
65
                                        cloud.loadFromDisk();
66
                                } catch (Exception e) {
67
                                        e.printStackTrace();
68
                                }
69
70
                        }
71
72
                        //                        System.out.println(cloud.getStoredChunks().size());
73
74
                } catch (NumberFormatException | IOException e) {
75
                        System.out.println(e.getMessage());
76
                        return;
77
                }
78
79
                if(Cloud.checkOrCreateDirectory(Constants.CHUNKS_DIR + args[0] + "/") == -1) {
80
                        System.out.println("Couldn't create peer chunks dir!");
81
                }
82
83
                cloud.activateChannels();
84
                System.out.println("Channels activated");
85
        }
86
87
        // the name of each multicast channel consists of the ip multicast address and
88
        // port, passed as cmd line arguments, followed by protocol version, server id
89
        // and service access point
90
        private static void printHelp() {
91
                System.out.println("java service.Peer SERVER_id VERSION ACCESS_POINT MC_ip MC_port MDB_ip MDB_port MDR_ip MDR_port");
92
        }
93
94
        public void saveState() {
95
                try {
96
                        cloud.saveToDisk();
97
                } catch (IOException e) {
98
                        e.printStackTrace();
99
                }
100
        }
101
102
103
        public synchronized String backup(String file_path, int replication_degree)
104
        {
105
                FileChunker fc;
106
                Message m;
107
                MessageHeader h;
108
                int numberOfChunks;
109
                ExecutorService WORKER_THREAD_POOL;
110
                List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
111
                List<Future<Object>> futures = null;
112
                ArrayList<Chunk> chunkHolder = null;
113
                String[] headerElements = new String[Constants.PUTCHUNK_N_ARGS];
114
                Chunk aux;
115
                int numberOfSteps;
116
117
                System.out.println("BACKUP request received");
118
119
                try {
120
                        fc = new FileChunker(file_path, replication_degree);
121
                        //                cloud.createDirectoryFile(fc);
122
                }
123
                catch (IOException e) {
124
                        System.out.println("ERROR chunking file -> " + e.getMessage());
125
                        return "ERROR chunking file -> " + e.getMessage();
126
                }
127
128
                chunkHolder = fc.getChunks();
129
                numberOfChunks = chunkHolder.size();
130
131
                cloud.registerNumberOfFileChunks(fc.getFileID(), numberOfChunks);
132
133
                System.out.println("File splited into " + numberOfChunks + " chunks.");
134
135
                headerElements[0] = Constants.PUTCHUNK;
136
                headerElements[1] = cloud.getProtocolVersion();
137
                headerElements[2] = Integer.toString(cloud.getID());
138
139
                numberOfSteps = numberOfChunks % Constants.MAX_NUMBER_OF_THREADS == 0 ? (int)(numberOfChunks / Constants.MAX_NUMBER_OF_THREADS):(int)(numberOfChunks / Constants.MAX_NUMBER_OF_THREADS) + 1;
140
141
                for(int step = 0, first, last; step < numberOfSteps; step++, futures.clear(), callables.clear()) {
142
                        first = step * Constants.MAX_NUMBER_OF_THREADS;
143
                        last = numberOfChunks - first < Constants.MAX_NUMBER_OF_THREADS ? numberOfChunks : first + Constants.MAX_NUMBER_OF_THREADS;
144
145
                        System.out.println("Sending from " + first + " to " + last);
146
147
                        WORKER_THREAD_POOL = Executors.newFixedThreadPool(last-first);
148
149
                        for(int chunkNo = first; chunkNo < last; chunkNo++) {
150
                                aux = chunkHolder.get(chunkNo);
151
                                headerElements[3] = fc.getFileID();
152
                                headerElements[4] = aux.getChunkNumber();
153
                                headerElements[5] = Integer.toString(replication_degree);
154
                                try {
155
                                        h = new MessageHeader(headerElements.clone());
156
157
                                        m = new Message(h, aux.getChunkContent());
158
159
                                        callables.add(new BackupChunkProtocol(cloud, aux, m ,replication_degree));
160
                                }
161
                                catch(IllegalArgumentException e) {
162
                                        System.out.println(e.getMessage());
163
                                        return "Backup failed preparing chunks";
164
                                }
165
                        }
166
167
                        try {
168
                                futures = WORKER_THREAD_POOL.invokeAll(callables);
169
170
                                WORKER_THREAD_POOL.shutdown();
171
172
                                if(WORKER_THREAD_POOL.awaitTermination(35, TimeUnit.SECONDS)) {
173
                                        for(Future<Object> p: futures) {
174
                                                if(!(boolean)p.get()) {
175
                                                        System.out.println("One of the chunks could not be stored.");
176
177
                                                        delete(file_path); //order successfully stored chunks to be deleted
178
179
                                                        return "Backup failed preparing chunks";
180
                                                }
181
                                        }
182
                                }
183
                        }
184
                        catch (InterruptedException | ExecutionException e) {
185
                                return "Error in peer: " + e.getMessage();
186
                        }
187
                }
188
189
                System.out.println("backup peer has " + cloud.getNumberOfStoredChunks());
190
191
                saveState();
192
193
                return "Backup successfull.";
194
        }
195
196
        public synchronized String restore(String file_path)
197
        {
198
                System.out.println("RESTORE request received");
199
200
                FileChunker fc;
201
                List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
202
                List<Future<Object>> futures = null;
203
                int numberOfChunks;
204
                MessageHeader h;
205
                int numberOfSteps;
206
                ExecutorService WORKER_THREAD_POOL;
207
                String[] headerElements = new String[Constants.GETCHUNK_N_ARGS];
208
209
                headerElements[0] = Constants.GETCHUNK;
210
                headerElements[1] = cloud.getProtocolVersion();
211
                headerElements[2] = Integer.toString(cloud.getID());
212
213
                try
214
                {
215
                        fc = new FileChunker(file_path);
216
                }
217
                catch (IOException e)
218
                {
219
                        System.out.println("ERROR restoring file -> " + e.getMessage());
220
                        return "ERROR restoring file -> " + e.getMessage();
221
                }
222
223
                headerElements[3] = fc.getFileID();
224
225
                String file_id = fc.getFileID();
226
227
                numberOfChunks = cloud.getNumberOfChunks(file_id);
228
229
                if(numberOfChunks < 0) {
230
                        return file_path + " wasn't backep up by this peer.";
231
                }
232
233
                numberOfSteps = numberOfChunks % Constants.MAX_NUMBER_OF_THREADS == 0 ? (int)(numberOfChunks / Constants.MAX_NUMBER_OF_THREADS):(int)(numberOfChunks / Constants.MAX_NUMBER_OF_THREADS) + 1;
234
235
                for(int step = 0, first, last; step < numberOfSteps; step++, futures.clear(), callables.clear()) {
236
                        first = step * Constants.MAX_NUMBER_OF_THREADS;
237
                        last = numberOfChunks - first < Constants.MAX_NUMBER_OF_THREADS ? numberOfChunks : first + Constants.MAX_NUMBER_OF_THREADS;
238
239
                        System.out.println("Sending from " + first + " to " + last);
240
241
                        WORKER_THREAD_POOL = Executors.newFixedThreadPool(last-first);
242
243
                        for(int chunkNo = first; chunkNo < last; chunkNo++)
244
                        {
245
                                headerElements[4] = Integer.toString(chunkNo);
246
247
                                try
248
                                {
249
                                        h = new MessageHeader(headerElements.clone());
250
251
                                        cloud.registerFileChunkToRestore(file_id, headerElements[4]);
252
253
                                        callables.add(new RestoreChunksProtocol(cloud, h));
254
255
                                }
256
                                catch(IllegalArgumentException e) {
257
                                        System.out.println(e.getMessage());
258
                                        return "Restore failed preparing chunks";
259
                                }
260
                        }
261
262
                        try {
263
                                futures = WORKER_THREAD_POOL.invokeAll(callables);
264
265
                                WORKER_THREAD_POOL.shutdown();
266
267
                                if(WORKER_THREAD_POOL.awaitTermination(35, TimeUnit.SECONDS)) {
268
                                        for(Future<Object> p: futures) {
269
                                                if(!(boolean)p.get()) {
270
                                                        System.out.println("One of the chunks could not be restored.");
271
272
                                                        return "Restore failed preparing chunks";
273
                                                }
274
                                        }
275
                                }
276
                        }
277
                        catch (InterruptedException | ExecutionException e) {
278
                                return "Error in peer: " + e.getMessage();
279
                        }
280
281
                }
282
283
                System.out.println("Colected all file parts. Building file.");
284
285
                try {
286
                        cloud.buildFile(file_id, numberOfChunks, Paths.get(file_path).getFileName().toString());
287
                }
288
                catch (IOException e) {
289
                        System.out.println("ERROR builidng file -> " + e.getMessage());
290
                }
291
292
                saveState();
293
294
                return "Restored file successfully.";
295
        }
296
297
        public synchronized String delete(String file_path)
298
        {
299
                FileChunker fc;
300
                MessageHeader h;
301
                String[] headerElements = new String[Constants.DELETE_N_ARGS];
302
303
                System.out.println("DELETE request.");
304
305
                try {
306
                        fc = new FileChunker(file_path);
307
                        headerElements[0] = Constants.DELETE;
308
                        headerElements[1] = cloud.getProtocolVersion();
309
                        headerElements[2] = Integer.toString(cloud.getID());
310
                        headerElements[3] = fc.getFileID();
311
                        h = new MessageHeader(headerElements.clone());
312
                        if(cloud.checkIfPeerStoredFile(headerElements[3])) {
313
                                for(int i = 0; i < 3; i++) {
314
                                        cloud.controlRoom.sendHeader(h);
315
                                        Thread.sleep(1000);
316
                                }
317
                        }
318
                        else {
319
                                return "File was not uploaded here. Delete failed.";
320
                        }
321
322
                }
323
                catch(Exception e) {
324
                        System.out.println("ERROR deleting file -> " + e.getMessage());
325
                        return "ERROR deleting file -> " + e.getMessage();
326
                }
327
328
                saveState();
329
330
                return "Delete successfull.";
331
        }
332
333
        public synchronized String reclaim(long disk_space)
334
        {
335
                if(disk_space < 0) {
336
                        return "Space_disk can't be lower than 0!";
337
                }
338
339
                System.out.println("Space reclaim request to " + disk_space + " bytes.");
340
341
                if(disk_space >= cloud.getChunkSpace()) {
342
                        cloud.setAvailableSpace(disk_space);
343
                }
344
                else {
345
                        cloud.freeUpSpace(disk_space);
346
                }
347
348
                saveState();
349
350
                return "Space reclaim successfull";
351
        }
352
353
        public synchronized String state()
354
        {
355
                System.out.println("STATE OPERATION RECEIVED");
356
357
                String answer = "\n\nPeer " + cloud.getID() + " state: \n";
358
359
                answer += "\tBacked-up files:\n";
360
361
                String pathName;
362
                int repDegree, nChunks;
363
                Chunk c;
364
365
                System.out.println("Number of files = " + cloud.getBackedUpFileNames().size());
366
367
                for(String key : cloud.getBackedUpFileNames()) {
368
369
                        pathName = cloud.getPathNameFromFileId(key);
370
                        repDegree = cloud.getBackedUpChunkNecessaryRepDegree(key);
371
372
                        if(pathName != null && repDegree > -1) {
373
                                answer += "\t\tFILE PATHNAME: " + pathName + "    FILE ID: " + key + "    DESIRED REPPLICATION DEGREE:" + repDegree + "\n";
374
                        }
375
                        else {
376
                                System.out.println("Error in key: " + key + " pathname " + pathName + " degree " + repDegree);
377
                                continue;
378
                        }
379
380
                        nChunks = cloud.getNumberOfChunks(key);
381
382
                        if(nChunks < 0) {
383
                                System.out.println("Error getting number of chunks in key: " + key + " pathname " + pathName + " degree " + repDegree);
384
                                continue;
385
                        }
386
387
                        answer += "\t\t\tChunks:\n";
388
389
                        for(int i = 0; i < nChunks; i++) {
390
                                answer += "\t\t\t\tchk" + i + "   REP: DEGREE " + cloud.getBackepUpChunkPerceivedRepDegree(key, i) + "\n";
391
                        }
392
393
                        answer += "\n";
394
                }
395
396
                answer += "\tStored chunks:\n";
397
398
                for(String key : cloud.getStoredChunks().keySet()) {
399
                        c = cloud.getStoredChunks().get(key);
400
401
                        if(c.getFilePathName() == null) { //only backep up files have a non-null file path name
402
                                answer += "\t\t" +c.getChunkId() + ":" + c.getChunkNumber() + "   " + "SIZE: " + c.getChunkSize() / 1024 + " KBytes    " + " PERCEIVED REP. DEGREE: " + c.getActualReplicationDegree() + "\n";
403
                        }
404
                }
405
406
                answer += "\n\n\t Total peer disk space: " + cloud.getTotalSpace() / 1024 + " KBytes   Space used by chunks : " + cloud.getChunkSpace() / 1024 + " KBytes   Available space: " + cloud.getAvailableSpace() / 1024 + " KBytes.\n";
407
408
                return answer;
409
        }
410
}