Project

General

Profile

Revision 3

Improve organization and scripts

View differences:

Server.java
1
import java.rmi.registry.Registry;
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.RemoteException;
4
import java.rmi.server.UnicastRemoteObject;
5

  
6
import java.util.Map;
7

  
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.ScheduledThreadPoolExecutor;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.CopyOnWriteArrayList;;
14
import java.util.ArrayList;
15
import java.util.List;
16
import java.util.Comparator;
17
import java.util.HashSet;
18
import java.util.Collections;
19

  
20
import java.io.*;
21

  
22
import java.net.DatagramPacket;
23

  
24
import java.nio.file.*;
25

  
26
//Server class essentially a Peer
27
public class Server implements PeerInterface {
28

  
29
    // Unique peer id
30
    public String id;
31

  
32
    // Size controls
33
    public static final int DEFAULT_MAX_SIZE = 1000000000;
34
    public int limit;
35
    public int used;
36

  
37
    // Thread Pool
38
    public static ScheduledThreadPoolExecutor pool;
39

  
40
    // Sockets
41
    public static Socket MC;
42
    public static Socket MDB;
43
    public static Socket MDR;
44

  
45
    // State Backup
46
    public boolean doingBackUp;
47
    public String file;
48
    public int replications;
49

  
50
    // State Restore
51
    public boolean restoring;
52
    public AtomicBoolean waitRestoreReply = new AtomicBoolean(false);
53
    public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo ->
54
                                                                                                      // Chunk
55

  
56
    // Service Management Information
57
    public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo
58
                                                                                             // -> Chunk
59
    public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name
60

  
61
    // Empty Constructor
62
    public Server() {
63
    }
64

  
65
    // Simplified Constructor(Default values)
66
    public Server(String id) throws IOException {
67
        this.used = 0;
68
        this.id = id;
69
        this.limit = DEFAULT_MAX_SIZE;
70
        this.setupSockets();
71
        this.loadLocal();
72
        this.setupDirectory();
73
    }
74

  
75
    // Full constructor
76
    public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
77
            String addressMDR) throws IOException {
78
        this.used = 0;
79
        this.id = id;
80
        this.limit = DEFAULT_MAX_SIZE;
81
        this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
82
        this.loadLocal();
83
        this.setupDirectory();
84
    }
85

  
86
    // Backup Protocol
87
    public void backup(String path, int replications) throws RemoteException {
88

  
89
        // Start backup
90
        this.doingBackUp = true;
91

  
92
        // Extract file name
93
        int index = path.lastIndexOf("\\");
94
        String fileName = path.substring(index + 1);
95

  
96
        // Generate Hash from file name and date
97
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
98

  
99
        // Is file backed up already?
100
        if (this.files.get(fileId) != null) {
101
            System.out.println("File already backed up");
102
            return;
103
        }
104

  
105
        // Add to file list
106
        this.files.put(fileId, fileName);
107

  
108
        try {
109

  
110
            // Get file bytes
111
            byte[] bytes = FileManager.readFile(path);
112

  
113
            // Get file in chunks
114
            byte[][] chunks = FileManager.split(bytes);
115

  
116
            // Send each chunk
117
            for (int i = 0; i < chunks.length; i++) {
118

  
119
                // Base receive time of 1 second
120
                int receiveTime = 1000;
121

  
122
                // PUTCHUNK tries
123
                for (int j = 0; j < 5; j++) {
124

  
125
                    // Prepare message
126
                    Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
127

  
128
                    // Pack in datagram
129
                    DatagramPacket packet = msg.packit(MDB.address, MDB.port);
130

  
131
                    // Send message
132
                    MDB.socket.send(packet);
133

  
134
                    // Wait for replies and check Replication level
135
                    this.replications = 0;
136
                    this.file = fileId;
137
                    Thread.sleep(receiveTime);
138
                    if (this.replications >= replications) {
139
                        break;
140
                    }
141

  
142
                    // Double receive time
143
                    receiveTime *= 2;
144

  
145
                }
146

  
147
            }
148

  
149
        } catch (Exception e) {
150
            e.printStackTrace();
151
        }
152

  
153
        // Terminate backup
154
        this.doingBackUp = false;
155

  
156
    }
157

  
158
    // Restore Protocol
159
    public void restore(String path) throws RemoteException {
160

  
161
        this.restoring = true;
162

  
163
        // File name
164
        int index = path.lastIndexOf("\\");
165
        String fileName = path.substring(index + 1);
166

  
167
        // Generate Hash from file name and date
168
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
169

  
170
        // Find in list of backed up
171
        String name = this.files.get(fileId);
172

  
173
        // File not backed up yet, just return
174
        if (name == null) {
175
            System.out.println("File not backed up by this peer");
176
            return;
177
        }
178

  
179
        try {
180

  
181
            // Get byte count (probably better than actually fetching the bytes array)
182
            File file = new File(path);
183
            int bytes = (int) file.length();
184

  
185
            // How many chunks to expect
186
            int chunks = 1 + bytes / FileManager.chunkSize;
187

  
188
            // Reset list of Chunks recovered
189
            this.recoveredChunks.clear();
190

  
191
            // Request chunks
192
            for (int i = 0; i < chunks; i++) {
193

  
194
                // Prepare message
195
                Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]);
196

  
197
                // Pack in datagram
198
                DatagramPacket packet = msg.packit(MC.address, MC.port);
199

  
200
                // Send message
201
                MC.socket.send(packet);
202

  
203
                // Wait for response
204
                this.waitRestoreReply.set(true);
205
                long time = System.nanoTime();
206
                while (this.waitRestoreReply.get()) {
207
                    // Restore timeout
208
                    if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) {
209
                        System.out.println("Restore timeout, missing chunks");
210
                        return;
211
                    }
212
                }
213

  
214
            }
215

  
216
            // Get simple list from recovered chunks
217
            List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values());
218

  
219
            // Sort chunks
220
            Collections.sort(list, new Comparator<Chunk>() {
221
                @Override
222
                public int compare(Chunk p1, Chunk p2) {
223
                    return p1.chunkN - p2.chunkN; // Ascending
224
                }
225
            });
226

  
227
            // Pack chunks
228
            byte[][] allBytes = new byte[list.size()][];
229
            for (int i = 0; i < list.size(); i++) {
230
                allBytes[i] = list.get(i).bytes;
231
            }
232

  
233
            // Merge chunks
234
            byte[] fileBytes = FileManager.merge(allBytes);
235

  
236
            // Save file
237
            Path current = Paths.get("");
238
            String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName;
239
            try (FileOutputStream stream = new FileOutputStream(place)) {
240
                stream.write(fileBytes);
241
                stream.close();
242
            }
243

  
244
        } catch (Exception e) {
245
            e.printStackTrace();
246
        }
247

  
248
        this.restoring = false;
249

  
250
    }
251

  
252
    // Delete chunks associated to this file
253
    public void delete(String path) throws RemoteException {
254

  
255
        // File name
256
        int index = path.lastIndexOf("\\");
257
        String fileName = path.substring(index + 1);
258

  
259
        // Generate Hash from file name and date
260
        String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified()));
261

  
262
        // Remove from list of backed up
263
        String name = this.files.get(fileId);
264
        if (name != null) {
265
            this.files.remove(fileId);
266
        }
267

  
268
        try {
269

  
270
            // Prepare message
271
            Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]);
272

  
273
            // Pack in datagram
274
            DatagramPacket packet = msg.packit(MC.address, MC.port);
275

  
276
            // Send message
277
            MC.socket.send(packet);
278

  
279
        } catch (Exception e) {
280
            e.printStackTrace();
281
        }
282

  
283
    }
284

  
285
    public void reclaim(int memory) throws RemoteException {
286

  
287
        // Memory can't be lower than 0
288
        if (memory < 0) {
289
            return;
290
        }
291

  
292
        // Assign new limit
293
        limit = memory;
294

  
295
        try {
296

  
297
            // Check need to delete chunks
298
            // Going to delete by order in the map
299
            while (used > limit) {
300

  
301
                // Pick an entry
302
                Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next();
303

  
304
                // Remove from used space
305
                used -= entry.getValue().bytes.length;
306

  
307
                // Prepare message
308
                Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
309
                        entry.getValue().chunkN, 0, new byte[0]);
310

  
311
                // Pack in datagram
312
                DatagramPacket packet = msg.packit(MC.address, MC.port);
313

  
314
                // Send message
315
                MC.socket.send(packet);
316

  
317
                // Remove from map
318
                chunks.remove(entry.getKey());
319

  
320
                // Remove from file system
321
                Path current = Paths.get("");
322
                String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\"
323
                        + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk";
324
                File toDelete = new File(place);
325
                toDelete.delete();
326

  
327
            }
328

  
329
        } catch (Exception e) {
330
            e.printStackTrace();
331
        }
332

  
333
    }
334

  
335
    public void state() throws RemoteException {
336

  
337
    }
338

  
339
    // Get information of previous peer life from file system
340
    public void loadLocal() {
341

  
342
        // Base path
343
        Path currentRelativePath = Paths.get("");
344
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
345

  
346
        // Backup path
347
        File file = new File(peerDir + "\\backup");
348

  
349
        // FileIds backed up
350
        File[] files = file.listFiles(File::isDirectory);
351

  
352
        // Peer didn't exist, just return
353
        if (files == null)
354
            return;
355

  
356
        // Each file directory
357
        for (int i = 0; i < files.length; i++) {
358

  
359
            // FileId
360
            String fileId = files[i].getName();
361

  
362
            // Get Chunks inside
363
            file = new File(peerDir + "\\backup" + "\\" + fileId);
364
            File[] chunks = file.listFiles();
365

  
366
            // No chunks, just return
367
            if (chunks == null)
368
                return;
369

  
370
            // Each chunk
371
            for (int j = 0; j < chunks.length; j++) {
372

  
373
                // Chunk name
374
                String chunkNo = chunks[j].getName();
375

  
376
                // Remove chk
377
                chunkNo = chunkNo.replaceAll("\\.", "");
378
                chunkNo = chunkNo.replaceAll("chk", "");
379

  
380
                try {
381

  
382
                    // Read bytes
383
                    byte[] bytes = Files.readAllBytes(chunks[j].toPath());
384

  
385
                    // Updated used bytes
386
                    used += bytes.length;
387

  
388
                    // Add to HashMap
389
                    this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0));
390

  
391
                } catch (Exception e) {
392
                    e.printStackTrace();
393
                }
394

  
395
            }
396

  
397
        }
398

  
399
    }
400

  
401
    // Setup directory
402
    public void setupDirectory() {
403

  
404
        // Base path
405
        Path currentRelativePath = Paths.get("");
406
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
407

  
408
        // Backup path
409
        File file = new File(peerDir + "\\backup");
410
        file.mkdirs();
411

  
412
        // Backup path
413
        file = new File(peerDir + "\\restored");
414
        file.mkdirs();
415

  
416
    }
417

  
418
    // Default Socket Setup
419
    public void setupSockets() throws IOException {
420

  
421
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
422

  
423
        MC = new Socket(this, Socket.Type.MC);
424
        pool.execute(MC);
425

  
426
        MDB = new Socket(this, Socket.Type.MDB);
427
        pool.execute(MDB);
428

  
429
        MDR = new Socket(this, Socket.Type.MDR);
430
        pool.execute(MDR);
431

  
432
    }
433

  
434
    // Specifc Socket Setup
435
    public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
436
            String addressMDR) throws IOException {
437

  
438
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
439

  
440
        MC = new Socket(this, Socket.Type.MC, portMC, addressMC);
441
        pool.execute(MC);
442

  
443
        MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB);
444
        pool.execute(MDB);
445

  
446
        MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR);
447
        pool.execute(MDR);
448

  
449
    }
450

  
451
    public static void main(String args[]) {
452

  
453
        try {
454

  
455
            // Server object
456
            Server obj;
457

  
458
            // Basic (version id)
459
            if (args.length == 2) {
460
                obj = new Server(args[1]);
461
            }
462
            // Full (version id access port mc port mdb port mdr)
463
            else if (args.length == 9) {
464
                obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6],
465
                        Integer.parseInt(args[7]), args[8]);
466
            }
467
            // Invalid arguments
468
            else {
469
                System.out.println("java Server <version> <server id>");
470
                System.out.println(
471
                        "java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
472
                return;
473
            }
474

  
475
            // Bind the server as a registry
476
            PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0);
477

  
478
            // Bind the remote object's stub in the registry
479
            Registry registry = LocateRegistry.getRegistry();
480
            registry.bind(obj.id, stub);
481

  
482
            System.err.println("Server ready");
483

  
484
        } catch (Exception e) {
485

  
486
            System.err.println("Server exception: " + e.toString());
487
            e.printStackTrace();
488

  
489
        }
490
    }
491

  
492
}
Interpreter.java
1

  
2
import java.util.Random;
3
import java.rmi.registry.Registry;
4
import java.rmi.registry.LocateRegistry;
5
import java.rmi.RemoteException;
6
import java.rmi.server.UnicastRemoteObject;
7

  
8
import java.util.Map;
9

  
10
import java.util.concurrent.ConcurrentHashMap;
11
import java.util.concurrent.ScheduledThreadPoolExecutor;
12
import java.util.concurrent.TimeUnit;
13
import java.util.concurrent.atomic.AtomicBoolean;
14
import java.util.concurrent.Executors;
15
import java.util.concurrent.CopyOnWriteArrayList;
16
import java.util.ArrayList;
17
import java.util.List;
18
import java.util.Comparator;
19
import java.util.HashSet;
20
import java.util.Collections;
21

  
22
import java.io.*;
23

  
24
import java.net.DatagramPacket;
25

  
26
import java.nio.file.*;
27

  
28
public class Interpreter implements Runnable {
29

  
30
    public Random rand = new Random();
31

  
32
    private static final byte CR = 0xD;
33
    private static final byte LF = 0xA;
34

  
35
    public Server peer;
36

  
37
    public String raw;
38

  
39
    public String header;
40
    public byte[] body;
41

  
42
    public String version;
43
    public String senderId;
44
    public String fileId;
45
    public String chunkNo;
46
    public String replication;
47

  
48
    public AtomicBoolean elseSentChunk = new AtomicBoolean(false);
49
    public AtomicBoolean elseSentChunkReclaim = new AtomicBoolean(true);
50

  
51
    public Message.MessageType type;
52

  
53
    public Interpreter() {
54
    }
55

  
56
    public Interpreter(Server peer, DatagramPacket packet) {
57

  
58
        // Set peer
59
        this.peer = peer;
60

  
61
        // Seperate header and body
62
        messageSplitHeaderBody(packet.getData(), packet.getLength());
63

  
64
        System.out.println(header);
65

  
66
        // Get parts
67
        String[] parts = header.split(" ");
68

  
69
        // Identify type and specific arguments
70
        switch (parts[0]) {
71
        case "PUTCHUNK":
72
            type = Message.MessageType.PUTCHUNK;
73
            chunkNo = parts[4];
74
            replication = parts[5];
75
            break;
76
        case "STORED":
77
            type = Message.MessageType.STORED;
78
            chunkNo = parts[4];
79
            break;
80
        case "GETCHUNK":
81
            type = Message.MessageType.GETCHUNK;
82
            chunkNo = parts[4];
83
            break;
84
        case "CHUNK":
85
            type = Message.MessageType.CHUNK;
86
            chunkNo = parts[4];
87
            break;
88
        case "DELETE":
89
            type = Message.MessageType.DELETE;
90
            break;
91
        case "REMOVED":
92
            type = Message.MessageType.REMOVED;
93
            chunkNo = parts[4];
94
            break;
95
        default:
96
            break;
97
        }
98

  
99
        // Version, SenderId and FileId are shared
100
        version = parts[1];
101
        senderId = parts[2];
102
        fileId = parts[3];
103

  
104
    }
105

  
106
    // Split message header and body
107
    public void messageSplitHeaderBody(byte[] message, int size) {
108

  
109
        // Find index of CRLF
110
        int i = 0;
111
        for (i = 0; i < size; i++) {
112
            if (i <= size - 5) {
113
                if (message[i] == CR && message[i + 1] == LF && message[i + 2] == CR && message[i + 3] == LF) {
114
                    break;
115
                }
116
            }
117
        }
118

  
119
        // Get header
120
        byte[] headerByte = new byte[i];
121

  
122
        System.arraycopy(message, 0, headerByte, 0, i);
123

  
124
        header = new String(headerByte);
125
        header = header.trim();
126

  
127
        // Get body
128
        if (size > i + 3) {
129
            body = new byte[size - i - 4];
130
            System.arraycopy(message, i + 4, body, 0, size - i - 4);
131
        } else {
132
            body = null;
133
        }
134

  
135
    }
136

  
137
    @Override
138
    public void run() {
139

  
140
        // Do whatever the message asks
141
        try {
142

  
143
            // Identify Type
144
            switch (type) {
145
            case PUTCHUNK:
146

  
147
                // Someone(NOT US!) sent a CHUNK, cancel it
148
                if (this.elseSentChunkReclaim.get() && senderId != this.peer.id)
149
                    this.elseSentChunkReclaim.set(true);
150

  
151
                // Dont backup self
152
                if (peer.doingBackUp)
153
                    return;
154

  
155
                // Check peer has enough size available
156
                if (peer.used + body.length > peer.limit)
157
                    return;
158

  
159
                // Get path to peer
160
                Path currentRelativePath = Paths.get("");
161
                String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + peer.id;
162
                peerDir += "\\backup";
163
                peerDir += "\\" + fileId;
164
                File file = new File(peerDir);
165

  
166
                // Create path
167
                file.mkdirs();
168

  
169
                // Store chunk in file system
170
                peerDir += "\\chk" + chunkNo + ".chk";
171
                File chunk = new File(peerDir);
172
                FileOutputStream out = new FileOutputStream(chunk);
173
                out.write(body);
174
                out.close();
175

  
176
                // Store chunk in Peer cache(hashmap)
177
                peer.chunks.put(fileId + "-" + chunkNo,
178
                        new Chunk(fileId, Integer.parseInt(chunkNo), body, Integer.parseInt(replication)));
179

  
180
                // Add to used memory
181
                peer.used += body.length;
182

  
183
                // Random delay
184
                Thread.sleep(rand.nextInt(400));
185

  
186
                // Send STORED to MC
187
                Message msg = new Message(Message.MessageType.STORED, peer.id, fileId, Integer.parseInt(chunkNo), 0,
188
                        new byte[0]);
189

  
190
                DatagramPacket packet = msg.packit(Server.MC.address, Server.MC.port);
191

  
192
                Server.MC.socket.send(packet);
193

  
194
                break;
195
            case STORED:
196

  
197
                // If doing backup then add to replications
198
                if (peer.doingBackUp && peer.file.equals(fileId)) {
199
                    peer.replications++;
200
                }
201

  
202
                // Chunk that was stored
203
                String keyStored = fileId + "-" + chunkNo;
204

  
205
                // If we have the chunk
206
                if (this.peer.chunks.get(keyStored) != null) {
207
                    this.peer.chunks.get(keyStored).cReps++;
208
                }
209

  
210
                break;
211
            case GETCHUNK:
212

  
213
                // Find chunk in store
214
                Chunk requestedChunk = peer.chunks.get(fileId + "-" + chunkNo);
215

  
216
                // Chunk not found, just return
217
                if (requestedChunk == null)
218
                    return;
219

  
220
                // Set flag to lookout for CHUNK
221
                this.elseSentChunk.set(false);
222

  
223
                // Wait random delay
224
                Thread.sleep(rand.nextInt(400));
225

  
226
                // Someone else sent the CHUNK, just return
227
                if (this.elseSentChunk.get())
228
                    return;
229

  
230
                // Send CHUNK to MDR
231
                Message msgGetChunk = new Message(Message.MessageType.CHUNK, peer.id, fileId, Integer.parseInt(chunkNo),
232
                        0, requestedChunk.bytes);
233

  
234
                DatagramPacket packetGetChunk = msgGetChunk.packit(Server.MDR.address, Server.MDR.port);
235

  
236
                Server.MDR.socket.send(packetGetChunk);
237

  
238
                break;
239
            case CHUNK:
240

  
241
                // Find chunk in store
242
                Chunk receivedChunk = peer.chunks.get(fileId + "-" + chunkNo);
243

  
244
                // Someone(NOT US!) sent a CHUNK we were going to send, cancel it
245
                if (receivedChunk != null && senderId != this.peer.id) {
246
                    this.elseSentChunk.set(true);
247
                }
248

  
249
                // Add to recovered if not there yet
250
                if (this.peer.restoring) {
251
                    if (!this.peer.recoveredChunks.containsKey(chunkNo)) {
252
                        this.peer.recoveredChunks.put(chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), body, 0));
253
                        // Chunk received, continue asking for rest
254
                        if (peer.waitRestoreReply.get())
255
                            peer.waitRestoreReply.set(false);
256
                    }
257

  
258
                }
259

  
260
                break;
261
            case DELETE:
262

  
263
                boolean found1 = false;
264

  
265
                // Delete cache chunks
266
                for (String keyDelete : this.peer.chunks.keySet()) {
267
                    if (keyDelete.contains(this.fileId)) {
268
                        found1 = true;
269
                        this.peer.chunks.remove(keyDelete);
270
                    }
271
                }
272

  
273
                // Delete filesystem chunks
274
                // If at least one in cache was found then surely it exists in file system and
275
                // we should delete
276
                if (found1) {
277
                    Path current = Paths.get("");
278
                    String fileDir = current.toAbsolutePath().toString() + "\\peer" + peer.id;
279
                    fileDir += "\\backup";
280
                    fileDir += "\\" + fileId;
281
                    File fileDirFile = new File(fileDir);
282
                    FileManager.deleteDir(fileDirFile);
283
                }
284

  
285
                break;
286
            case REMOVED:
287

  
288
                // Chunk requested
289
                String key = fileId + "-" + chunkNo;
290

  
291
                // If we have the chunk
292
                if (this.peer.chunks.get(key) != null) {
293

  
294
                    // Remove replication
295
                    this.peer.chunks.get(key).cReps--;
296

  
297
                    System.out.println(this.peer.chunks.get(key).cReps);
298
                    System.out.println(this.peer.chunks.get(key).reps);
299

  
300
                    // If bellow replication degree
301
                    if (this.peer.chunks.get(key).cReps < this.peer.chunks.get(key).reps) {
302
                        // Wait random delay
303
                        this.elseSentChunkReclaim.set(false);
304
                        Thread.sleep(rand.nextInt(400));
305

  
306
                        // Someone already started backup of this chunk
307
                        if (this.elseSentChunkReclaim.get())
308
                            return;
309

  
310
                        // Prepare message
311
                        Message msgRemovedChunk = new Message(Message.MessageType.PUTCHUNK, this.peer.id, fileId,
312
                                this.peer.chunks.get(key).chunkN, this.peer.chunks.get(key).reps,
313
                                this.peer.chunks.get(key).bytes);
314

  
315
                        // Pack in datagram
316
                        DatagramPacket packetRemovedChunk = msgRemovedChunk.packit(this.peer.MDB.address,
317
                                this.peer.MDB.port);
318

  
319
                        // Send message
320
                        this.peer.MDB.socket.send(packetRemovedChunk);
321
                    }
322

  
323
                }
324

  
325
                break;
326

  
327
            default:
328
                break;
329
            }
330

  
331
        } catch (Exception e) {
332
            e.printStackTrace();
333
        }
334

  
335
    }
336

  
337
}
FileManager.java
1
import java.io.*;
2
import java.nio.file.*;
3
import java.security.*;
4
import java.nio.charset.*;
5

  
6
public class FileManager {
7

  
8
    public static int chunkSize = 64000;
9

  
10
    // Save byte array chunk to path
11
    public static void saveChunk(byte[] bytes, String path, int chunkN) throws FileNotFoundException, IOException {
12

  
13
        String fileName = path + Integer.toString(chunkN);
14

  
15
        try (FileOutputStream stream = new FileOutputStream(fileName)) {
16
            stream.write(bytes);
17
        }
18

  
19
    }
20

  
21
    // Hash a string with SHA256
22
    public static String hash(String input) {
23

  
24
        try {
25

  
26
            byte[] bytes = digestHashing(input);
27
            return bytesToHex(bytes);
28

  
29
        } catch (Exception e) {
30

  
31
            e.printStackTrace();
32

  
33
        }
34

  
35
        return null;
36

  
37
    }
38

  
39
    // Use MessageDigest to hash in bytes
40
    private static byte[] digestHashing(String input) throws NoSuchAlgorithmException {
41

  
42
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
43
        return digest.digest(input.getBytes(StandardCharsets.UTF_8));
44

  
45
    }
46

  
47
    // Use bytes from MessageDigest to a String
48
    private static String bytesToHex(byte[] hash) {
49

  
50
        StringBuffer hexString = new StringBuffer();
51

  
52
        for (int i = 0; i < hash.length; i++) {
53

  
54
            String hex = Integer.toHexString(0xff & hash[i]);
55

  
56
            if (hex.length() == 1)
57
                hexString.append('0');
58

  
59
            hexString.append(hex);
60

  
61
        }
62

  
63
        return hexString.toString();
64

  
65
    }
66

  
67
    // Delete directory and files
68
    public static void deleteDir(File file) {
69
        File[] contents = file.listFiles();
70
        if (contents != null) {
71
            for (File f : contents) {
72
                if (!Files.isSymbolicLink(f.toPath())) {
73
                    deleteDir(f);
74
                }
75
            }
76
        }
77
        file.delete();
78
    }
79

  
80
    // Read all bytes of file(generic use)
81
    public static byte[] readFile(String path) {
82

  
83
        Path filePath = Paths.get(path);
84

  
85
        byte[] bytes = null;
86

  
87
        try {
88
            bytes = Files.readAllBytes(filePath);
89
        } catch (Exception e) {
90
            e.printStackTrace();
91
        }
92

  
93
        return bytes;
94
    }
95

  
96
    // Split file in chunks
97
    public static byte[][] split(byte[] fileBytes) {
98

  
99
        // Number of chunks needed is always integer division plus 1 which may be
100
        // partially used or empty
101
        int nChunks = 1 + fileBytes.length / chunkSize;
102

  
103
        // Total bytes
104
        int bytes = fileBytes.length;
105
        int rBytes = 0;
106

  
107
        // Chunks seperated
108
        byte[][] chunks = new byte[nChunks][];
109
        for (int i = 0; i < nChunks; i++) {
110

  
111
            // Calculate proper size
112
            int sizeToCopy = chunkSize;
113
            if (bytes < sizeToCopy)
114
                sizeToCopy = bytes;
115

  
116
            // Chunk with proper size
117
            chunks[i] = new byte[sizeToCopy];
118

  
119
            // Copy to chunk
120
            System.arraycopy(fileBytes, rBytes, chunks[i], 0, sizeToCopy);
121

  
122
            // Remove form total bytes
123
            bytes -= sizeToCopy;
124
            rBytes += sizeToCopy;
125

  
126
        }
127
        return chunks;
128
    }
129

  
130
    // Merge chunks into single byte array
131
    public static byte[] merge(byte[][] chunks) throws IOException {
132

  
133
        // Total bytes
134
        int tBytes = 0;
135
        for (int i = 0; i < chunks.length; i++) {
136
            tBytes += chunks[i].length;
137
        }
138

  
139
        byte[] mergedFile = new byte[tBytes];
140

  
141
        // Written bytes count
142
        int nBytes = 0;
143

  
144
        // Write bytes
145
        for (int i = 0; i < chunks.length; i++) {
146
            System.arraycopy(chunks[i], 0, mergedFile, nBytes, chunks[i].length);
147
            nBytes += chunks[i].length;
148
        }
149

  
150
        return mergedFile;
151
    }
152

  
153
    // Test main for split and merge file
154
    public static void main(String[] args) {
155

  
156
        String fileName = args[0];
157
        Path filePath = Paths.get(fileName);
158
        byte[] fileBytes = null;
159

  
160
        try {
161
            fileBytes = Files.readAllBytes(filePath);
162
        } catch (Exception e) {
163
            e.printStackTrace();
164
        }
165

  
166
        byte[][] chunks = split(fileBytes);
167

  
168
        try {
169
            fileBytes = merge(chunks);
170
        } catch (Exception e) {
171
            e.printStackTrace();
172
        }
173

  
174
    }
175

  
176
}
Client.java
1

  
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.registry.Registry;
4

  
5
public class Client {
6

  
7
    public static void main(String[] args) {
8

  
9
        // Command parameters
10
        String[] opnd = { "operand", "opnd1", "opnd2" };
11

  
12
        // Process Arguments
13

  
14
        // 2 arguments can only be STATE check
15
        if (args.length == 2) {
16
            opnd[0] = "STATE";
17
        }
18
        // 3 arguments can only be RECLAIM/DELETE/RESTORE
19
        else if (args.length == 3) {
20
            switch (args[1]) {
21
            case "RESTORE":
22
                opnd[0] = "RESTORE";
23
                // File
24
                opnd[1] = args[2];
25
                break;
26
            case "DELETE":
27
                opnd[0] = "DELETE";
28
                // File
29
                opnd[1] = args[2];
30
                break;
31
            case "RECLAIM":
32
                opnd[0] = "RECLAIM";
33
                // Reclaim size
34
                opnd[1] = args[2];
35
                break;
36
            default:
37
                System.out.println("Error: Invalid operation name");
38
                return;
39
            }
40
        }
41
        // 4 arguments can only be BACKUP
42
        else if (args.length == 4) {
43
            opnd[0] = "BACKUP";
44
            // File
45
            opnd[1] = args[2];
46
            // Replication level
47
            opnd[2] = args[3];
48
        }
49
        // Any more arguments are invalid too
50
        else {
51
            System.out.println("java Client <peer_ap> <protocol> <opnd_1> <opnd_2");
52
            return;
53
        }
54

  
55
        // Host id
56
        String[] host = new String(args[0]).split("/");
57

  
58
        try {
59

  
60
            PeerInterface stub;
61

  
62
            // Lookup for RMI
63
            if(host.length > 2){
64
                Registry registry = LocateRegistry.getRegistry(host[2]);
65
                stub = (PeerInterface) registry.lookup(host[3]);
66
            }else{
67
                Registry registry = LocateRegistry.getRegistry();
68
                stub = (PeerInterface) registry.lookup(args[0]);
69
            }                     
70

  
71
            // Identify action
72
            switch (opnd[0]) {
73
            case "BACKUP":
74
                stub.backup(opnd[1], Integer.parseInt(opnd[2]));
75
                break;
76
            case "RESTORE":
77
                stub.restore(opnd[1]);
78
                break;
79
            case "DELETE":
80
                stub.delete(opnd[1]);
81
                break;
82
            case "RECLAIM":
83
                stub.reclaim(Integer.parseInt(opnd[1]));
84
                break;
85
            default:
86
                break;
87
            }
88

  
89
        } catch (Exception e) {
90

  
91
            System.err.println("Client exception: " + e.toString());
92
            e.printStackTrace();
93

  
94
        }
95

  
96
    }
97

  
98
}
Chunk.java
1

  
2
public class Chunk {
3

  
4
    public String fileId;
5
    public int chunkN;
6
    public byte[] bytes;
7
    public int reps;
8
    public int cReps = 0;
9

  
10
    public Chunk(String fileId, int chunkN, byte[] bytes, int reps) {
11
        this.fileId = fileId;
12
        this.chunkN = chunkN;
13
        this.bytes = bytes;
14
        this.reps = reps;
15
    }
16

  
17
}
Socket.java
1

  
2
import java.io.IOException;
3
import java.net.DatagramPacket;
4
import java.net.InetAddress;
5
import java.net.MulticastSocket;
6
import java.util.AbstractMap.SimpleEntry;
7

  
8
public class Socket implements Runnable {
9

  
10
    public MulticastSocket socket;
11
    public int port;
12
    public InetAddress address;
13

  
14
    public enum Type {
15
        MC, MDB, MDR
16
    }
17

  
18
    public Type type;
19

  
20
    public Server peer;
21

  
22
    // Message size(safe size)
23
    public static final int MESSAGE_SIZE = 65500;
24

  
25
    // Default constructor
26
    public Socket(Server peer, Type type) throws IOException {
27

  
28
        this.peer = peer;
29

  
30
        switch (type) {
31
        case MC:
32
            this.port = 8000;
33
            this.address = InetAddress.getByName("224.0.0.1");
34
            break;
35
        case MDB:
36
            this.port = 8001;
37
            this.address = InetAddress.getByName("224.0.0.2");
38
            break;
39
        case MDR:
40
            this.port = 8002;
41
            this.address = InetAddress.getByName("224.0.0.3");
42
            break;
43
        default:
44
            return;
45
        }
46

  
47
        this.type = type;
48
        this.socket = new MulticastSocket(this.port);
49
        this.socket.joinGroup(address);
50

  
51
    }
52

  
53
    // Specific constructor
54
    public Socket(Server peer, Type type, int port, String address) throws IOException {
55

  
56
        this.peer = peer;
57
        this.type = type;
58
        this.port = port;
59
        this.address = InetAddress.getByName(address);
60
        this.socket = new MulticastSocket(this.port);
61
        this.socket.joinGroup(this.address);
62

  
63
    }
64

  
65
    @Override
66
    public void run() {
67

  
68
        while (true) {
69

  
70
            byte[] buf = new byte[MESSAGE_SIZE];
71
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
72

  
73
            try {
74

  
75
                this.socket.receive(packet);
76

  
77
                System.out.println("Packet received, launching interpreter");
78

  
79
                Server.pool.execute(new Interpreter(peer, packet));
80

  
81
            } catch (IOException e) {
82
                e.printStackTrace();
83
            }
84

  
85
        }
86

  
87
    }
88

  
89
}
PeerInterface.java
1

  
2
import java.rmi.Remote;
3
import java.rmi.RemoteException;
4

  
5
public interface PeerInterface extends Remote {
6

  
7
    void backup(String path, int replications) throws RemoteException;
8

  
9
    void restore(String path) throws RemoteException;
10

  
11
    void state() throws RemoteException;
12

  
13
    void reclaim(int memory) throws RemoteException;
14

  
15
    void delete(String path) throws RemoteException;
16

  
17
}
Message.java
1
import java.net.DatagramPacket;
2
import java.net.InetAddress;
3

  
4
public class Message {
5

  
6
    private static final String CRLF = "\r\n";
7

  
8
    public String header;
9
    public byte[] body;
10

  
11
    public enum MessageType {
12
        PUTCHUNK, STORED, GETCHUNK, CHUNK, DELETE, REMOVED
13
    };
14

  
15
    public MessageType type;
16

  
17
    public Message() {
18
    }
19

  
20
    // Build a message
21
    public Message(MessageType type, String senderId, String fileId, int chunkN, int replicationDeg, byte[] bytes) {
22

  
23
        // Reset body
24
        body = new byte[0];
25

  
26
        // Add chunk
27
        body = bytes;
28

  
29
        // Assign type
30
        this.type = type;
31

  
32
        // Prepare type
33
        switch (type) {
34
        case PUTCHUNK:
35
            header = "PUTCHUNK";
36
            break;
37
        case STORED:
38
            header = "STORED";
39
            break;
40
        case GETCHUNK:
41
            header = "GETCHUNK";
42
            break;
43
        case CHUNK:
44
            header = "CHUNK";
45
            break;
46
        case DELETE:
47
            header = "DELETE";
48
            break;
49
        case REMOVED:
50
            header = "REMOVED";
51
            break;
52
        default:
53
            break;
54
        }
55

  
56
        // Version, SenderId and FileId are shared
57
        // TODO hardcoded version 1.0
58
        header += " 1.0";
59
        header += " " + senderId;
60
        header += " " + fileId;
61

  
62
        // Remaining message header
63
        // Technically it would be more efficient to add chunkN to all types and remove
64
        // in DELETE but eh
65
        switch (type) {
66
        case PUTCHUNK:
67
            header += " " + Integer.toString(chunkN);
68
            header += " " + Integer.toString(replicationDeg);
69
            break;
70
        case STORED:
71
            header += " " + Integer.toString(chunkN);
72
            break;
73
        case GETCHUNK:
74
            header += " " + Integer.toString(chunkN);
75
            break;
76
        case CHUNK:
77
            header += " " + Integer.toString(chunkN);
78
            break;
79
        case DELETE:
80
            // Nothing to add
81
            break;
82
        case REMOVED:
83
            header += " " + Integer.toString(chunkN);
84
            break;
85
        default:
86
            break;
87
        }
88

  
89
        // Finalize with double CRLF
90
        header += CRLF + CRLF;
91

  
92
    }
93

  
94
    // Pack a datagram from message
95
    public DatagramPacket packit(InetAddress address, int port) {
96

  
97
        byte[] headerBytes = header.getBytes();
98

  
99
        byte[] full = new byte[headerBytes.length + body.length];
100

  
101
        System.arraycopy(headerBytes, 0, full, 0, headerBytes.length);
102
        System.arraycopy(body, 0, full, headerBytes.length, body.length);
103

  
104
        DatagramPacket packet = new DatagramPacket(full, full.length, address, port);
105

  
106
        return packet;
107

  
108
    }
109

  
110
}
src/Chunk.java
1

  
2
public class Chunk {
3

  
4
    public String fileId;
5
    public int chunkN;
6
    public byte[] bytes;
7
    public int reps;
8
    public int cReps = 0;
9

  
10
    public Chunk(String fileId, int chunkN, byte[] bytes, int reps) {
11
        this.fileId = fileId;
12
        this.chunkN = chunkN;
13
        this.bytes = bytes;
14
        this.reps = reps;
15
    }
16

  
17
}
src/Client.java
1

  
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.registry.Registry;
4

  
5
public class Client {
6

  
7
    public static void main(String[] args) {
8

  
9
        // Command parameters
10
        String[] opnd = { "operand", "opnd1", "opnd2" };
11

  
12
        // Process Arguments
13

  
14
        // 2 arguments can only be STATE check
15
        if (args.length == 2) {
16
            opnd[0] = "STATE";
17
        }
18
        // 3 arguments can only be RECLAIM/DELETE/RESTORE
19
        else if (args.length == 3) {
20
            switch (args[1]) {
21
            case "RESTORE":
22
                opnd[0] = "RESTORE";
23
                // File
24
                opnd[1] = args[2];
25
                break;
26
            case "DELETE":
27
                opnd[0] = "DELETE";
28
                // File
29
                opnd[1] = args[2];
30
                break;
31
            case "RECLAIM":
32
                opnd[0] = "RECLAIM";
33
                // Reclaim size
34
                opnd[1] = args[2];
35
                break;
36
            default:
37
                System.out.println("Error: Invalid operation name");
38
                return;
39
            }
40
        }
41
        // 4 arguments can only be BACKUP
42
        else if (args.length == 4) {
43
            opnd[0] = "BACKUP";
44
            // File
45
            opnd[1] = args[2];
46
            // Replication level
47
            opnd[2] = args[3];
48
        }
49
        // Any more arguments are invalid too
50
        else {
51
            System.out.println("java Client <peer_ap> <protocol> <opnd_1> <opnd_2");
52
            return;
53
        }
54

  
55
        // Host id
56
        String[] host = new String(args[0]).split("/");
57

  
58
        try {
59

  
60
            PeerInterface stub;
61

  
62
            // Lookup for RMI
63
            if(host.length > 2){
64
                Registry registry = LocateRegistry.getRegistry(host[2]);
65
                stub = (PeerInterface) registry.lookup(host[3]);
66
            }else{
67
                Registry registry = LocateRegistry.getRegistry();
68
                stub = (PeerInterface) registry.lookup(args[0]);
69
            }                     
70

  
71
            // Identify action
72
            switch (opnd[0]) {
73
            case "BACKUP":
74
                stub.backup(opnd[1], Integer.parseInt(opnd[2]));
75
                break;
76
            case "RESTORE":
77
                stub.restore(opnd[1]);
78
                break;
79
            case "DELETE":
80
                stub.delete(opnd[1]);
81
                break;
82
            case "RECLAIM":
83
                stub.reclaim(Integer.parseInt(opnd[1]));
84
                break;
85
            default:
86
                break;
87
            }
88

  
89
        } catch (Exception e) {
90

  
91
            System.err.println("Client exception: " + e.toString());
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff