Project

General

Profile

Statistics
| Revision:

root / Server.java @ 2

History | View | Annotate | Download (15 KB)

1 1 up20160792
import java.rmi.registry.Registry;
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.RemoteException;
4
import java.rmi.server.UnicastRemoteObject;
5
6
import java.util.Map;
7
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.ScheduledThreadPoolExecutor;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.CopyOnWriteArrayList;;
14
import java.util.ArrayList;
15
import java.util.List;
16
import java.util.Comparator;
17
import java.util.HashSet;
18
import java.util.Collections;
19
20
import java.io.*;
21
22
import java.net.DatagramPacket;
23
24
import java.nio.file.*;
25
26
//Server class essentially a Peer
27
public class Server implements PeerInterface {
28
29
    // Unique peer id
30
    public String id;
31
32
    // Size controls
33
    public static final int DEFAULT_MAX_SIZE = 1000000000;
34
    public int limit;
35
    public int used;
36
37
    // Thread Pool
38
    public static ScheduledThreadPoolExecutor pool;
39
40
    // Sockets
41
    public static Socket MC;
42
    public static Socket MDB;
43
    public static Socket MDR;
44
45
    // State Backup
46
    public boolean doingBackUp;
47
    public String file;
48
    public int replications;
49
50
    // State Restore
51
    public boolean restoring;
52
    public AtomicBoolean waitRestoreReply = new AtomicBoolean(false);
53 2 up20160792
    public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo ->
54
                                                                                                      // Chunk
55 1 up20160792
56
    // Service Management Information
57
    public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo
58
                                                                                             // -> Chunk
59
    public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name
60
61
    // Empty Constructor
62
    public Server() {
63
    }
64
65
    // Simplified Constructor(Default values)
66
    public Server(String id) throws IOException {
67
        this.used = 0;
68
        this.id = id;
69
        this.limit = DEFAULT_MAX_SIZE;
70
        this.setupSockets();
71
        this.loadLocal();
72
        this.setupDirectory();
73
    }
74
75
    // Full constructor
76
    public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
77
            String addressMDR) throws IOException {
78
        this.used = 0;
79
        this.id = id;
80
        this.limit = DEFAULT_MAX_SIZE;
81
        this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
82
        this.loadLocal();
83
        this.setupDirectory();
84
    }
85
86
    // Backup Protocol
87
    public void backup(String path, int replications) throws RemoteException {
88
89
        // Start backup
90
        this.doingBackUp = true;
91
92
        // Extract file name
93
        int index = path.lastIndexOf("\\");
94
        String fileName = path.substring(index + 1);
95
96 2 up20160792
        // Generate Hash from file name and date
97
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
98 1 up20160792
99 2 up20160792
        // Is file backed up already?
100
        if (this.files.get(fileId) != null) {
101
            System.out.println("File already backed up");
102
            return;
103
        }
104
105 1 up20160792
        // Add to file list
106
        this.files.put(fileId, fileName);
107
108
        try {
109
110
            // Get file bytes
111
            byte[] bytes = FileManager.readFile(path);
112
113
            // Get file in chunks
114
            byte[][] chunks = FileManager.split(bytes);
115
116
            // Send each chunk
117
            for (int i = 0; i < chunks.length; i++) {
118
119
                // Base receive time of 1 second
120
                int receiveTime = 1000;
121
122
                // PUTCHUNK tries
123
                for (int j = 0; j < 5; j++) {
124
125
                    // Prepare message
126
                    Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
127
128
                    // Pack in datagram
129
                    DatagramPacket packet = msg.packit(MDB.address, MDB.port);
130
131
                    // Send message
132
                    MDB.socket.send(packet);
133
134
                    // Wait for replies and check Replication level
135
                    this.replications = 0;
136
                    this.file = fileId;
137
                    Thread.sleep(receiveTime);
138
                    if (this.replications >= replications) {
139
                        break;
140
                    }
141
142
                    // Double receive time
143
                    receiveTime *= 2;
144
145
                }
146
147
            }
148
149
        } catch (Exception e) {
150
            e.printStackTrace();
151
        }
152
153
        // Terminate backup
154
        this.doingBackUp = false;
155
156
    }
157
158
    // Restore Protocol
159
    public void restore(String path) throws RemoteException {
160
161
        this.restoring = true;
162
163
        // File name
164
        int index = path.lastIndexOf("\\");
165
        String fileName = path.substring(index + 1);
166
167 2 up20160792
        // Generate Hash from file name and date
168
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
169 1 up20160792
170
        // Find in list of backed up
171
        String name = this.files.get(fileId);
172
173
        // File not backed up yet, just return
174
        if (name == null) {
175 2 up20160792
            System.out.println("File not backed up by this peer");
176 1 up20160792
            return;
177
        }
178
179
        try {
180
181
            // Get byte count (probably better than actually fetching the bytes array)
182
            File file = new File(path);
183
            int bytes = (int) file.length();
184
185
            // How many chunks to expect
186
            int chunks = 1 + bytes / FileManager.chunkSize;
187
188
            // Reset list of Chunks recovered
189
            this.recoveredChunks.clear();
190
191
            // Request chunks
192
            for (int i = 0; i < chunks; i++) {
193
194
                // Prepare message
195
                Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]);
196
197
                // Pack in datagram
198
                DatagramPacket packet = msg.packit(MC.address, MC.port);
199
200
                // Send message
201
                MC.socket.send(packet);
202
203
                // Wait for response
204
                this.waitRestoreReply.set(true);
205 2 up20160792
                long time = System.nanoTime();
206 1 up20160792
                while (this.waitRestoreReply.get()) {
207 2 up20160792
                    // Restore timeout
208
                    if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) {
209
                        System.out.println("Restore timeout, missing chunks");
210
                        return;
211
                    }
212 1 up20160792
                }
213
214 2 up20160792
            }
215 1 up20160792
216 2 up20160792
            // Get simple list from recovered chunks
217 1 up20160792
            List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values());
218
219
            // Sort chunks
220
            Collections.sort(list, new Comparator<Chunk>() {
221
                @Override
222
                public int compare(Chunk p1, Chunk p2) {
223
                    return p1.chunkN - p2.chunkN; // Ascending
224
                }
225
            });
226
227
            // Pack chunks
228
            byte[][] allBytes = new byte[list.size()][];
229
            for (int i = 0; i < list.size(); i++) {
230
                allBytes[i] = list.get(i).bytes;
231
            }
232
233
            // Merge chunks
234
            byte[] fileBytes = FileManager.merge(allBytes);
235
236
            // Save file
237
            Path current = Paths.get("");
238
            String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName;
239
            try (FileOutputStream stream = new FileOutputStream(place)) {
240
                stream.write(fileBytes);
241
                stream.close();
242
            }
243
244
        } catch (Exception e) {
245
            e.printStackTrace();
246
        }
247
248
        this.restoring = false;
249
250
    }
251
252
    // Delete chunks associated to this file
253
    public void delete(String path) throws RemoteException {
254
255
        // File name
256
        int index = path.lastIndexOf("\\");
257
        String fileName = path.substring(index + 1);
258
259 2 up20160792
        // Generate Hash from file name and date
260
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
261 1 up20160792
262 2 up20160792
        // Remove from list of backed up
263
        String name = this.files.get(fileId);
264
        if (name != null) {
265
            this.files.remove(fileId);
266
        }
267
268 1 up20160792
        try {
269
270
            // Prepare message
271
            Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]);
272
273
            // Pack in datagram
274
            DatagramPacket packet = msg.packit(MC.address, MC.port);
275
276
            // Send message
277
            MC.socket.send(packet);
278
279
        } catch (Exception e) {
280
            e.printStackTrace();
281
        }
282
283
    }
284
285
    public void reclaim(int memory) throws RemoteException {
286
287
        // Memory can't be lower than 0
288
        if (memory < 0) {
289
            return;
290
        }
291
292
        // Assign new limit
293
        limit = memory;
294
295
        try {
296
297
            // Check need to delete chunks
298
            // Going to delete by order in the map
299
            while (used > limit) {
300
301
                // Pick an entry
302
                Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next();
303
304
                // Remove from used space
305
                used -= entry.getValue().bytes.length;
306
307
                // Prepare message
308
                Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
309
                        entry.getValue().chunkN, 0, new byte[0]);
310
311
                // Pack in datagram
312
                DatagramPacket packet = msg.packit(MC.address, MC.port);
313
314
                // Send message
315
                MC.socket.send(packet);
316
317
                // Remove from map
318
                chunks.remove(entry.getKey());
319
320
                // Remove from file system
321
                Path current = Paths.get("");
322 2 up20160792
                String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\"
323
                        + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk";
324 1 up20160792
                File toDelete = new File(place);
325
                toDelete.delete();
326
327
            }
328
329
        } catch (Exception e) {
330
            e.printStackTrace();
331
        }
332
333
    }
334
335
    public void state() throws RemoteException {
336
337
    }
338
339
    // Get information of previous peer life from file system
340
    public void loadLocal() {
341
342
        // Base path
343
        Path currentRelativePath = Paths.get("");
344
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
345
346
        // Backup path
347
        File file = new File(peerDir + "\\backup");
348
349
        // FileIds backed up
350
        File[] files = file.listFiles(File::isDirectory);
351
352
        // Peer didn't exist, just return
353
        if (files == null)
354
            return;
355
356
        // Each file directory
357
        for (int i = 0; i < files.length; i++) {
358
359
            // FileId
360
            String fileId = files[i].getName();
361
362
            // Get Chunks inside
363
            file = new File(peerDir + "\\backup" + "\\" + fileId);
364
            File[] chunks = file.listFiles();
365
366
            // No chunks, just return
367
            if (chunks == null)
368
                return;
369
370
            // Each chunk
371
            for (int j = 0; j < chunks.length; j++) {
372
373
                // Chunk name
374
                String chunkNo = chunks[j].getName();
375
376
                // Remove chk
377
                chunkNo = chunkNo.replaceAll("\\.", "");
378
                chunkNo = chunkNo.replaceAll("chk", "");
379
380
                try {
381
382
                    // Read bytes
383
                    byte[] bytes = Files.readAllBytes(chunks[j].toPath());
384
385
                    // Updated used bytes
386
                    used += bytes.length;
387
388
                    // Add to HashMap
389
                    this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0));
390
391
                } catch (Exception e) {
392
                    e.printStackTrace();
393
                }
394
395
            }
396
397
        }
398
399
    }
400
401
    // Setup directory
402
    public void setupDirectory() {
403
404
        // Base path
405
        Path currentRelativePath = Paths.get("");
406
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
407
408
        // Backup path
409
        File file = new File(peerDir + "\\backup");
410
        file.mkdirs();
411
412
        // Backup path
413
        file = new File(peerDir + "\\restored");
414
        file.mkdirs();
415
416
    }
417
418
    // Default Socket Setup
419
    public void setupSockets() throws IOException {
420
421
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
422
423
        MC = new Socket(this, Socket.Type.MC);
424
        pool.execute(MC);
425
426
        MDB = new Socket(this, Socket.Type.MDB);
427
        pool.execute(MDB);
428
429
        MDR = new Socket(this, Socket.Type.MDR);
430
        pool.execute(MDR);
431
432
    }
433
434
    // Specifc Socket Setup
435
    public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
436
            String addressMDR) throws IOException {
437
438
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
439
440
        MC = new Socket(this, Socket.Type.MC, portMC, addressMC);
441
        pool.execute(MC);
442
443
        MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB);
444
        pool.execute(MDB);
445
446
        MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR);
447
        pool.execute(MDR);
448
449
    }
450
451
    public static void main(String args[]) {
452
453 2 up20160792
        try {
454 1 up20160792
455 2 up20160792
            // Server object
456
            Server obj;
457 1 up20160792
458 2 up20160792
            // Basic (version id)
459
            if (args.length == 2) {
460
                obj = new Server(args[1]);
461
            }
462
            // Full (version id access port mc port mdb port mdr)
463
            else if (args.length == 9) {
464
                obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6],
465
                        Integer.parseInt(args[7]), args[8]);
466
            }
467
            // Invalid arguments
468
            else {
469
                System.out.println("java Server <version> <server id>");
470
                System.out.println(
471
                        "java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
472
                return;
473
            }
474 1 up20160792
475
            // Bind the server as a registry
476
            PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0);
477
478
            // Bind the remote object's stub in the registry
479
            Registry registry = LocateRegistry.getRegistry();
480 2 up20160792
            registry.bind(obj.id, stub);
481 1 up20160792
482
            System.err.println("Server ready");
483
484
        } catch (Exception e) {
485
486
            System.err.println("Server exception: " + e.toString());
487
            e.printStackTrace();
488
489
        }
490
    }
491
492
}