Project

General

Profile

Statistics
| Revision:

root / src / Server.java

History | View | Annotate | Download (14.8 KB)

1
import java.rmi.registry.Registry;
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.RemoteException;
4
import java.rmi.server.UnicastRemoteObject;
5

    
6
import java.util.Map;
7

    
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.ScheduledThreadPoolExecutor;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.CopyOnWriteArrayList;;
14
import java.util.ArrayList;
15
import java.util.List;
16
import java.util.Comparator;
17
import java.util.HashSet;
18
import java.util.Collections;
19

    
20
import java.io.*;
21

    
22
import java.net.DatagramPacket;
23

    
24
import java.nio.file.*;
25

    
26
// Server class: essentially a Peer
27
public class Server implements PeerInterface {
28

    
29
    // Unique peer id
30
    public String id;
31

    
32
    // Size controls
33
    public static final int DEFAULT_MAX_SIZE = 1000000000;
34
    public int limit;
35
    public int used;
36

    
37
    // Thread Pool
38
    public static ScheduledThreadPoolExecutor pool;
39

    
40
    // Sockets
41
    public static Socket MC;
42
    public static Socket MDB;
43
    public static Socket MDR;
44

    
45
    // State Backup
46
    public boolean doingBackUp;
47
    public String file;
48
    public int replications;
49

    
50
    // State Restore
51
    public boolean restoring;
52
    public AtomicBoolean waitRestoreReply = new AtomicBoolean(false);
53
    public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo ->
54
                                                                                                      // Chunk
55

    
56
    // Service Management Information
57
    public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo
58
                                                                                             // -> Chunk
59
    public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name
60

    
61
    // Empty Constructor
62
    /**
     * Creates a peer without assigning any state.
     * <p>
     * No field is written here, so every instance field keeps its declared
     * initializer or the Java default value (for example {@code id} stays
     * {@code null} and {@code limit} stays 0). No sockets are opened.
     */
    public Server() {
        super();
    }
64

    
65
    // Simplified Constructor(Default values)
66
    /**
     * Creates a peer identified by {@code id} using the default socket setup.
     * <p>
     * The storage limit starts at {@link #DEFAULT_MAX_SIZE} and the used space
     * at 0; {@link #loadLocal()} then adds the size of every chunk it finds on
     * disk.
     *
     * @param id unique peer id
     * @throws IOException if the socket setup fails
     */
    public Server(String id) throws IOException {

        // Identity and storage accounting are set first: the calls below read them
        this.id = id;
        this.limit = DEFAULT_MAX_SIZE;
        this.used = 0;

        // Open the channels, recover stored chunks, then make sure the folders exist
        setupSockets();
        loadLocal();
        setupDirectory();
    }
74

    
75
    // Full constructor
76
    /**
     * Creates a peer identified by {@code id} with explicit ports and addresses
     * for the three channels.
     * <p>
     * The storage limit starts at {@link #DEFAULT_MAX_SIZE} and the used space
     * at 0; {@link #loadLocal()} then adds the size of every chunk it finds on
     * disk.
     *
     * @param id         unique peer id
     * @param portMC     port of the MC channel
     * @param addressMC  address of the MC channel
     * @param portMDB    port of the MDB channel
     * @param addressMDB address of the MDB channel
     * @param portMDR    port of the MDR channel
     * @param addressMDR address of the MDR channel
     * @throws IOException if the socket setup fails
     */
    public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
            String addressMDR) throws IOException {

        // Identity and storage accounting are set first: the calls below read them
        this.id = id;
        this.limit = DEFAULT_MAX_SIZE;
        this.used = 0;

        // Open the channels, recover stored chunks, then make sure the folders exist
        setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
        loadLocal();
        setupDirectory();
    }
85

    
86
    // Backup Protocol
87
    public void backup(String path, int replications) throws RemoteException {
88

    
89
        // Start backup
90
        this.doingBackUp = true;
91

    
92
        // Extract file name
93
        String fileName = new File(path).getName();
94

    
95
        // Generate Hash from file name and date
96
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
97

    
98
        // Is file backed up already?
99
        if (this.files.get(fileId) != null) {
100
            System.out.println("File already backed up");
101
            return;
102
        }
103

    
104
        // Add to file list
105
        this.files.put(fileId, fileName);
106

    
107
        try {
108

    
109
            // Get file bytes
110
            byte[] bytes = FileManager.readFile(path);
111

    
112
            // Get file in chunks
113
            byte[][] chunks = FileManager.split(bytes);
114

    
115
            // Send each chunk
116
            for (int i = 0; i < chunks.length; i++) {
117

    
118
                // Base receive time of 1 second
119
                int receiveTime = 1000;
120

    
121
                // PUTCHUNK tries
122
                for (int j = 0; j < 5; j++) {
123

    
124
                    // Prepare message
125
                    Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
126

    
127
                    // Pack in datagram
128
                    DatagramPacket packet = msg.packit(MDB.address, MDB.port);
129

    
130
                    // Send message
131
                    MDB.socket.send(packet);
132

    
133
                    // Wait for replies and check Replication level
134
                    this.replications = 0;
135
                    this.file = fileId;
136
                    Thread.sleep(receiveTime);
137
                    if (this.replications >= replications) {
138
                        break;
139
                    }
140

    
141
                    // Double receive time
142
                    receiveTime *= 2;
143

    
144
                }
145

    
146
            }
147

    
148
        } catch (Exception e) {
149
            e.printStackTrace();
150
        }
151

    
152
        // Terminate backup
153
        this.doingBackUp = false;
154

    
155
    }
156

    
157
    // Restore Protocol
158
    public void restore(String path) throws RemoteException {
159

    
160
        this.restoring = true;
161

    
162
        // File name
163
        String fileName = new File(path).getName();
164

    
165
        // Generate Hash from file name and date
166
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
167

    
168
        // Find in list of backed up
169
        String name = this.files.get(fileId);
170

    
171
        // File not backed up yet, just return
172
        if (name == null) {
173
            System.out.println("File not backed up by this peer");
174
            return;
175
        }
176

    
177
        try {
178

    
179
            // Get byte count (probably better than actually fetching the bytes array)
180
            File file = new File(path);
181
            int bytes = (int) file.length();
182

    
183
            // How many chunks to expect
184
            int chunks = 1 + bytes / FileManager.chunkSize;
185

    
186
            // Reset list of Chunks recovered
187
            this.recoveredChunks.clear();
188

    
189
            // Request chunks
190
            for (int i = 0; i < chunks; i++) {
191

    
192
                // Prepare message
193
                Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]);
194

    
195
                // Pack in datagram
196
                DatagramPacket packet = msg.packit(MC.address, MC.port);
197

    
198
                // Send message
199
                MC.socket.send(packet);
200

    
201
                // Wait for response
202
                this.waitRestoreReply.set(true);
203
                long time = System.nanoTime();
204
                while (this.waitRestoreReply.get()) {
205
                    // Restore timeout
206
                    if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) {
207
                        System.out.println("Restore timeout, missing chunks");
208
                        return;
209
                    }
210
                }
211

    
212
            }
213

    
214
            // Get simple list from recovered chunks
215
            List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values());
216

    
217
            // Sort chunks
218
            Collections.sort(list, new Comparator<Chunk>() {
219
                @Override
220
                public int compare(Chunk p1, Chunk p2) {
221
                    return p1.chunkN - p2.chunkN; // Ascending
222
                }
223
            });
224

    
225
            // Pack chunks
226
            byte[][] allBytes = new byte[list.size()][];
227
            for (int i = 0; i < list.size(); i++) {
228
                allBytes[i] = list.get(i).bytes;
229
            }
230

    
231
            // Merge chunks
232
            byte[] fileBytes = FileManager.merge(allBytes);
233

    
234
            // Save file
235
            Path current = Paths.get("");
236
            String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName;
237
            try (FileOutputStream stream = new FileOutputStream(place)) {
238
                stream.write(fileBytes);
239
                stream.close();
240
            }
241

    
242
        } catch (Exception e) {
243
            e.printStackTrace();
244
        }
245

    
246
        this.restoring = false;
247

    
248
    }
249

    
250
    // Delete chunks associated to this file
251
    public void delete(String path) throws RemoteException {
252

    
253
        // File name
254
        String fileName = new File(path).getName();
255

    
256
        // Generate Hash from file name and date
257
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
258

    
259
        // Remove from list of backed up
260
        String name = this.files.get(fileId);
261
        if (name != null) {
262
            this.files.remove(fileId);
263
        }
264

    
265
        try {
266

    
267
            // Prepare message
268
            Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]);
269

    
270
            // Pack in datagram
271
            DatagramPacket packet = msg.packit(MC.address, MC.port);
272

    
273
            // Send message
274
            MC.socket.send(packet);
275

    
276
        } catch (Exception e) {
277
            e.printStackTrace();
278
        }
279

    
280
    }
281

    
282
    public void reclaim(int memory) throws RemoteException {
283

    
284
        // Memory can't be lower than 0
285
        if (memory < 0) {
286
            return;
287
        }
288

    
289
        // Assign new limit
290
        limit = memory;
291

    
292
        try {
293

    
294
            // Check need to delete chunks
295
            // Going to delete by order in the map
296
            while (used > limit) {
297

    
298
                // Pick an entry
299
                Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next();
300

    
301
                // Remove from used space
302
                used -= entry.getValue().bytes.length;
303

    
304
                // Prepare message
305
                Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
306
                        entry.getValue().chunkN, 0, new byte[0]);
307

    
308
                // Pack in datagram
309
                DatagramPacket packet = msg.packit(MC.address, MC.port);
310

    
311
                // Send message
312
                MC.socket.send(packet);
313

    
314
                // Remove from map
315
                chunks.remove(entry.getKey());
316

    
317
                // Remove from file system
318
                Path current = Paths.get("");
319
                String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\"
320
                        + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk";
321
                File toDelete = new File(place);
322
                toDelete.delete();
323

    
324
            }
325

    
326
        } catch (Exception e) {
327
            e.printStackTrace();
328
        }
329

    
330
    }
331

    
332
    public void state() throws RemoteException {
333

    
334
    }
335

    
336
    // Get information of previous peer life from file system
337
    public void loadLocal() {
338

    
339
        // Base path
340
        Path currentRelativePath = Paths.get("");
341
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
342

    
343
        // Backup path
344
        File file = new File(peerDir + "\\backup");
345

    
346
        // FileIds backed up
347
        File[] files = file.listFiles(File::isDirectory);
348

    
349
        // Peer didn't exist, just return
350
        if (files == null)
351
            return;
352

    
353
        // Each file directory
354
        for (int i = 0; i < files.length; i++) {
355

    
356
            // FileId
357
            String fileId = files[i].getName();
358

    
359
            // Get Chunks inside
360
            file = new File(peerDir + "\\backup" + "\\" + fileId);
361
            File[] chunks = file.listFiles();
362

    
363
            // No chunks, just return
364
            if (chunks == null)
365
                return;
366

    
367
            // Each chunk
368
            for (int j = 0; j < chunks.length; j++) {
369

    
370
                // Chunk name
371
                String chunkNo = chunks[j].getName();
372

    
373
                // Remove chk
374
                chunkNo = chunkNo.replaceAll("\\.", "");
375
                chunkNo = chunkNo.replaceAll("chk", "");
376

    
377
                try {
378

    
379
                    // Read bytes
380
                    byte[] bytes = Files.readAllBytes(chunks[j].toPath());
381

    
382
                    // Updated used bytes
383
                    used += bytes.length;
384

    
385
                    // Add to HashMap
386
                    this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0));
387

    
388
                } catch (Exception e) {
389
                    e.printStackTrace();
390
                }
391

    
392
            }
393

    
394
        }
395

    
396
    }
397

    
398
    // Setup directory
399
    public void setupDirectory() {
400

    
401
        // Base path
402
        Path currentRelativePath = Paths.get("");
403
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
404

    
405
        // Backup path
406
        File file = new File(peerDir + "\\backup");
407
        file.mkdirs();
408

    
409
        // Backup path
410
        file = new File(peerDir + "\\restored");
411
        file.mkdirs();
412

    
413
    }
414

    
415
    // Default Socket Setup
416
    public void setupSockets() throws IOException {
417

    
418
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
419

    
420
        MC = new Socket(this, Socket.Type.MC);
421
        pool.execute(MC);
422

    
423
        MDB = new Socket(this, Socket.Type.MDB);
424
        pool.execute(MDB);
425

    
426
        MDR = new Socket(this, Socket.Type.MDR);
427
        pool.execute(MDR);
428

    
429
    }
430

    
431
    // Specific Socket Setup
432
    public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
433
            String addressMDR) throws IOException {
434

    
435
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
436

    
437
        MC = new Socket(this, Socket.Type.MC, portMC, addressMC);
438
        pool.execute(MC);
439

    
440
        MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB);
441
        pool.execute(MDB);
442

    
443
        MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR);
444
        pool.execute(MDR);
445

    
446
    }
447

    
448
    public static void main(String args[]) {
449

    
450
        try {
451

    
452
            // Server object
453
            Server obj;
454

    
455
            // Basic (version id)
456
            if (args.length == 2) {
457
                obj = new Server(args[1]);
458
            }
459
            // Full (version id access port mc port mdb port mdr)
460
            else if (args.length == 9) {
461
                obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6],
462
                        Integer.parseInt(args[7]), args[8]);
463
            }
464
            // Invalid arguments
465
            else {
466
                System.out.println("java Server <version> <server id>");
467
                System.out.println(
468
                        "java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
469
                return;
470
            }
471

    
472
            // Bind the server as a registry
473
            PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0);
474

    
475
            // Bind the remote object's stub in the registry
476
            Registry registry = LocateRegistry.getRegistry();
477
            registry.bind(obj.id, stub);
478

    
479
            System.err.println("Server ready");
480

    
481
        } catch (Exception e) {
482

    
483
            System.err.println("Server exception: " + e.toString());
484
            e.printStackTrace();
485

    
486
        }
487
    }
488

    
489
}