Project

General

Profile

Statistics
| Revision:

root / Server.java @ 2

History | View | Annotate | Download (15 KB)

1
import java.rmi.registry.Registry;
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.RemoteException;
4
import java.rmi.server.UnicastRemoteObject;
5

    
6
import java.util.Map;
7

    
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.ScheduledThreadPoolExecutor;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.CopyOnWriteArrayList;;
14
import java.util.ArrayList;
15
import java.util.List;
16
import java.util.Comparator;
17
import java.util.HashSet;
18
import java.util.Collections;
19

    
20
import java.io.*;
21

    
22
import java.net.DatagramPacket;
23

    
24
import java.nio.file.*;
25

    
26
//Server class essentially a Peer
27
public class Server implements PeerInterface {
28

    
29
    // Unique peer id
30
    public String id;
31

    
32
    // Size controls
33
    public static final int DEFAULT_MAX_SIZE = 1000000000;
34
    public int limit;
35
    public int used;
36

    
37
    // Thread Pool
38
    public static ScheduledThreadPoolExecutor pool;
39

    
40
    // Sockets
41
    public static Socket MC;
42
    public static Socket MDB;
43
    public static Socket MDR;
44

    
45
    // State Backup
46
    public boolean doingBackUp;
47
    public String file;
48
    public int replications;
49

    
50
    // State Restore
51
    public boolean restoring;
52
    public AtomicBoolean waitRestoreReply = new AtomicBoolean(false);
53
    public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo ->
54
                                                                                                      // Chunk
55

    
56
    // Service Management Information
57
    public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo
58
                                                                                             // -> Chunk
59
    public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name
60

    
61
    // Empty Constructor
62
    public Server() {
63
    }
64

    
65
    // Simplified Constructor(Default values)
66
    public Server(String id) throws IOException {
67
        this.used = 0;
68
        this.id = id;
69
        this.limit = DEFAULT_MAX_SIZE;
70
        this.setupSockets();
71
        this.loadLocal();
72
        this.setupDirectory();
73
    }
74

    
75
    // Full constructor
76
    public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
77
            String addressMDR) throws IOException {
78
        this.used = 0;
79
        this.id = id;
80
        this.limit = DEFAULT_MAX_SIZE;
81
        this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
82
        this.loadLocal();
83
        this.setupDirectory();
84
    }
85

    
86
    // Backup Protocol
87
    public void backup(String path, int replications) throws RemoteException {
88

    
89
        // Start backup
90
        this.doingBackUp = true;
91

    
92
        // Extract file name
93
        int index = path.lastIndexOf("\\");
94
        String fileName = path.substring(index + 1);
95

    
96
        // Generate Hash from file name and date
97
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
98

    
99
        // Is file backed up already?
100
        if (this.files.get(fileId) != null) {
101
            System.out.println("File already backed up");
102
            return;
103
        }
104

    
105
        // Add to file list
106
        this.files.put(fileId, fileName);
107

    
108
        try {
109

    
110
            // Get file bytes
111
            byte[] bytes = FileManager.readFile(path);
112

    
113
            // Get file in chunks
114
            byte[][] chunks = FileManager.split(bytes);
115

    
116
            // Send each chunk
117
            for (int i = 0; i < chunks.length; i++) {
118

    
119
                // Base receive time of 1 second
120
                int receiveTime = 1000;
121

    
122
                // PUTCHUNK tries
123
                for (int j = 0; j < 5; j++) {
124

    
125
                    // Prepare message
126
                    Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
127

    
128
                    // Pack in datagram
129
                    DatagramPacket packet = msg.packit(MDB.address, MDB.port);
130

    
131
                    // Send message
132
                    MDB.socket.send(packet);
133

    
134
                    // Wait for replies and check Replication level
135
                    this.replications = 0;
136
                    this.file = fileId;
137
                    Thread.sleep(receiveTime);
138
                    if (this.replications >= replications) {
139
                        break;
140
                    }
141

    
142
                    // Double receive time
143
                    receiveTime *= 2;
144

    
145
                }
146

    
147
            }
148

    
149
        } catch (Exception e) {
150
            e.printStackTrace();
151
        }
152

    
153
        // Terminate backup
154
        this.doingBackUp = false;
155

    
156
    }
157

    
158
    // Restore Protocol
159
    public void restore(String path) throws RemoteException {
160

    
161
        this.restoring = true;
162

    
163
        // File name
164
        int index = path.lastIndexOf("\\");
165
        String fileName = path.substring(index + 1);
166

    
167
        // Generate Hash from file name and date
168
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
169

    
170
        // Find in list of backed up
171
        String name = this.files.get(fileId);
172

    
173
        // File not backed up yet, just return
174
        if (name == null) {
175
            System.out.println("File not backed up by this peer");
176
            return;
177
        }
178

    
179
        try {
180

    
181
            // Get byte count (probably better than actually fetching the bytes array)
182
            File file = new File(path);
183
            int bytes = (int) file.length();
184

    
185
            // How many chunks to expect
186
            int chunks = 1 + bytes / FileManager.chunkSize;
187

    
188
            // Reset list of Chunks recovered
189
            this.recoveredChunks.clear();
190

    
191
            // Request chunks
192
            for (int i = 0; i < chunks; i++) {
193

    
194
                // Prepare message
195
                Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]);
196

    
197
                // Pack in datagram
198
                DatagramPacket packet = msg.packit(MC.address, MC.port);
199

    
200
                // Send message
201
                MC.socket.send(packet);
202

    
203
                // Wait for response
204
                this.waitRestoreReply.set(true);
205
                long time = System.nanoTime();
206
                while (this.waitRestoreReply.get()) {
207
                    // Restore timeout
208
                    if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) {
209
                        System.out.println("Restore timeout, missing chunks");
210
                        return;
211
                    }
212
                }
213

    
214
            }
215

    
216
            // Get simple list from recovered chunks
217
            List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values());
218

    
219
            // Sort chunks
220
            Collections.sort(list, new Comparator<Chunk>() {
221
                @Override
222
                public int compare(Chunk p1, Chunk p2) {
223
                    return p1.chunkN - p2.chunkN; // Ascending
224
                }
225
            });
226

    
227
            // Pack chunks
228
            byte[][] allBytes = new byte[list.size()][];
229
            for (int i = 0; i < list.size(); i++) {
230
                allBytes[i] = list.get(i).bytes;
231
            }
232

    
233
            // Merge chunks
234
            byte[] fileBytes = FileManager.merge(allBytes);
235

    
236
            // Save file
237
            Path current = Paths.get("");
238
            String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName;
239
            try (FileOutputStream stream = new FileOutputStream(place)) {
240
                stream.write(fileBytes);
241
                stream.close();
242
            }
243

    
244
        } catch (Exception e) {
245
            e.printStackTrace();
246
        }
247

    
248
        this.restoring = false;
249

    
250
    }
251

    
252
    // Delete chunks associated to this file
253
    public void delete(String path) throws RemoteException {
254

    
255
        // File name
256
        int index = path.lastIndexOf("\\");
257
        String fileName = path.substring(index + 1);
258

    
259
        // Generate Hash from file name and date
260
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
261

    
262
        // Remove from list of backed up
263
        String name = this.files.get(fileId);
264
        if (name != null) {
265
            this.files.remove(fileId);
266
        }
267

    
268
        try {
269

    
270
            // Prepare message
271
            Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]);
272

    
273
            // Pack in datagram
274
            DatagramPacket packet = msg.packit(MC.address, MC.port);
275

    
276
            // Send message
277
            MC.socket.send(packet);
278

    
279
        } catch (Exception e) {
280
            e.printStackTrace();
281
        }
282

    
283
    }
284

    
285
    public void reclaim(int memory) throws RemoteException {
286

    
287
        // Memory can't be lower than 0
288
        if (memory < 0) {
289
            return;
290
        }
291

    
292
        // Assign new limit
293
        limit = memory;
294

    
295
        try {
296

    
297
            // Check need to delete chunks
298
            // Going to delete by order in the map
299
            while (used > limit) {
300

    
301
                // Pick an entry
302
                Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next();
303

    
304
                // Remove from used space
305
                used -= entry.getValue().bytes.length;
306

    
307
                // Prepare message
308
                Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
309
                        entry.getValue().chunkN, 0, new byte[0]);
310

    
311
                // Pack in datagram
312
                DatagramPacket packet = msg.packit(MC.address, MC.port);
313

    
314
                // Send message
315
                MC.socket.send(packet);
316

    
317
                // Remove from map
318
                chunks.remove(entry.getKey());
319

    
320
                // Remove from file system
321
                Path current = Paths.get("");
322
                String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\"
323
                        + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk";
324
                File toDelete = new File(place);
325
                toDelete.delete();
326

    
327
            }
328

    
329
        } catch (Exception e) {
330
            e.printStackTrace();
331
        }
332

    
333
    }
334

    
335
    public void state() throws RemoteException {
336

    
337
    }
338

    
339
    // Get information of previous peer life from file system
340
    public void loadLocal() {
341

    
342
        // Base path
343
        Path currentRelativePath = Paths.get("");
344
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
345

    
346
        // Backup path
347
        File file = new File(peerDir + "\\backup");
348

    
349
        // FileIds backed up
350
        File[] files = file.listFiles(File::isDirectory);
351

    
352
        // Peer didn't exist, just return
353
        if (files == null)
354
            return;
355

    
356
        // Each file directory
357
        for (int i = 0; i < files.length; i++) {
358

    
359
            // FileId
360
            String fileId = files[i].getName();
361

    
362
            // Get Chunks inside
363
            file = new File(peerDir + "\\backup" + "\\" + fileId);
364
            File[] chunks = file.listFiles();
365

    
366
            // No chunks, just return
367
            if (chunks == null)
368
                return;
369

    
370
            // Each chunk
371
            for (int j = 0; j < chunks.length; j++) {
372

    
373
                // Chunk name
374
                String chunkNo = chunks[j].getName();
375

    
376
                // Remove chk
377
                chunkNo = chunkNo.replaceAll("\\.", "");
378
                chunkNo = chunkNo.replaceAll("chk", "");
379

    
380
                try {
381

    
382
                    // Read bytes
383
                    byte[] bytes = Files.readAllBytes(chunks[j].toPath());
384

    
385
                    // Updated used bytes
386
                    used += bytes.length;
387

    
388
                    // Add to HashMap
389
                    this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0));
390

    
391
                } catch (Exception e) {
392
                    e.printStackTrace();
393
                }
394

    
395
            }
396

    
397
        }
398

    
399
    }
400

    
401
    // Setup directory
402
    public void setupDirectory() {
403

    
404
        // Base path
405
        Path currentRelativePath = Paths.get("");
406
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
407

    
408
        // Backup path
409
        File file = new File(peerDir + "\\backup");
410
        file.mkdirs();
411

    
412
        // Backup path
413
        file = new File(peerDir + "\\restored");
414
        file.mkdirs();
415

    
416
    }
417

    
418
    // Default Socket Setup
419
    public void setupSockets() throws IOException {
420

    
421
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
422

    
423
        MC = new Socket(this, Socket.Type.MC);
424
        pool.execute(MC);
425

    
426
        MDB = new Socket(this, Socket.Type.MDB);
427
        pool.execute(MDB);
428

    
429
        MDR = new Socket(this, Socket.Type.MDR);
430
        pool.execute(MDR);
431

    
432
    }
433

    
434
    // Specifc Socket Setup
435
    public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
436
            String addressMDR) throws IOException {
437

    
438
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
439

    
440
        MC = new Socket(this, Socket.Type.MC, portMC, addressMC);
441
        pool.execute(MC);
442

    
443
        MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB);
444
        pool.execute(MDB);
445

    
446
        MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR);
447
        pool.execute(MDR);
448

    
449
    }
450

    
451
    public static void main(String args[]) {
452

    
453
        try {
454

    
455
            // Server object
456
            Server obj;
457

    
458
            // Basic (version id)
459
            if (args.length == 2) {
460
                obj = new Server(args[1]);
461
            }
462
            // Full (version id access port mc port mdb port mdr)
463
            else if (args.length == 9) {
464
                obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6],
465
                        Integer.parseInt(args[7]), args[8]);
466
            }
467
            // Invalid arguments
468
            else {
469
                System.out.println("java Server <version> <server id>");
470
                System.out.println(
471
                        "java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
472
                return;
473
            }
474

    
475
            // Bind the server as a registry
476
            PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0);
477

    
478
            // Bind the remote object's stub in the registry
479
            Registry registry = LocateRegistry.getRegistry();
480
            registry.bind(obj.id, stub);
481

    
482
            System.err.println("Server ready");
483

    
484
        } catch (Exception e) {
485

    
486
            System.err.println("Server exception: " + e.toString());
487
            e.printStackTrace();
488

    
489
        }
490
    }
491

    
492
}