Project

General

Profile

Statistics
| Revision:

root / Server.java @ 1

History | View | Annotate | Download (13.8 KB)

1
import java.rmi.registry.Registry;
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.RemoteException;
4
import java.rmi.server.UnicastRemoteObject;
5

    
6
import java.util.Map;
7

    
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.ScheduledThreadPoolExecutor;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.CopyOnWriteArrayList;;
14
import java.util.ArrayList;
15
import java.util.List;
16
import java.util.Comparator;
17
import java.util.HashSet;
18
import java.util.Collections;
19

    
20
import java.io.*;
21

    
22
import java.net.DatagramPacket;
23

    
24
import java.nio.file.*;
25

    
26
//Server class essentially a Peer
27
public class Server implements PeerInterface {
28

    
29
    // Unique peer id
30
    public String id;
31

    
32
    // Size controls
33
    public static final int DEFAULT_MAX_SIZE = 1000000000;
34
    public int limit;
35
    public int used;
36

    
37
    // Thread Pool
38
    public static ScheduledThreadPoolExecutor pool;
39

    
40
    // Sockets
41
    public static Socket MC;
42
    public static Socket MDB;
43
    public static Socket MDR;
44

    
45
    // State Backup
46
    public boolean doingBackUp;
47
    public String file;
48
    public int replications;
49

    
50
    // State Restore
51
    public boolean restoring;
52
    public AtomicBoolean waitRestoreReply = new AtomicBoolean(false);
53
    public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> Chunk
54

    
55
    // Service Management Information
56
    public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo
57
                                                                                             // -> Chunk
58
    public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name
59

    
60
    // Empty Constructor
61
    public Server() {
62
    }
63

    
64
    // Simplified Constructor(Default values)
65
    public Server(String id) throws IOException {
66
        this.used = 0;
67
        this.id = id;
68
        this.limit = DEFAULT_MAX_SIZE;
69
        this.setupSockets();
70
        this.loadLocal();
71
        this.setupDirectory();
72
    }
73

    
74
    // Full constructor
75
    public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
76
            String addressMDR) throws IOException {
77
        this.used = 0;
78
        this.id = id;
79
        this.limit = DEFAULT_MAX_SIZE;
80
        this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
81
        this.loadLocal();
82
        this.setupDirectory();
83
    }
84

    
85
    // Backup Protocol
86
    public void backup(String path, int replications) throws RemoteException {
87

    
88
        // TODO is file backed up already?
89

    
90
        // Start backup
91
        this.doingBackUp = true;
92

    
93
        // Extract file name
94
        int index = path.lastIndexOf("\\");
95
        String fileName = path.substring(index + 1);
96

    
97
        // Generate Hash
98
        // TODO dont use just file name for hashing(multiple file versions with same
99
        // name problem)
100
        String fileId = FileManager.hash(fileName);
101

    
102
        // Add to file list
103
        this.files.put(fileId, fileName);
104

    
105
        try {
106

    
107
            // Get file bytes
108
            byte[] bytes = FileManager.readFile(path);
109

    
110
            // Get file in chunks
111
            byte[][] chunks = FileManager.split(bytes);
112

    
113
            // Send each chunk
114
            for (int i = 0; i < chunks.length; i++) {
115

    
116
                // Base receive time of 1 second
117
                int receiveTime = 1000;
118

    
119
                // PUTCHUNK tries
120
                for (int j = 0; j < 5; j++) {
121

    
122
                    // Prepare message
123
                    Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
124

    
125
                    // Pack in datagram
126
                    DatagramPacket packet = msg.packit(MDB.address, MDB.port);
127

    
128
                    // Send message
129
                    MDB.socket.send(packet);
130

    
131
                    // Wait for replies and check Replication level
132
                    this.replications = 0;
133
                    this.file = fileId;
134
                    Thread.sleep(receiveTime);
135
                    if (this.replications >= replications) {
136
                        break;
137
                    }
138

    
139
                    // Double receive time
140
                    receiveTime *= 2;
141

    
142
                }
143

    
144
            }
145

    
146
        } catch (Exception e) {
147
            e.printStackTrace();
148
        }
149

    
150
        // Terminate backup
151
        this.doingBackUp = false;
152

    
153
    }
154

    
155
    // Restore Protocol
156
    public void restore(String path) throws RemoteException {
157

    
158
        this.restoring = true;
159

    
160
        // File name
161
        int index = path.lastIndexOf("\\");
162
        String fileName = path.substring(index + 1);
163

    
164
        // Generate Hash
165
        // TODO dont use just file name for hashing(multiple file versions with same
166
        // name problem)
167
        String fileId = FileManager.hash(fileName);
168

    
169
        // Find in list of backed up
170
        String name = this.files.get(fileId);
171

    
172
        // File not backed up yet, just return
173
        if (name == null) {
174
            return;
175
        }
176

    
177
        try {
178

    
179
            // Get byte count (probably better than actually fetching the bytes array)
180
            File file = new File(path);
181
            int bytes = (int) file.length();
182

    
183
            // How many chunks to expect
184
            int chunks = 1 + bytes / FileManager.chunkSize;
185

    
186
            // Reset list of Chunks recovered
187
            this.recoveredChunks.clear();
188

    
189
            // Request chunks
190
            for (int i = 0; i < chunks; i++) {
191

    
192
                // Prepare message
193
                Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]);
194

    
195
                // Pack in datagram
196
                DatagramPacket packet = msg.packit(MC.address, MC.port);
197

    
198
                // Send message
199
                MC.socket.send(packet);
200

    
201
                // Wait for response
202
                this.waitRestoreReply.set(true);
203
                while (this.waitRestoreReply.get()) {
204
                }
205

    
206
            }      
207

    
208
            //Get simple list from recovered chunks
209
            List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values());
210

    
211
            // Sort chunks
212
            Collections.sort(list, new Comparator<Chunk>() {
213
                @Override
214
                public int compare(Chunk p1, Chunk p2) {
215
                    return p1.chunkN - p2.chunkN; // Ascending
216
                }
217
            });
218

    
219
            // Pack chunks
220
            byte[][] allBytes = new byte[list.size()][];
221
            for (int i = 0; i < list.size(); i++) {
222
                allBytes[i] = list.get(i).bytes;
223
            }
224

    
225
            // Merge chunks
226
            byte[] fileBytes = FileManager.merge(allBytes);
227

    
228
            // Save file
229
            Path current = Paths.get("");
230
            String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName;
231
            try (FileOutputStream stream = new FileOutputStream(place)) {
232
                stream.write(fileBytes);
233
                stream.close();
234
            }
235

    
236
        } catch (Exception e) {
237
            e.printStackTrace();
238
        }
239

    
240
        this.restoring = false;
241

    
242
    }
243

    
244
    // Delete chunks associated to this file
245
    public void delete(String path) throws RemoteException {
246

    
247
        // File name
248
        int index = path.lastIndexOf("\\");
249
        String fileName = path.substring(index + 1);
250

    
251
        // Generate Hash
252
        // TODO dont use just file name for hashing(multiple file versions with same
253
        // name problem)
254
        String fileId = FileManager.hash(fileName);
255

    
256
        try {
257

    
258
            // Prepare message
259
            Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]);
260

    
261
            // Pack in datagram
262
            DatagramPacket packet = msg.packit(MC.address, MC.port);
263

    
264
            // Send message
265
            MC.socket.send(packet);
266

    
267
        } catch (Exception e) {
268
            e.printStackTrace();
269
        }
270

    
271
    }
272

    
273
    public void reclaim(int memory) throws RemoteException {
274

    
275
        // Memory can't be lower than 0
276
        if (memory < 0) {
277
            return;
278
        }
279

    
280
        // Assign new limit
281
        limit = memory;
282

    
283
        try {
284

    
285
            // Check need to delete chunks
286
            // Going to delete by order in the map
287
            while (used > limit) {
288

    
289
                // Pick an entry
290
                Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next();
291

    
292
                // Remove from used space
293
                used -= entry.getValue().bytes.length;
294

    
295
                // Prepare message
296
                Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
297
                        entry.getValue().chunkN, 0, new byte[0]);
298

    
299
                // Pack in datagram
300
                DatagramPacket packet = msg.packit(MC.address, MC.port);
301

    
302
                // Send message
303
                MC.socket.send(packet);
304

    
305
                // Remove from map
306
                chunks.remove(entry.getKey());
307

    
308
                // Remove from file system
309
                Path current = Paths.get("");
310
                String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk";
311
                File toDelete = new File(place);
312
                toDelete.delete();
313

    
314
            }
315

    
316
        } catch (Exception e) {
317
            e.printStackTrace();
318
        }
319

    
320
    }
321

    
322
    public void state() throws RemoteException {
323

    
324
    }
325

    
326
    // Get information of previous peer life from file system
327
    public void loadLocal() {
328

    
329
        // Base path
330
        Path currentRelativePath = Paths.get("");
331
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
332

    
333
        // Backup path
334
        File file = new File(peerDir + "\\backup");
335

    
336
        // FileIds backed up
337
        File[] files = file.listFiles(File::isDirectory);
338

    
339
        // Peer didn't exist, just return
340
        if (files == null)
341
            return;
342

    
343
        // Each file directory
344
        for (int i = 0; i < files.length; i++) {
345

    
346
            // FileId
347
            String fileId = files[i].getName();
348

    
349
            // Get Chunks inside
350
            file = new File(peerDir + "\\backup" + "\\" + fileId);
351
            File[] chunks = file.listFiles();
352

    
353
            // No chunks, just return
354
            if (chunks == null)
355
                return;
356

    
357
            // Each chunk
358
            for (int j = 0; j < chunks.length; j++) {
359

    
360
                // Chunk name
361
                String chunkNo = chunks[j].getName();
362

    
363
                // Remove chk
364
                chunkNo = chunkNo.replaceAll("\\.", "");
365
                chunkNo = chunkNo.replaceAll("chk", "");
366

    
367
                try {
368

    
369
                    // Read bytes
370
                    byte[] bytes = Files.readAllBytes(chunks[j].toPath());
371

    
372
                    // Updated used bytes
373
                    used += bytes.length;
374

    
375
                    // Add to HashMap
376
                    this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0));
377

    
378
                } catch (Exception e) {
379
                    e.printStackTrace();
380
                }
381

    
382
            }
383

    
384
        }
385

    
386
    }
387

    
388
    // Setup directory
389
    public void setupDirectory() {
390

    
391
        // Base path
392
        Path currentRelativePath = Paths.get("");
393
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
394

    
395
        // Backup path
396
        File file = new File(peerDir + "\\backup");
397
        file.mkdirs();
398

    
399
        // Backup path
400
        file = new File(peerDir + "\\restored");
401
        file.mkdirs();
402

    
403
    }
404

    
405
    // Default Socket Setup
406
    public void setupSockets() throws IOException {
407

    
408
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
409

    
410
        MC = new Socket(this, Socket.Type.MC);
411
        pool.execute(MC);
412

    
413
        MDB = new Socket(this, Socket.Type.MDB);
414
        pool.execute(MDB);
415

    
416
        MDR = new Socket(this, Socket.Type.MDR);
417
        pool.execute(MDR);
418

    
419
    }
420

    
421
    // Specifc Socket Setup
422
    public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
423
            String addressMDR) throws IOException {
424

    
425
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
426

    
427
        MC = new Socket(this, Socket.Type.MC, portMC, addressMC);
428
        pool.execute(MC);
429

    
430
        MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB);
431
        pool.execute(MDB);
432

    
433
        MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR);
434
        pool.execute(MDR);
435

    
436
    }
437

    
438
    public static void main(String args[]) {
439

    
440
        // TODO Arguments are not correct, need to receive all the specific socket
441
        // arguments too, version and server(RMI)
442

    
443
        // Check arguments
444
        if (args.length < 1) {
445
            System.err.println("Invalid number of arguments");
446
            return;
447
        }
448

    
449
        try {
450

    
451
            // Create Server
452
            Server obj = new Server(args[0]);
453

    
454
            // Bind the server as a registry
455
            PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0);
456

    
457
            // Bind the remote object's stub in the registry
458

    
459
            // TODO Get registry what port?
460
            Registry registry = LocateRegistry.getRegistry();
461
            registry.bind(args[0], stub);
462

    
463
            System.err.println("Server ready");
464

    
465
        } catch (Exception e) {
466

    
467
            System.err.println("Server exception: " + e.toString());
468
            e.printStackTrace();
469

    
470
        }
471
    }
472

    
473
}