Project

General

Profile

Statistics
| Revision:

root / Server.java @ 1

History | View | Annotate | Download (13.8 KB)

1 1 up20160792
import java.rmi.registry.Registry;
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.RemoteException;
4
import java.rmi.server.UnicastRemoteObject;
5
6
import java.util.Map;
7
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.ScheduledThreadPoolExecutor;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.CopyOnWriteArrayList;;
14
import java.util.ArrayList;
15
import java.util.List;
16
import java.util.Comparator;
17
import java.util.HashSet;
18
import java.util.Collections;
19
20
import java.io.*;
21
22
import java.net.DatagramPacket;
23
24
import java.nio.file.*;
25
26
//Server class essentially a Peer
27
public class Server implements PeerInterface {
28
29
    // Unique peer id
30
    public String id;
31
32
    // Size controls
33
    public static final int DEFAULT_MAX_SIZE = 1000000000;
34
    public int limit;
35
    public int used;
36
37
    // Thread Pool
38
    public static ScheduledThreadPoolExecutor pool;
39
40
    // Sockets
41
    public static Socket MC;
42
    public static Socket MDB;
43
    public static Socket MDR;
44
45
    // State Backup
46
    public boolean doingBackUp;
47
    public String file;
48
    public int replications;
49
50
    // State Restore
51
    public boolean restoring;
52
    public AtomicBoolean waitRestoreReply = new AtomicBoolean(false);
53
    public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> Chunk
54
55
    // Service Management Information
56
    public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo
57
                                                                                             // -> Chunk
58
    public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name
59
60
    // Empty Constructor
61
    public Server() {
62
    }
63
64
    // Simplified Constructor(Default values)
65
    public Server(String id) throws IOException {
66
        this.used = 0;
67
        this.id = id;
68
        this.limit = DEFAULT_MAX_SIZE;
69
        this.setupSockets();
70
        this.loadLocal();
71
        this.setupDirectory();
72
    }
73
74
    // Full constructor
75
    public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
76
            String addressMDR) throws IOException {
77
        this.used = 0;
78
        this.id = id;
79
        this.limit = DEFAULT_MAX_SIZE;
80
        this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
81
        this.loadLocal();
82
        this.setupDirectory();
83
    }
84
85
    // Backup Protocol
86
    public void backup(String path, int replications) throws RemoteException {
87
88
        // TODO is file backed up already?
89
90
        // Start backup
91
        this.doingBackUp = true;
92
93
        // Extract file name
94
        int index = path.lastIndexOf("\\");
95
        String fileName = path.substring(index + 1);
96
97
        // Generate Hash
98
        // TODO dont use just file name for hashing(multiple file versions with same
99
        // name problem)
100
        String fileId = FileManager.hash(fileName);
101
102
        // Add to file list
103
        this.files.put(fileId, fileName);
104
105
        try {
106
107
            // Get file bytes
108
            byte[] bytes = FileManager.readFile(path);
109
110
            // Get file in chunks
111
            byte[][] chunks = FileManager.split(bytes);
112
113
            // Send each chunk
114
            for (int i = 0; i < chunks.length; i++) {
115
116
                // Base receive time of 1 second
117
                int receiveTime = 1000;
118
119
                // PUTCHUNK tries
120
                for (int j = 0; j < 5; j++) {
121
122
                    // Prepare message
123
                    Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
124
125
                    // Pack in datagram
126
                    DatagramPacket packet = msg.packit(MDB.address, MDB.port);
127
128
                    // Send message
129
                    MDB.socket.send(packet);
130
131
                    // Wait for replies and check Replication level
132
                    this.replications = 0;
133
                    this.file = fileId;
134
                    Thread.sleep(receiveTime);
135
                    if (this.replications >= replications) {
136
                        break;
137
                    }
138
139
                    // Double receive time
140
                    receiveTime *= 2;
141
142
                }
143
144
            }
145
146
        } catch (Exception e) {
147
            e.printStackTrace();
148
        }
149
150
        // Terminate backup
151
        this.doingBackUp = false;
152
153
    }
154
155
    // Restore Protocol
156
    public void restore(String path) throws RemoteException {
157
158
        this.restoring = true;
159
160
        // File name
161
        int index = path.lastIndexOf("\\");
162
        String fileName = path.substring(index + 1);
163
164
        // Generate Hash
165
        // TODO dont use just file name for hashing(multiple file versions with same
166
        // name problem)
167
        String fileId = FileManager.hash(fileName);
168
169
        // Find in list of backed up
170
        String name = this.files.get(fileId);
171
172
        // File not backed up yet, just return
173
        if (name == null) {
174
            return;
175
        }
176
177
        try {
178
179
            // Get byte count (probably better than actually fetching the bytes array)
180
            File file = new File(path);
181
            int bytes = (int) file.length();
182
183
            // How many chunks to expect
184
            int chunks = 1 + bytes / FileManager.chunkSize;
185
186
            // Reset list of Chunks recovered
187
            this.recoveredChunks.clear();
188
189
            // Request chunks
190
            for (int i = 0; i < chunks; i++) {
191
192
                // Prepare message
193
                Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]);
194
195
                // Pack in datagram
196
                DatagramPacket packet = msg.packit(MC.address, MC.port);
197
198
                // Send message
199
                MC.socket.send(packet);
200
201
                // Wait for response
202
                this.waitRestoreReply.set(true);
203
                while (this.waitRestoreReply.get()) {
204
                }
205
206
            }
207
208
            //Get simple list from recovered chunks
209
            List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values());
210
211
            // Sort chunks
212
            Collections.sort(list, new Comparator<Chunk>() {
213
                @Override
214
                public int compare(Chunk p1, Chunk p2) {
215
                    return p1.chunkN - p2.chunkN; // Ascending
216
                }
217
            });
218
219
            // Pack chunks
220
            byte[][] allBytes = new byte[list.size()][];
221
            for (int i = 0; i < list.size(); i++) {
222
                allBytes[i] = list.get(i).bytes;
223
            }
224
225
            // Merge chunks
226
            byte[] fileBytes = FileManager.merge(allBytes);
227
228
            // Save file
229
            Path current = Paths.get("");
230
            String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName;
231
            try (FileOutputStream stream = new FileOutputStream(place)) {
232
                stream.write(fileBytes);
233
                stream.close();
234
            }
235
236
        } catch (Exception e) {
237
            e.printStackTrace();
238
        }
239
240
        this.restoring = false;
241
242
    }
243
244
    // Delete chunks associated to this file
245
    public void delete(String path) throws RemoteException {
246
247
        // File name
248
        int index = path.lastIndexOf("\\");
249
        String fileName = path.substring(index + 1);
250
251
        // Generate Hash
252
        // TODO dont use just file name for hashing(multiple file versions with same
253
        // name problem)
254
        String fileId = FileManager.hash(fileName);
255
256
        try {
257
258
            // Prepare message
259
            Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]);
260
261
            // Pack in datagram
262
            DatagramPacket packet = msg.packit(MC.address, MC.port);
263
264
            // Send message
265
            MC.socket.send(packet);
266
267
        } catch (Exception e) {
268
            e.printStackTrace();
269
        }
270
271
    }
272
273
    public void reclaim(int memory) throws RemoteException {
274
275
        // Memory can't be lower than 0
276
        if (memory < 0) {
277
            return;
278
        }
279
280
        // Assign new limit
281
        limit = memory;
282
283
        try {
284
285
            // Check need to delete chunks
286
            // Going to delete by order in the map
287
            while (used > limit) {
288
289
                // Pick an entry
290
                Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next();
291
292
                // Remove from used space
293
                used -= entry.getValue().bytes.length;
294
295
                // Prepare message
296
                Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
297
                        entry.getValue().chunkN, 0, new byte[0]);
298
299
                // Pack in datagram
300
                DatagramPacket packet = msg.packit(MC.address, MC.port);
301
302
                // Send message
303
                MC.socket.send(packet);
304
305
                // Remove from map
306
                chunks.remove(entry.getKey());
307
308
                // Remove from file system
309
                Path current = Paths.get("");
310
                String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk";
311
                File toDelete = new File(place);
312
                toDelete.delete();
313
314
            }
315
316
        } catch (Exception e) {
317
            e.printStackTrace();
318
        }
319
320
    }
321
322
    public void state() throws RemoteException {
323
324
    }
325
326
    // Get information of previous peer life from file system
327
    public void loadLocal() {
328
329
        // Base path
330
        Path currentRelativePath = Paths.get("");
331
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
332
333
        // Backup path
334
        File file = new File(peerDir + "\\backup");
335
336
        // FileIds backed up
337
        File[] files = file.listFiles(File::isDirectory);
338
339
        // Peer didn't exist, just return
340
        if (files == null)
341
            return;
342
343
        // Each file directory
344
        for (int i = 0; i < files.length; i++) {
345
346
            // FileId
347
            String fileId = files[i].getName();
348
349
            // Get Chunks inside
350
            file = new File(peerDir + "\\backup" + "\\" + fileId);
351
            File[] chunks = file.listFiles();
352
353
            // No chunks, just return
354
            if (chunks == null)
355
                return;
356
357
            // Each chunk
358
            for (int j = 0; j < chunks.length; j++) {
359
360
                // Chunk name
361
                String chunkNo = chunks[j].getName();
362
363
                // Remove chk
364
                chunkNo = chunkNo.replaceAll("\\.", "");
365
                chunkNo = chunkNo.replaceAll("chk", "");
366
367
                try {
368
369
                    // Read bytes
370
                    byte[] bytes = Files.readAllBytes(chunks[j].toPath());
371
372
                    // Updated used bytes
373
                    used += bytes.length;
374
375
                    // Add to HashMap
376
                    this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0));
377
378
                } catch (Exception e) {
379
                    e.printStackTrace();
380
                }
381
382
            }
383
384
        }
385
386
    }
387
388
    // Setup directory
389
    public void setupDirectory() {
390
391
        // Base path
392
        Path currentRelativePath = Paths.get("");
393
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
394
395
        // Backup path
396
        File file = new File(peerDir + "\\backup");
397
        file.mkdirs();
398
399
        // Backup path
400
        file = new File(peerDir + "\\restored");
401
        file.mkdirs();
402
403
    }
404
405
    // Default Socket Setup
406
    public void setupSockets() throws IOException {
407
408
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
409
410
        MC = new Socket(this, Socket.Type.MC);
411
        pool.execute(MC);
412
413
        MDB = new Socket(this, Socket.Type.MDB);
414
        pool.execute(MDB);
415
416
        MDR = new Socket(this, Socket.Type.MDR);
417
        pool.execute(MDR);
418
419
    }
420
421
    // Specifc Socket Setup
422
    public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
423
            String addressMDR) throws IOException {
424
425
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
426
427
        MC = new Socket(this, Socket.Type.MC, portMC, addressMC);
428
        pool.execute(MC);
429
430
        MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB);
431
        pool.execute(MDB);
432
433
        MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR);
434
        pool.execute(MDR);
435
436
    }
437
438
    public static void main(String args[]) {
439
440
        // TODO Arguments are not correct, need to receive all the specific socket
441
        // arguments too, version and server(RMI)
442
443
        // Check arguments
444
        if (args.length < 1) {
445
            System.err.println("Invalid number of arguments");
446
            return;
447
        }
448
449
        try {
450
451
            // Create Server
452
            Server obj = new Server(args[0]);
453
454
            // Bind the server as a registry
455
            PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0);
456
457
            // Bind the remote object's stub in the registry
458
459
            // TODO Get registry what port?
460
            Registry registry = LocateRegistry.getRegistry();
461
            registry.bind(args[0], stub);
462
463
            System.err.println("Server ready");
464
465
        } catch (Exception e) {
466
467
            System.err.println("Server exception: " + e.toString());
468
            e.printStackTrace();
469
470
        }
471
    }
472
473
}