Project

General

Profile

Revision 1

Base Functionalities w/ scripts w/snooper w/test

View differences:

Chunk.java
1

  
2
public class Chunk {
3

  
4
    public String fileId;
5
    public int chunkN;
6
    public byte[] bytes;
7
    public int reps;
8
    public int cReps = 0;
9

  
10
    public Chunk(String fileId, int chunkN, byte[] bytes, int reps) {
11
        this.fileId = fileId;
12
        this.chunkN = chunkN;
13
        this.bytes = bytes;
14
        this.reps = reps;
15
    }
16

  
17
}
Client.java
1

  
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.registry.Registry;
4

  
5
public class Client {
6

  
7
    private Client() {
8

  
9
    }
10

  
11
    public static void main(String[] args) {
12

  
13
        // Command parameters
14
        String[] opnd = { "operand", "opnd1", "opnd2" };
15

  
16
        // Process Arguments
17

  
18
        // 2 arguments can only be STATE check
19
        if (args.length == 2) {
20
            opnd[0] = "STATE";
21
        }
22
        // 3 arguments can only be RECLAIM/DELETE/RESTORE
23
        else if (args.length == 3) {
24
            switch (args[1]) {
25
            case "RESTORE":
26
                opnd[0] = "RESTORE";
27
                // File
28
                opnd[1] = args[2];
29
                break;
30
            case "DELETE":
31
                opnd[0] = "DELETE";
32
                // File
33
                opnd[1] = args[2];
34
                break;
35
            case "RECLAIM":
36
                opnd[0] = "RECLAIM";
37
                // Reclaim size
38
                opnd[1] = args[2];
39
                break;
40
            default:
41
                System.out.println("Error: Invalid operation name");
42
                return;
43
            }
44
        }
45
        // 4 arguments can only be BACKUP
46
        else if (args.length == 4) {
47
            opnd[0] = "BACKUP";
48
            // File
49
            opnd[1] = args[2];
50
            // Replication level
51
            opnd[2] = args[3];
52
        }
53
        // Any more arguments are invalid too
54
        else {
55
            System.out.println("Invalid number of arguments");
56
            return;
57
        }
58

  
59
        // Peer Id to takeover
60
        String idPeer = args[0];
61

  
62
        try {
63

  
64
            // Lookup for RMI
65
            Registry registry = LocateRegistry.getRegistry();
66

  
67
            // Lookup for peer access point
68
            PeerInterface stub = (PeerInterface) registry.lookup(idPeer);
69

  
70
            // Identify action
71
            switch (opnd[0]) {
72
            case "BACKUP":
73
                stub.backup(opnd[1], Integer.parseInt(opnd[2]));
74
                break;
75
            case "RESTORE":
76
                stub.restore(opnd[1]);
77
                break;
78
            case "DELETE":
79
                stub.delete(opnd[1]);
80
                break;
81
            case "RECLAIM":
82
                stub.reclaim(Integer.parseInt(opnd[1]));
83
                break;
84
            default:
85
                break;
86
            }
87

  
88
        } catch (Exception e) {
89

  
90
            System.err.println("Client exception: " + e.toString());
91
            e.printStackTrace();
92

  
93
        }
94

  
95
    }
96

  
97
}
FileManager.java
1
import java.io.*;
2
import java.nio.file.*;
3
import java.security.*;
4
import java.nio.charset.*;
5

  
6
public class FileManager {
7

  
8
    public static int chunkSize = 64000;
9

  
10
    // Save byte array chunk to path
11
    public static void saveChunk(byte[] bytes, String path, int chunkN) throws FileNotFoundException, IOException {
12

  
13
        String fileName = path + Integer.toString(chunkN);
14

  
15
        try (FileOutputStream stream = new FileOutputStream(fileName)) {
16
            stream.write(bytes);
17
        }
18

  
19
    }
20

  
21
    // Hash a string with SHA256
22
    public static String hash(String input) {
23

  
24
        try {
25

  
26
            byte[] bytes = digestHashing(input);
27
            return bytesToHex(bytes);
28

  
29
        } catch (Exception e) {
30

  
31
            e.printStackTrace();
32

  
33
        }
34

  
35
        return null;
36

  
37
    }
38

  
39
    // Use MessageDigest to hash in bytes
40
    private static byte[] digestHashing(String input) throws NoSuchAlgorithmException {
41

  
42
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
43
        return digest.digest(input.getBytes(StandardCharsets.UTF_8));
44

  
45
    }
46

  
47
    // Use bytes from MessageDigest to a String
48
    private static String bytesToHex(byte[] hash) {
49

  
50
        StringBuffer hexString = new StringBuffer();
51

  
52
        for (int i = 0; i < hash.length; i++) {
53

  
54
            String hex = Integer.toHexString(0xff & hash[i]);
55

  
56
            if (hex.length() == 1)
57
                hexString.append('0');
58

  
59
            hexString.append(hex);
60

  
61
        }
62

  
63
        return hexString.toString();
64

  
65
    }
66

  
67
    // Delete directory and files
68
    public static void deleteDir(File file) {
69
        File[] contents = file.listFiles();
70
        if (contents != null) {
71
            for (File f : contents) {
72
                if (!Files.isSymbolicLink(f.toPath())) {
73
                    deleteDir(f);
74
                }
75
            }
76
        }
77
        file.delete();
78
    }
79

  
80
    // Create metafile(TODO add more useful arguments)
81
    // Used to reconstruct the file name when Restoring
82
    public static void metaFile(String path, String fileId) throws IOException {
83

  
84
        // Extract file name
85
        int index = path.lastIndexOf("\\");
86
        String fileName = path.substring(index + 1);
87

  
88
        // Create actual file
89
        File file = new File(fileName + ".meta");
90
        if (file.createNewFile()) {
91
            // TODO File created
92
        } else {
93
            // TODO File already exists
94
        }
95

  
96
        // Write data to file
97
        FileWriter writer = new FileWriter(file);
98
        writer.write("Id:" + fileId);
99
        writer.close();
100

  
101
    }
102

  
103
    // Read all bytes of file(generic use)
104
    public static byte[] readFile(String path) {
105

  
106
        Path filePath = Paths.get(path);
107

  
108
        byte[] bytes = null;
109

  
110
        try {
111
            bytes = Files.readAllBytes(filePath);
112
        } catch (Exception e) {
113
            e.printStackTrace();
114
        }
115

  
116
        return bytes;
117
    }
118

  
119
    // Split file in chunks
120
    public static byte[][] split(byte[] fileBytes) {
121

  
122
        // Number of chunks needed is always integer division plus 1 which may be
123
        // partially used or empty
124
        int nChunks = 1 + fileBytes.length / chunkSize;
125

  
126
        // Total bytes
127
        int bytes = fileBytes.length;
128
        int rBytes = 0;
129

  
130
        // Chunks seperated
131
        byte[][] chunks = new byte[nChunks][];
132
        for (int i = 0; i < nChunks; i++) {
133

  
134
            // Calculate proper size
135
            int sizeToCopy = chunkSize;
136
            if (bytes < sizeToCopy)
137
                sizeToCopy = bytes;
138

  
139
            // Chunk with proper size
140
            chunks[i] = new byte[sizeToCopy];
141

  
142
            // Copy to chunk
143
            System.arraycopy(fileBytes, rBytes, chunks[i], 0, sizeToCopy);
144

  
145
            // Remove form total bytes
146
            bytes -= sizeToCopy;
147
            rBytes += sizeToCopy;
148

  
149
        }
150
        return chunks;
151
    }
152

  
153
    // Merge chunks into single byte array
154
    public static byte[] merge(byte[][] chunks) throws IOException {
155

  
156
        // Total bytes
157
        int tBytes = 0;
158
        for (int i = 0; i < chunks.length; i++) {
159
            tBytes += chunks[i].length;
160
        }
161

  
162
        byte[] mergedFile = new byte[tBytes];
163

  
164
        // Written bytes count
165
        int nBytes = 0;
166

  
167
        // Write bytes
168
        for (int i = 0; i < chunks.length; i++) {
169
            System.arraycopy(chunks[i], 0, mergedFile, nBytes, chunks[i].length);
170
            nBytes += chunks[i].length;
171
        }
172

  
173
        return mergedFile;
174
    }
175

  
176
    // Test main for split and merge file
177
    public static void main(String[] args) {
178

  
179
        String fileName = args[0];
180
        Path filePath = Paths.get(fileName);
181
        byte[] fileBytes = null;
182

  
183
        try {
184
            fileBytes = Files.readAllBytes(filePath);
185
        } catch (Exception e) {
186
            e.printStackTrace();
187
        }
188

  
189
        byte[][] chunks = split(fileBytes);
190

  
191
        try {
192
            fileBytes = merge(chunks);
193
        } catch (Exception e) {
194
            e.printStackTrace();
195
        }
196

  
197
    }
198

  
199
}
Interpreter.java
1

  
2
import java.util.Random;
3
import java.rmi.registry.Registry;
4
import java.rmi.registry.LocateRegistry;
5
import java.rmi.RemoteException;
6
import java.rmi.server.UnicastRemoteObject;
7

  
8
import java.util.Map;
9

  
10
import java.util.concurrent.ConcurrentHashMap;
11
import java.util.concurrent.ScheduledThreadPoolExecutor;
12
import java.util.concurrent.TimeUnit;
13
import java.util.concurrent.atomic.AtomicBoolean;
14
import java.util.concurrent.Executors;
15
import java.util.concurrent.CopyOnWriteArrayList;
16
import java.util.ArrayList;
17
import java.util.List;
18
import java.util.Comparator;
19
import java.util.HashSet;
20
import java.util.Collections;
21

  
22
import java.io.*;
23

  
24
import java.net.DatagramPacket;
25

  
26
import java.nio.file.*;
27

  
28
public class Interpreter implements Runnable {
29

  
30
    public Random rand = new Random();
31

  
32
    private static final byte CR = 0xD;
33
    private static final byte LF = 0xA;
34

  
35
    public Server peer;
36

  
37
    public String raw;
38

  
39
    public String header;
40
    public byte[] body;
41

  
42
    public String version;
43
    public String senderId;
44
    public String fileId;
45
    public String chunkNo;
46
    public String replication;
47

  
48
    public AtomicBoolean elseSentChunk = new AtomicBoolean(false);
49
    public AtomicBoolean elseSentChunkReclaim = new AtomicBoolean(true);
50

  
51
    public Message.MessageType type;
52

  
53
    public Interpreter() {
54
    }
55

  
56
    public Interpreter(Server peer, DatagramPacket packet) {
57

  
58
        // Set peer
59
        this.peer = peer;
60

  
61
        // Seperate header and body
62
        messageSplitHeaderBody(packet.getData(), packet.getLength());
63

  
64
        System.out.println(header);
65

  
66
        // Get parts
67
        String[] parts = header.split(" ");
68

  
69
        // Identify type and specific arguments
70
        switch (parts[0]) {
71
        case "PUTCHUNK":
72
            type = Message.MessageType.PUTCHUNK;
73
            chunkNo = parts[4];
74
            replication = parts[5];
75
            break;
76
        case "STORED":
77
            type = Message.MessageType.STORED;
78
            chunkNo = parts[4];
79
            break;
80
        case "GETCHUNK":
81
            type = Message.MessageType.GETCHUNK;
82
            chunkNo = parts[4];
83
            break;
84
        case "CHUNK":
85
            type = Message.MessageType.CHUNK;
86
            chunkNo = parts[4];
87
            break;
88
        case "DELETE":
89
            type = Message.MessageType.DELETE;
90
            break;
91
        case "REMOVED":
92
            type = Message.MessageType.REMOVED;
93
            chunkNo = parts[4];
94
            break;
95
        default:
96
            break;
97
        }
98

  
99
        // Version, SenderId and FileId are shared
100
        version = parts[1];
101
        senderId = parts[2];
102
        fileId = parts[3];
103

  
104
    }
105

  
106
    // Split message header and body
107
    public void messageSplitHeaderBody(byte[] message, int size) {
108

  
109
        // Find index of CRLF
110
        int i = 0;
111
        for (i = 0; i < size; i++) {
112
            if (i <= size - 5) {
113
                if (message[i] == CR && message[i + 1] == LF && message[i + 2] == CR && message[i + 3] == LF) {
114
                    break;
115
                }
116
            }
117
        }
118

  
119
        // Get header
120
        byte[] headerByte = new byte[i];
121

  
122
        System.arraycopy(message, 0, headerByte, 0, i);
123

  
124
        header = new String(headerByte);
125
        header = header.trim();
126

  
127
        // Get body
128
        if (size > i + 3) {
129
            body = new byte[size - i - 4];
130
            System.arraycopy(message, i + 4, body, 0, size - i - 4);
131
        } else {
132
            body = null;
133
        }
134

  
135
    }
136

  
137
    @Override
138
    public void run() {
139

  
140
        // Do whatever the message asks
141
        try {
142

  
143
            // Identify Type
144
            switch (type) {
145
            case PUTCHUNK:
146

  
147
                // Someone(NOT US!) sent a CHUNK, cancel it
148
                if (this.elseSentChunkReclaim.get() && senderId != this.peer.id)
149
                    this.elseSentChunkReclaim.set(true);
150

  
151
                // Dont backup self
152
                if (peer.doingBackUp)
153
                    return;
154

  
155
                // Check peer has enough size available
156
                if (peer.used + body.length > peer.limit)
157
                    return;
158

  
159
                // Get path to peer
160
                Path currentRelativePath = Paths.get("");
161
                String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + peer.id;
162
                peerDir += "\\backup";
163
                peerDir += "\\" + fileId;
164
                File file = new File(peerDir);
165

  
166
                // Create path
167
                file.mkdirs();
168

  
169
                // Store chunk in file system
170
                peerDir += "\\chk" + chunkNo + ".chk";
171
                File chunk = new File(peerDir);
172
                FileOutputStream out = new FileOutputStream(chunk);
173
                out.write(body);
174
                out.close();
175

  
176
                // Store chunk in Peer cache(hashmap)
177
                peer.chunks.put(fileId + "-" + chunkNo,
178
                        new Chunk(fileId, Integer.parseInt(chunkNo), body, Integer.parseInt(replication)));
179

  
180
                // Add to used memory
181
                peer.used += body.length;
182

  
183
                // Random delay
184
                Thread.sleep(rand.nextInt(400));
185

  
186
                // Send STORED to MC
187
                Message msg = new Message(Message.MessageType.STORED, peer.id, fileId, Integer.parseInt(chunkNo), 0,
188
                        new byte[0]);
189

  
190
                DatagramPacket packet = msg.packit(Server.MC.address, Server.MC.port);
191

  
192
                Server.MC.socket.send(packet);
193

  
194
                break;
195
            case STORED:
196

  
197
                // If doing backup then add to replications
198
                if (peer.doingBackUp && peer.file.equals(fileId)) {
199
                    peer.replications++;
200
                }
201

  
202
                // Chunk that was stored
203
                String keyStored = fileId + "-" + chunkNo;
204

  
205
                // If we have the chunk
206
                if (this.peer.chunks.get(keyStored) != null) {
207
                    this.peer.chunks.get(keyStored).cReps++;
208
                }
209

  
210
                break;
211
            case GETCHUNK:
212

  
213
                // Find chunk in store
214
                Chunk requestedChunk = peer.chunks.get(fileId + "-" + chunkNo);
215

  
216
                // Chunk not found, just return
217
                if (requestedChunk == null)
218
                    return;
219

  
220
                // Set flag to lookout for CHUNK
221
                this.elseSentChunk.set(false);
222

  
223
                // Wait random delay
224
                Thread.sleep(rand.nextInt(400));
225

  
226
                // Someone else sent the CHUNK, just return
227
                if (this.elseSentChunk.get())
228
                    return;
229

  
230
                // Send CHUNK to MDR
231
                Message msgGetChunk = new Message(Message.MessageType.CHUNK, peer.id, fileId, Integer.parseInt(chunkNo),
232
                        0, requestedChunk.bytes);
233

  
234
                DatagramPacket packetGetChunk = msgGetChunk.packit(Server.MDR.address, Server.MDR.port);
235

  
236
                Server.MDR.socket.send(packetGetChunk);
237

  
238
                break;
239
            case CHUNK:
240

  
241
                // Find chunk in store
242
                Chunk receivedChunk = peer.chunks.get(fileId + "-" + chunkNo);
243

  
244
                // Someone(NOT US!) sent a CHUNK we were going to send, cancel it
245
                if (receivedChunk != null && senderId != this.peer.id) {
246
                    this.elseSentChunk.set(true);
247
                }
248

  
249
                // Add to recovered if not there yet
250
                if (this.peer.restoring) {
251
                    if (!this.peer.recoveredChunks.containsKey(chunkNo)) {
252
                        this.peer.recoveredChunks.put(chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), body, 0));
253
                        // Chunk received, continue asking for rest
254
                        if (peer.waitRestoreReply.get())
255
                            peer.waitRestoreReply.set(false);
256
                    }
257

  
258
                }
259

  
260
                break;
261
            case DELETE:
262

  
263
                boolean found1 = false;
264

  
265
                // Delete cache chunks
266
                for (String keyDelete : this.peer.chunks.keySet()) {
267
                    if (keyDelete.contains(this.fileId)) {
268
                        found1 = true;
269
                        this.peer.chunks.remove(keyDelete);
270
                    }
271
                }
272

  
273
                // Delete filesystem chunks
274
                // If at least one in cache was found then surely it exists in file system and
275
                // we should delete
276
                if (found1) {
277
                    Path current = Paths.get("");
278
                    String fileDir = current.toAbsolutePath().toString() + "\\peer" + peer.id;
279
                    fileDir += "\\backup";
280
                    fileDir += "\\" + fileId;
281
                    File fileDirFile = new File(fileDir);
282
                    FileManager.deleteDir(fileDirFile);
283
                }
284

  
285
                break;
286
            case REMOVED:
287

  
288
                // Chunk requested
289
                String key = fileId + "-" + chunkNo;
290

  
291
                // If we have the chunk
292
                if (this.peer.chunks.get(key) != null) {
293

  
294
                    // Remove replication
295
                    this.peer.chunks.get(key).cReps--;
296

  
297
                    System.out.println(this.peer.chunks.get(key).cReps);
298
                    System.out.println(this.peer.chunks.get(key).reps);
299

  
300
                    // If bellow replication degree
301
                    if (this.peer.chunks.get(key).cReps < this.peer.chunks.get(key).reps) {
302
                        // Wait random delay
303
                        this.elseSentChunkReclaim.set(false);
304
                        Thread.sleep(rand.nextInt(400));
305

  
306
                        // Someone already started backup of this chunk
307
                        if (this.elseSentChunkReclaim.get())
308
                            return;
309

  
310
                        // Prepare message
311
                        Message msgRemovedChunk = new Message(Message.MessageType.PUTCHUNK, this.peer.id, fileId,
312
                                this.peer.chunks.get(key).chunkN, this.peer.chunks.get(key).reps,
313
                                this.peer.chunks.get(key).bytes);
314

  
315
                        // Pack in datagram
316
                        DatagramPacket packetRemovedChunk = msgRemovedChunk.packit(this.peer.MDB.address,
317
                                this.peer.MDB.port);
318

  
319
                        // Send message
320
                        this.peer.MDB.socket.send(packetRemovedChunk);
321
                    }
322

  
323
                }
324

  
325
                break;
326

  
327
            default:
328
                break;
329
            }
330

  
331
        } catch (Exception e) {
332
            e.printStackTrace();
333
        }
334

  
335
    }
336

  
337
}
Message.java
1
import java.net.DatagramPacket;
2
import java.net.InetAddress;
3

  
4
public class Message {
5

  
6
    private static final String CRLF = "\r\n";
7

  
8
    public String header;
9
    public byte[] body;
10

  
11
    public enum MessageType {
12
        PUTCHUNK, STORED, GETCHUNK, CHUNK, DELETE, REMOVED
13
    };
14

  
15
    public MessageType type;
16

  
17
    public Message() {
18
    }
19

  
20
    // Build a message
21
    public Message(MessageType type, String senderId, String fileId, int chunkN, int replicationDeg, byte[] bytes) {
22

  
23
        // Reset body
24
        body = new byte[0];
25

  
26
        // Add chunk
27
        body = bytes;
28

  
29
        // Assign type
30
        this.type = type;
31

  
32
        // Prepare type
33
        switch (type) {
34
        case PUTCHUNK:
35
            header = "PUTCHUNK";
36
            break;
37
        case STORED:
38
            header = "STORED";
39
            break;
40
        case GETCHUNK:
41
            header = "GETCHUNK";
42
            break;
43
        case CHUNK:
44
            header = "CHUNK";
45
            break;
46
        case DELETE:
47
            header = "DELETE";
48
            break;
49
        case REMOVED:
50
            header = "REMOVED";
51
            break;
52
        default:
53
            break;
54
        }
55

  
56
        // Version, SenderId and FileId are shared
57
        // TODO hardcoded version 1.0
58
        header += " 1.0";
59
        header += " " + senderId;
60
        header += " " + fileId;
61

  
62
        // Remaining message header
63
        // Technically it would be more efficient to add chunkN to all types and remove
64
        // in DELETE but eh
65
        switch (type) {
66
        case PUTCHUNK:
67
            header += " " + Integer.toString(chunkN);
68
            header += " " + Integer.toString(replicationDeg);
69
            break;
70
        case STORED:
71
            header += " " + Integer.toString(chunkN);
72
            break;
73
        case GETCHUNK:
74
            header += " " + Integer.toString(chunkN);
75
            break;
76
        case CHUNK:
77
            header += " " + Integer.toString(chunkN);
78
            break;
79
        case DELETE:
80
            // Nothing to add
81
            break;
82
        case REMOVED:
83
            header += " " + Integer.toString(chunkN);
84
            break;
85
        default:
86
            break;
87
        }
88

  
89
        // Finalize with double CRLF
90
        header += CRLF + CRLF;
91

  
92
    }
93

  
94
    // Pack a datagram from message
95
    public DatagramPacket packit(InetAddress address, int port) {
96

  
97
        byte[] headerBytes = header.getBytes();
98

  
99
        byte[] full = new byte[headerBytes.length + body.length];
100

  
101
        System.arraycopy(headerBytes, 0, full, 0, headerBytes.length);
102
        System.arraycopy(body, 0, full, headerBytes.length, body.length);
103

  
104
        DatagramPacket packet = new DatagramPacket(full, full.length, address, port);
105

  
106
        return packet;
107

  
108
    }
109

  
110
}
PeerInterface.java
1

  
2
import java.rmi.Remote;
3
import java.rmi.RemoteException;
4

  
5
public interface PeerInterface extends Remote {
6

  
7
    void backup(String path, int replications) throws RemoteException;
8

  
9
    void restore(String path) throws RemoteException;
10

  
11
    void state() throws RemoteException;
12

  
13
    void reclaim(int memory) throws RemoteException;
14

  
15
    void delete(String path) throws RemoteException;
16

  
17
}
Server.java
1
import java.rmi.registry.Registry;
2
import java.rmi.registry.LocateRegistry;
3
import java.rmi.RemoteException;
4
import java.rmi.server.UnicastRemoteObject;
5

  
6
import java.util.Map;
7

  
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.ScheduledThreadPoolExecutor;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.CopyOnWriteArrayList;;
14
import java.util.ArrayList;
15
import java.util.List;
16
import java.util.Comparator;
17
import java.util.HashSet;
18
import java.util.Collections;
19

  
20
import java.io.*;
21

  
22
import java.net.DatagramPacket;
23

  
24
import java.nio.file.*;
25

  
26
//Server class essentially a Peer
27
public class Server implements PeerInterface {
28

  
29
    // Unique peer id
30
    public String id;
31

  
32
    // Size controls
33
    public static final int DEFAULT_MAX_SIZE = 1000000000;
34
    public int limit;
35
    public int used;
36

  
37
    // Thread Pool
38
    public static ScheduledThreadPoolExecutor pool;
39

  
40
    // Sockets
41
    public static Socket MC;
42
    public static Socket MDB;
43
    public static Socket MDR;
44

  
45
    // State Backup
46
    public boolean doingBackUp;
47
    public String file;
48
    public int replications;
49

  
50
    // State Restore
51
    public boolean restoring;
52
    public AtomicBoolean waitRestoreReply = new AtomicBoolean(false);
53
    public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> Chunk
54

  
55
    // Service Management Information
56
    public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo
57
                                                                                             // -> Chunk
58
    public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name
59

  
60
    // Empty Constructor
61
    public Server() {
62
    }
63

  
64
    // Simplified Constructor(Default values)
65
    public Server(String id) throws IOException {
66
        this.used = 0;
67
        this.id = id;
68
        this.limit = DEFAULT_MAX_SIZE;
69
        this.setupSockets();
70
        this.loadLocal();
71
        this.setupDirectory();
72
    }
73

  
74
    // Full constructor
75
    public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
76
            String addressMDR) throws IOException {
77
        this.used = 0;
78
        this.id = id;
79
        this.limit = DEFAULT_MAX_SIZE;
80
        this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
81
        this.loadLocal();
82
        this.setupDirectory();
83
    }
84

  
85
    // Backup Protocol
86
    public void backup(String path, int replications) throws RemoteException {
87

  
88
        // TODO is file backed up already?
89

  
90
        // Start backup
91
        this.doingBackUp = true;
92

  
93
        // Extract file name
94
        int index = path.lastIndexOf("\\");
95
        String fileName = path.substring(index + 1);
96

  
97
        // Generate Hash
98
        // TODO dont use just file name for hashing(multiple file versions with same
99
        // name problem)
100
        String fileId = FileManager.hash(fileName);
101

  
102
        // Add to file list
103
        this.files.put(fileId, fileName);
104

  
105
        try {
106

  
107
            // Get file bytes
108
            byte[] bytes = FileManager.readFile(path);
109

  
110
            // Get file in chunks
111
            byte[][] chunks = FileManager.split(bytes);
112

  
113
            // Send each chunk
114
            for (int i = 0; i < chunks.length; i++) {
115

  
116
                // Base receive time of 1 second
117
                int receiveTime = 1000;
118

  
119
                // PUTCHUNK tries
120
                for (int j = 0; j < 5; j++) {
121

  
122
                    // Prepare message
123
                    Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
124

  
125
                    // Pack in datagram
126
                    DatagramPacket packet = msg.packit(MDB.address, MDB.port);
127

  
128
                    // Send message
129
                    MDB.socket.send(packet);
130

  
131
                    // Wait for replies and check Replication level
132
                    this.replications = 0;
133
                    this.file = fileId;
134
                    Thread.sleep(receiveTime);
135
                    if (this.replications >= replications) {
136
                        break;
137
                    }
138

  
139
                    // Double receive time
140
                    receiveTime *= 2;
141

  
142
                }
143

  
144
            }
145

  
146
        } catch (Exception e) {
147
            e.printStackTrace();
148
        }
149

  
150
        // Terminate backup
151
        this.doingBackUp = false;
152

  
153
    }
154

  
155
    // Restore Protocol
156
    public void restore(String path) throws RemoteException {
157

  
158
        this.restoring = true;
159

  
160
        // File name
161
        int index = path.lastIndexOf("\\");
162
        String fileName = path.substring(index + 1);
163

  
164
        // Generate Hash
165
        // TODO dont use just file name for hashing(multiple file versions with same
166
        // name problem)
167
        String fileId = FileManager.hash(fileName);
168

  
169
        // Find in list of backed up
170
        String name = this.files.get(fileId);
171

  
172
        // File not backed up yet, just return
173
        if (name == null) {
174
            return;
175
        }
176

  
177
        try {
178

  
179
            // Get byte count (probably better than actually fetching the bytes array)
180
            File file = new File(path);
181
            int bytes = (int) file.length();
182

  
183
            // How many chunks to expect
184
            int chunks = 1 + bytes / FileManager.chunkSize;
185

  
186
            // Reset list of Chunks recovered
187
            this.recoveredChunks.clear();
188

  
189
            // Request chunks
190
            for (int i = 0; i < chunks; i++) {
191

  
192
                // Prepare message
193
                Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]);
194

  
195
                // Pack in datagram
196
                DatagramPacket packet = msg.packit(MC.address, MC.port);
197

  
198
                // Send message
199
                MC.socket.send(packet);
200

  
201
                // Wait for response
202
                this.waitRestoreReply.set(true);
203
                while (this.waitRestoreReply.get()) {
204
                }
205

  
206
            }      
207

  
208
            //Get simple list from recovered chunks
209
            List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values());
210

  
211
            // Sort chunks
212
            Collections.sort(list, new Comparator<Chunk>() {
213
                @Override
214
                public int compare(Chunk p1, Chunk p2) {
215
                    return p1.chunkN - p2.chunkN; // Ascending
216
                }
217
            });
218

  
219
            // Pack chunks
220
            byte[][] allBytes = new byte[list.size()][];
221
            for (int i = 0; i < list.size(); i++) {
222
                allBytes[i] = list.get(i).bytes;
223
            }
224

  
225
            // Merge chunks
226
            byte[] fileBytes = FileManager.merge(allBytes);
227

  
228
            // Save file
229
            Path current = Paths.get("");
230
            String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName;
231
            try (FileOutputStream stream = new FileOutputStream(place)) {
232
                stream.write(fileBytes);
233
                stream.close();
234
            }
235

  
236
        } catch (Exception e) {
237
            e.printStackTrace();
238
        }
239

  
240
        this.restoring = false;
241

  
242
    }
243

  
244
    // Delete chunks associated to this file
245
    public void delete(String path) throws RemoteException {
246

  
247
        // File name
248
        int index = path.lastIndexOf("\\");
249
        String fileName = path.substring(index + 1);
250

  
251
        // Generate Hash
252
        // TODO dont use just file name for hashing(multiple file versions with same
253
        // name problem)
254
        String fileId = FileManager.hash(fileName);
255

  
256
        try {
257

  
258
            // Prepare message
259
            Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]);
260

  
261
            // Pack in datagram
262
            DatagramPacket packet = msg.packit(MC.address, MC.port);
263

  
264
            // Send message
265
            MC.socket.send(packet);
266

  
267
        } catch (Exception e) {
268
            e.printStackTrace();
269
        }
270

  
271
    }
272

  
273
    public void reclaim(int memory) throws RemoteException {
274

  
275
        // Memory can't be lower than 0
276
        if (memory < 0) {
277
            return;
278
        }
279

  
280
        // Assign new limit
281
        limit = memory;
282

  
283
        try {
284

  
285
            // Check need to delete chunks
286
            // Going to delete by order in the map
287
            while (used > limit) {
288

  
289
                // Pick an entry
290
                Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next();
291

  
292
                // Remove from used space
293
                used -= entry.getValue().bytes.length;
294

  
295
                // Prepare message
296
                Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
297
                        entry.getValue().chunkN, 0, new byte[0]);
298

  
299
                // Pack in datagram
300
                DatagramPacket packet = msg.packit(MC.address, MC.port);
301

  
302
                // Send message
303
                MC.socket.send(packet);
304

  
305
                // Remove from map
306
                chunks.remove(entry.getKey());
307

  
308
                // Remove from file system
309
                Path current = Paths.get("");
310
                String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk";
311
                File toDelete = new File(place);
312
                toDelete.delete();
313

  
314
            }
315

  
316
        } catch (Exception e) {
317
            e.printStackTrace();
318
        }
319

  
320
    }
321

  
322
    public void state() throws RemoteException {
323

  
324
    }
325

  
326
    // Get information of previous peer life from file system
327
    public void loadLocal() {
328

  
329
        // Base path
330
        Path currentRelativePath = Paths.get("");
331
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
332

  
333
        // Backup path
334
        File file = new File(peerDir + "\\backup");
335

  
336
        // FileIds backed up
337
        File[] files = file.listFiles(File::isDirectory);
338

  
339
        // Peer didn't exist, just return
340
        if (files == null)
341
            return;
342

  
343
        // Each file directory
344
        for (int i = 0; i < files.length; i++) {
345

  
346
            // FileId
347
            String fileId = files[i].getName();
348

  
349
            // Get Chunks inside
350
            file = new File(peerDir + "\\backup" + "\\" + fileId);
351
            File[] chunks = file.listFiles();
352

  
353
            // No chunks, just return
354
            if (chunks == null)
355
                return;
356

  
357
            // Each chunk
358
            for (int j = 0; j < chunks.length; j++) {
359

  
360
                // Chunk name
361
                String chunkNo = chunks[j].getName();
362

  
363
                // Remove chk
364
                chunkNo = chunkNo.replaceAll("\\.", "");
365
                chunkNo = chunkNo.replaceAll("chk", "");
366

  
367
                try {
368

  
369
                    // Read bytes
370
                    byte[] bytes = Files.readAllBytes(chunks[j].toPath());
371

  
372
                    // Updated used bytes
373
                    used += bytes.length;
374

  
375
                    // Add to HashMap
376
                    this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0));
377

  
378
                } catch (Exception e) {
379
                    e.printStackTrace();
380
                }
381

  
382
            }
383

  
384
        }
385

  
386
    }
387

  
388
    // Setup directory
389
    public void setupDirectory() {
390

  
391
        // Base path
392
        Path currentRelativePath = Paths.get("");
393
        String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id;
394

  
395
        // Backup path
396
        File file = new File(peerDir + "\\backup");
397
        file.mkdirs();
398

  
399
        // Backup path
400
        file = new File(peerDir + "\\restored");
401
        file.mkdirs();
402

  
403
    }
404

  
405
    // Default Socket Setup
406
    public void setupSockets() throws IOException {
407

  
408
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
409

  
410
        MC = new Socket(this, Socket.Type.MC);
411
        pool.execute(MC);
412

  
413
        MDB = new Socket(this, Socket.Type.MDB);
414
        pool.execute(MDB);
415

  
416
        MDR = new Socket(this, Socket.Type.MDR);
417
        pool.execute(MDR);
418

  
419
    }
420

  
421
    // Specifc Socket Setup
422
    public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
423
            String addressMDR) throws IOException {
424

  
425
        pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000);
426

  
427
        MC = new Socket(this, Socket.Type.MC, portMC, addressMC);
428
        pool.execute(MC);
429

  
430
        MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB);
431
        pool.execute(MDB);
432

  
433
        MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR);
434
        pool.execute(MDR);
435

  
436
    }
437

  
438
    public static void main(String args[]) {
439

  
440
        // TODO Arguments are not correct, need to receive all the specific socket
441
        // arguments too, version and server(RMI)
442

  
443
        // Check arguments
444
        if (args.length < 1) {
445
            System.err.println("Invalid number of arguments");
446
            return;
447
        }
448

  
449
        try {
450

  
451
            // Create Server
452
            Server obj = new Server(args[0]);
453

  
454
            // Bind the server as a registry
455
            PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0);
456

  
457
            // Bind the remote object's stub in the registry
458

  
459
            // TODO Get registry what port?
460
            Registry registry = LocateRegistry.getRegistry();
461
            registry.bind(args[0], stub);
462

  
463
            System.err.println("Server ready");
464

  
465
        } catch (Exception e) {
466

  
467
            System.err.println("Server exception: " + e.toString());
468
            e.printStackTrace();
469

  
470
        }
471
    }
472

  
473
}
Socket.java
1

  
2
import java.io.IOException;
3
import java.net.DatagramPacket;
4
import java.net.InetAddress;
5
import java.net.MulticastSocket;
6
import java.util.AbstractMap.SimpleEntry;
7

  
8
public class Socket implements Runnable {
9

  
10
    public MulticastSocket socket;
11
    public int port;
12
    public InetAddress address;
13

  
14
    public enum Type {
15
        MC, MDB, MDR
16
    }
17

  
18
    public Type type;
19

  
20
    public Server peer;
21

  
22
    // Message size(safe size)
23
    public static final int MESSAGE_SIZE = 65500;
24

  
25
    // Default constructor
26
    public Socket(Server peer, Type type) throws IOException {
27

  
28
        this.peer = peer;
29

  
30
        switch (type) {
31
        case MC:
32
            this.port = 8000;
33
            this.address = InetAddress.getByName("224.0.0.1");
34
            break;
35
        case MDB:
36
            this.port = 8001;
37
            this.address = InetAddress.getByName("224.0.0.2");
38
            break;
39
        case MDR:
40
            this.port = 8002;
41
            this.address = InetAddress.getByName("224.0.0.3");
42
            break;
43
        default:
44
            return;
45
        }
46

  
47
        this.type = type;
48
        this.socket = new MulticastSocket(this.port);
49
        this.socket.joinGroup(address);
50

  
51
    }
52

  
53
    // Specific constructor
54
    public Socket(Server peer, Type type, int port, String address) throws IOException {
55

  
56
        this.peer = peer;
57
        this.type = type;
58
        this.port = port;
59
        this.address = InetAddress.getByName(address);
60
        this.socket = new MulticastSocket(this.port);
61
        this.socket.joinGroup(this.address);
62

  
63
    }
64

  
65
    @Override
66
    public void run() {
67

  
68
        while (true) {
69

  
70
            byte[] buf = new byte[MESSAGE_SIZE];
71
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
72

  
73
            try {
74

  
75
                this.socket.receive(packet);
76

  
77
                System.out.println("Packet received, launching interpreter");
78

  
79
                Server.pool.execute(new Interpreter(peer, packet));
80

  
81
            } catch (IOException e) {
82
                e.printStackTrace();
83
            }
84

  
85
        }
86

  
87
    }
88

  
89
}
clientBackup.bat
1
@echo off 
2
java Client 1 BACKUP test.png 2
clientDelete.bat
1
@echo off 
2
java Client 1 DELETE test.png
clientReclaim.bat
1
@echo off 
2
java Client 2 RECLAIM 0
clientRestore.bat
1
@echo off 
2
java Client 1 RESTORE test.png
compile.bat
1
@echo off 
2
javac *.java
peer1.bat
1
@echo off 
2
java Server 1
peer2.bat
1
@echo off 
2
java Server 2
peer3.bat
1
@echo off 
2
java Server 3
peer4.bat
1
@echo off 
2
java Server 4
rmi.bat
1
@echo off 
2
start rmiregistry
snooper.bat
1
@echo off 
2
java -jar McastSnooper.jar 224.0.0.1:8000 224.0.0.2:8001 224.0.0.3:8002
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff