Project

General

Profile

Statistics
| Revision:

root / src / Interpreter.java

History | View | Annotate | Download (10.7 KB)

1 3 up20160792
2
import java.util.Random;
3
import java.rmi.registry.Registry;
4
import java.rmi.registry.LocateRegistry;
5
import java.rmi.RemoteException;
6
import java.rmi.server.UnicastRemoteObject;
7
8
import java.util.Map;
9
10
import java.util.concurrent.ConcurrentHashMap;
11
import java.util.concurrent.ScheduledThreadPoolExecutor;
12
import java.util.concurrent.TimeUnit;
13
import java.util.concurrent.atomic.AtomicBoolean;
14
import java.util.concurrent.Executors;
15
import java.util.concurrent.CopyOnWriteArrayList;
16
import java.util.ArrayList;
17
import java.util.List;
18
import java.util.Comparator;
19
import java.util.HashSet;
20
import java.util.Collections;
21
22
import java.io.*;
23
24
import java.net.DatagramPacket;
25
26
import java.nio.file.*;
27
28
public class Interpreter implements Runnable {
29
30
    public Random rand = new Random();
31
32
    private static final byte CR = 0xD;
33
    private static final byte LF = 0xA;
34
35
    public Server peer;
36
37
    public String raw;
38
39
    public String header;
40
    public byte[] body;
41
42
    public String version;
43
    public String senderId;
44
    public String fileId;
45
    public String chunkNo;
46
    public String replication;
47
48
    public AtomicBoolean elseSentChunk = new AtomicBoolean(false);
49
    public AtomicBoolean elseSentChunkReclaim = new AtomicBoolean(true);
50
51
    public Message.MessageType type;
52
53
    public Interpreter() {
54
    }
55
56
    public Interpreter(Server peer, DatagramPacket packet) {
57
58
        // Set peer
59
        this.peer = peer;
60
61
        // Seperate header and body
62
        messageSplitHeaderBody(packet.getData(), packet.getLength());
63
64
        System.out.println(header);
65
66
        // Get parts
67
        String[] parts = header.split(" ");
68
69
        // Identify type and specific arguments
70
        switch (parts[0]) {
71
        case "PUTCHUNK":
72
            type = Message.MessageType.PUTCHUNK;
73
            chunkNo = parts[4];
74
            replication = parts[5];
75
            break;
76
        case "STORED":
77
            type = Message.MessageType.STORED;
78
            chunkNo = parts[4];
79
            break;
80
        case "GETCHUNK":
81
            type = Message.MessageType.GETCHUNK;
82
            chunkNo = parts[4];
83
            break;
84
        case "CHUNK":
85
            type = Message.MessageType.CHUNK;
86
            chunkNo = parts[4];
87
            break;
88
        case "DELETE":
89
            type = Message.MessageType.DELETE;
90
            break;
91
        case "REMOVED":
92
            type = Message.MessageType.REMOVED;
93
            chunkNo = parts[4];
94
            break;
95
        default:
96
            break;
97
        }
98
99
        // Version, SenderId and FileId are shared
100
        version = parts[1];
101
        senderId = parts[2];
102
        fileId = parts[3];
103
104
    }
105
106
    // Split message header and body
107
    public void messageSplitHeaderBody(byte[] message, int size) {
108
109
        // Find index of CRLF
110
        int i = 0;
111
        for (i = 0; i < size; i++) {
112
            if (i <= size - 5) {
113
                if (message[i] == CR && message[i + 1] == LF && message[i + 2] == CR && message[i + 3] == LF) {
114
                    break;
115
                }
116
            }
117
        }
118
119
        // Get header
120
        byte[] headerByte = new byte[i];
121
122
        System.arraycopy(message, 0, headerByte, 0, i);
123
124
        header = new String(headerByte);
125
        header = header.trim();
126
127
        // Get body
128
        if (size > i + 3) {
129
            body = new byte[size - i - 4];
130
            System.arraycopy(message, i + 4, body, 0, size - i - 4);
131
        } else {
132
            body = null;
133
        }
134
135
    }
136
137
    @Override
138
    public void run() {
139
140
        // Do whatever the message asks
141
        try {
142
143
            // Identify Type
144
            switch (type) {
145
            case PUTCHUNK:
146
147
                // Someone(NOT US!) sent a CHUNK, cancel it
148
                if (this.elseSentChunkReclaim.get() && senderId != this.peer.id)
149
                    this.elseSentChunkReclaim.set(true);
150
151
                // Dont backup self
152
                if (peer.doingBackUp)
153
                    return;
154
155
                // Check peer has enough size available
156
                if (peer.used + body.length > peer.limit)
157
                    return;
158
159
                // Get path to peer
160
                Path currentRelativePath = Paths.get("");
161
                String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + peer.id;
162
                peerDir += "\\backup";
163
                peerDir += "\\" + fileId;
164
                File file = new File(peerDir);
165
166
                // Create path
167
                file.mkdirs();
168
169
                // Store chunk in file system
170
                peerDir += "\\chk" + chunkNo + ".chk";
171
                File chunk = new File(peerDir);
172
                FileOutputStream out = new FileOutputStream(chunk);
173
                out.write(body);
174
                out.close();
175
176
                // Store chunk in Peer cache(hashmap)
177
                peer.chunks.put(fileId + "-" + chunkNo,
178
                        new Chunk(fileId, Integer.parseInt(chunkNo), body, Integer.parseInt(replication)));
179
180
                // Add to used memory
181
                peer.used += body.length;
182
183
                // Random delay
184
                Thread.sleep(rand.nextInt(400));
185
186
                // Send STORED to MC
187
                Message msg = new Message(Message.MessageType.STORED, peer.id, fileId, Integer.parseInt(chunkNo), 0,
188
                        new byte[0]);
189
190
                DatagramPacket packet = msg.packit(Server.MC.address, Server.MC.port);
191
192
                Server.MC.socket.send(packet);
193
194
                break;
195
            case STORED:
196
197
                // If doing backup then add to replications
198
                if (peer.doingBackUp && peer.file.equals(fileId)) {
199
                    peer.replications++;
200
                }
201
202
                // Chunk that was stored
203
                String keyStored = fileId + "-" + chunkNo;
204
205
                // If we have the chunk
206
                if (this.peer.chunks.get(keyStored) != null) {
207
                    this.peer.chunks.get(keyStored).cReps++;
208
                }
209
210
                break;
211
            case GETCHUNK:
212
213
                // Find chunk in store
214
                Chunk requestedChunk = peer.chunks.get(fileId + "-" + chunkNo);
215
216
                // Chunk not found, just return
217
                if (requestedChunk == null)
218
                    return;
219
220
                // Set flag to lookout for CHUNK
221
                this.elseSentChunk.set(false);
222
223
                // Wait random delay
224
                Thread.sleep(rand.nextInt(400));
225
226
                // Someone else sent the CHUNK, just return
227
                if (this.elseSentChunk.get())
228
                    return;
229
230
                // Send CHUNK to MDR
231
                Message msgGetChunk = new Message(Message.MessageType.CHUNK, peer.id, fileId, Integer.parseInt(chunkNo),
232
                        0, requestedChunk.bytes);
233
234
                DatagramPacket packetGetChunk = msgGetChunk.packit(Server.MDR.address, Server.MDR.port);
235
236
                Server.MDR.socket.send(packetGetChunk);
237
238
                break;
239
            case CHUNK:
240
241
                // Find chunk in store
242
                Chunk receivedChunk = peer.chunks.get(fileId + "-" + chunkNo);
243
244
                // Someone(NOT US!) sent a CHUNK we were going to send, cancel it
245
                if (receivedChunk != null && senderId != this.peer.id) {
246
                    this.elseSentChunk.set(true);
247
                }
248
249
                // Add to recovered if not there yet
250
                if (this.peer.restoring) {
251
                    if (!this.peer.recoveredChunks.containsKey(chunkNo)) {
252
                        this.peer.recoveredChunks.put(chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), body, 0));
253
                        // Chunk received, continue asking for rest
254
                        if (peer.waitRestoreReply.get())
255
                            peer.waitRestoreReply.set(false);
256
                    }
257
258
                }
259
260
                break;
261
            case DELETE:
262
263
                boolean found1 = false;
264
265
                // Delete cache chunks
266
                for (String keyDelete : this.peer.chunks.keySet()) {
267
                    if (keyDelete.contains(this.fileId)) {
268
                        found1 = true;
269
                        this.peer.chunks.remove(keyDelete);
270
                    }
271
                }
272
273
                // Delete filesystem chunks
274
                // If at least one in cache was found then surely it exists in file system and
275
                // we should delete
276
                if (found1) {
277
                    Path current = Paths.get("");
278
                    String fileDir = current.toAbsolutePath().toString() + "\\peer" + peer.id;
279
                    fileDir += "\\backup";
280
                    fileDir += "\\" + fileId;
281
                    File fileDirFile = new File(fileDir);
282
                    FileManager.deleteDir(fileDirFile);
283
                }
284
285
                break;
286
            case REMOVED:
287
288
                // Chunk requested
289
                String key = fileId + "-" + chunkNo;
290
291
                // If we have the chunk
292
                if (this.peer.chunks.get(key) != null) {
293
294
                    // Remove replication
295
                    this.peer.chunks.get(key).cReps--;
296
297
                    System.out.println(this.peer.chunks.get(key).cReps);
298
                    System.out.println(this.peer.chunks.get(key).reps);
299
300
                    // If bellow replication degree
301
                    if (this.peer.chunks.get(key).cReps < this.peer.chunks.get(key).reps) {
302
                        // Wait random delay
303
                        this.elseSentChunkReclaim.set(false);
304
                        Thread.sleep(rand.nextInt(400));
305
306
                        // Someone already started backup of this chunk
307
                        if (this.elseSentChunkReclaim.get())
308
                            return;
309
310
                        // Prepare message
311
                        Message msgRemovedChunk = new Message(Message.MessageType.PUTCHUNK, this.peer.id, fileId,
312
                                this.peer.chunks.get(key).chunkN, this.peer.chunks.get(key).reps,
313
                                this.peer.chunks.get(key).bytes);
314
315
                        // Pack in datagram
316
                        DatagramPacket packetRemovedChunk = msgRemovedChunk.packit(this.peer.MDB.address,
317
                                this.peer.MDB.port);
318
319
                        // Send message
320
                        this.peer.MDB.socket.send(packetRemovedChunk);
321
                    }
322
323
                }
324
325
                break;
326
327
            default:
328
                break;
329
            }
330
331
        } catch (Exception e) {
332
            e.printStackTrace();
333
        }
334
335
    }
336
337
}