root / src / Server.java
History | View | Annotate | Download (14.8 KB)
1 |
import java.rmi.registry.Registry; |
---|---|
2 |
import java.rmi.registry.LocateRegistry; |
3 |
import java.rmi.RemoteException; |
4 |
import java.rmi.server.UnicastRemoteObject; |
5 |
|
6 |
import java.util.Map; |
7 |
|
8 |
import java.util.concurrent.ConcurrentHashMap; |
9 |
import java.util.concurrent.ScheduledThreadPoolExecutor; |
10 |
import java.util.concurrent.TimeUnit; |
11 |
import java.util.concurrent.atomic.AtomicBoolean; |
12 |
import java.util.concurrent.Executors; |
13 |
import java.util.concurrent.CopyOnWriteArrayList;; |
14 |
import java.util.ArrayList; |
15 |
import java.util.List; |
16 |
import java.util.Comparator; |
17 |
import java.util.HashSet; |
18 |
import java.util.Collections; |
19 |
|
20 |
import java.io.*; |
21 |
|
22 |
import java.net.DatagramPacket; |
23 |
|
24 |
import java.nio.file.*; |
25 |
|
26 |
// Server class: essentially a peer (implements PeerInterface)
|
27 |
public class Server implements PeerInterface { |
28 |
|
29 |
// Unique peer id
|
30 |
public String id; |
31 |
|
32 |
// Size controls
|
33 |
public static final int DEFAULT_MAX_SIZE = 1000000000; |
34 |
public int limit; |
35 |
public int used; |
36 |
|
37 |
// Thread Pool
|
38 |
public static ScheduledThreadPoolExecutor pool; |
39 |
|
40 |
// Sockets
|
41 |
public static Socket MC; |
42 |
public static Socket MDB; |
43 |
public static Socket MDR; |
44 |
|
45 |
// State Backup
|
46 |
public boolean doingBackUp; |
47 |
public String file; |
48 |
public int replications; |
49 |
|
50 |
// State Restore
|
51 |
public boolean restoring; |
52 |
public AtomicBoolean waitRestoreReply = new AtomicBoolean(false); |
53 |
public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> |
54 |
// Chunk
|
55 |
|
56 |
// Service Management Information
|
57 |
public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo |
58 |
// -> Chunk
|
59 |
public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name |
60 |
|
61 |
// Empty Constructor
|
62 |
/**
 * Creates a peer without any set-up: only the field initialisers run, so no
 * identifier is assigned and no sockets or directories are prepared.
 */
public Server() {
    // Intentionally empty.
}
64 |
|
65 |
// Simplified Constructor(Default values)
|
66 |
/**
 * Creates a peer with the default socket configuration.
 *
 * @param id unique identifier of this peer
 * @throws IOException if the socket set-up fails
 */
public Server(String id) throws IOException {
    // Plain state first
    this.id = id;
    this.limit = DEFAULT_MAX_SIZE;
    this.used = 0;

    // Then the sockets, the chunks left on disk by a previous run,
    // and finally the peer directories
    setupSockets();
    loadLocal();
    setupDirectory();
}
74 |
|
75 |
// Full constructor
|
76 |
/**
 * Creates a peer with an explicit port and address for each of the three sockets.
 *
 * @param id         unique identifier of this peer
 * @param portMC     port of the MC socket
 * @param addressMC  address of the MC socket
 * @param portMDB    port of the MDB socket
 * @param addressMDB address of the MDB socket
 * @param portMDR    port of the MDR socket
 * @param addressMDR address of the MDR socket
 * @throws IOException if the socket set-up fails
 */
public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
        String addressMDR) throws IOException {
    // Plain state first
    this.id = id;
    this.limit = DEFAULT_MAX_SIZE;
    this.used = 0;

    // Then the sockets, the chunks left on disk by a previous run,
    // and finally the peer directories
    setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
    loadLocal();
    setupDirectory();
}
85 |
|
86 |
// Backup Protocol
|
87 |
public void backup(String path, int replications) throws RemoteException { |
88 |
|
89 |
// Start backup
|
90 |
this.doingBackUp = true; |
91 |
|
92 |
// Extract file name
|
93 |
String fileName = new File(path).getName(); |
94 |
|
95 |
// Generate Hash from file name and date
|
96 |
String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
97 |
|
98 |
// Is file backed up already?
|
99 |
if (this.files.get(fileId) != null) { |
100 |
System.out.println("File already backed up"); |
101 |
return;
|
102 |
} |
103 |
|
104 |
// Add to file list
|
105 |
this.files.put(fileId, fileName);
|
106 |
|
107 |
try {
|
108 |
|
109 |
// Get file bytes
|
110 |
byte[] bytes = FileManager.readFile(path); |
111 |
|
112 |
// Get file in chunks
|
113 |
byte[][] chunks = FileManager.split(bytes); |
114 |
|
115 |
// Send each chunk
|
116 |
for (int i = 0; i < chunks.length; i++) { |
117 |
|
118 |
// Base receive time of 1 second
|
119 |
int receiveTime = 1000; |
120 |
|
121 |
// PUTCHUNK tries
|
122 |
for (int j = 0; j < 5; j++) { |
123 |
|
124 |
// Prepare message
|
125 |
Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
|
126 |
|
127 |
// Pack in datagram
|
128 |
DatagramPacket packet = msg.packit(MDB.address, MDB.port);
|
129 |
|
130 |
// Send message
|
131 |
MDB.socket.send(packet); |
132 |
|
133 |
// Wait for replies and check Replication level
|
134 |
this.replications = 0; |
135 |
this.file = fileId;
|
136 |
Thread.sleep(receiveTime);
|
137 |
if (this.replications >= replications) { |
138 |
break;
|
139 |
} |
140 |
|
141 |
// Double receive time
|
142 |
receiveTime *= 2;
|
143 |
|
144 |
} |
145 |
|
146 |
} |
147 |
|
148 |
} catch (Exception e) { |
149 |
e.printStackTrace(); |
150 |
} |
151 |
|
152 |
// Terminate backup
|
153 |
this.doingBackUp = false; |
154 |
|
155 |
} |
156 |
|
157 |
// Restore Protocol
|
158 |
public void restore(String path) throws RemoteException { |
159 |
|
160 |
this.restoring = true; |
161 |
|
162 |
// File name
|
163 |
String fileName = new File(path).getName(); |
164 |
|
165 |
// Generate Hash from file name and date
|
166 |
String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
167 |
|
168 |
// Find in list of backed up
|
169 |
String name = this.files.get(fileId); |
170 |
|
171 |
// File not backed up yet, just return
|
172 |
if (name == null) { |
173 |
System.out.println("File not backed up by this peer"); |
174 |
return;
|
175 |
} |
176 |
|
177 |
try {
|
178 |
|
179 |
// Get byte count (probably better than actually fetching the bytes array)
|
180 |
File file = new File(path); |
181 |
int bytes = (int) file.length(); |
182 |
|
183 |
// How many chunks to expect
|
184 |
int chunks = 1 + bytes / FileManager.chunkSize; |
185 |
|
186 |
// Reset list of Chunks recovered
|
187 |
this.recoveredChunks.clear();
|
188 |
|
189 |
// Request chunks
|
190 |
for (int i = 0; i < chunks; i++) { |
191 |
|
192 |
// Prepare message
|
193 |
Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]); |
194 |
|
195 |
// Pack in datagram
|
196 |
DatagramPacket packet = msg.packit(MC.address, MC.port);
|
197 |
|
198 |
// Send message
|
199 |
MC.socket.send(packet); |
200 |
|
201 |
// Wait for response
|
202 |
this.waitRestoreReply.set(true); |
203 |
long time = System.nanoTime(); |
204 |
while (this.waitRestoreReply.get()) { |
205 |
// Restore timeout
|
206 |
if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) { |
207 |
System.out.println("Restore timeout, missing chunks"); |
208 |
return;
|
209 |
} |
210 |
} |
211 |
|
212 |
} |
213 |
|
214 |
// Get simple list from recovered chunks
|
215 |
List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values()); |
216 |
|
217 |
// Sort chunks
|
218 |
Collections.sort(list, new Comparator<Chunk>() { |
219 |
@Override
|
220 |
public int compare(Chunk p1, Chunk p2) { |
221 |
return p1.chunkN - p2.chunkN; // Ascending |
222 |
} |
223 |
}); |
224 |
|
225 |
// Pack chunks
|
226 |
byte[][] allBytes = new byte[list.size()][]; |
227 |
for (int i = 0; i < list.size(); i++) { |
228 |
allBytes[i] = list.get(i).bytes; |
229 |
} |
230 |
|
231 |
// Merge chunks
|
232 |
byte[] fileBytes = FileManager.merge(allBytes); |
233 |
|
234 |
// Save file
|
235 |
Path current = Paths.get("");
|
236 |
String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName; |
237 |
try (FileOutputStream stream = new FileOutputStream(place)) { |
238 |
stream.write(fileBytes); |
239 |
stream.close(); |
240 |
} |
241 |
|
242 |
} catch (Exception e) { |
243 |
e.printStackTrace(); |
244 |
} |
245 |
|
246 |
this.restoring = false; |
247 |
|
248 |
} |
249 |
|
250 |
// Delete chunks associated to this file
|
251 |
public void delete(String path) throws RemoteException { |
252 |
|
253 |
// File name
|
254 |
String fileName = new File(path).getName(); |
255 |
|
256 |
// Generate Hash from file name and date
|
257 |
String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
258 |
|
259 |
// Remove from list of backed up
|
260 |
String name = this.files.get(fileId); |
261 |
if (name != null) { |
262 |
this.files.remove(fileId);
|
263 |
} |
264 |
|
265 |
try {
|
266 |
|
267 |
// Prepare message
|
268 |
Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]); |
269 |
|
270 |
// Pack in datagram
|
271 |
DatagramPacket packet = msg.packit(MC.address, MC.port);
|
272 |
|
273 |
// Send message
|
274 |
MC.socket.send(packet); |
275 |
|
276 |
} catch (Exception e) { |
277 |
e.printStackTrace(); |
278 |
} |
279 |
|
280 |
} |
281 |
|
282 |
public void reclaim(int memory) throws RemoteException { |
283 |
|
284 |
// Memory can't be lower than 0
|
285 |
if (memory < 0) { |
286 |
return;
|
287 |
} |
288 |
|
289 |
// Assign new limit
|
290 |
limit = memory; |
291 |
|
292 |
try {
|
293 |
|
294 |
// Check need to delete chunks
|
295 |
// Going to delete by order in the map
|
296 |
while (used > limit) {
|
297 |
|
298 |
// Pick an entry
|
299 |
Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next(); |
300 |
|
301 |
// Remove from used space
|
302 |
used -= entry.getValue().bytes.length; |
303 |
|
304 |
// Prepare message
|
305 |
Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
|
306 |
entry.getValue().chunkN, 0, new byte[0]); |
307 |
|
308 |
// Pack in datagram
|
309 |
DatagramPacket packet = msg.packit(MC.address, MC.port);
|
310 |
|
311 |
// Send message
|
312 |
MC.socket.send(packet); |
313 |
|
314 |
// Remove from map
|
315 |
chunks.remove(entry.getKey()); |
316 |
|
317 |
// Remove from file system
|
318 |
Path current = Paths.get("");
|
319 |
String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" |
320 |
+ entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk"; |
321 |
File toDelete = new File(place); |
322 |
toDelete.delete(); |
323 |
|
324 |
} |
325 |
|
326 |
} catch (Exception e) { |
327 |
e.printStackTrace(); |
328 |
} |
329 |
|
330 |
} |
331 |
|
332 |
public void state() throws RemoteException { |
333 |
|
334 |
} |
335 |
|
336 |
// Get information of previous peer life from file system
|
337 |
public void loadLocal() { |
338 |
|
339 |
// Base path
|
340 |
Path currentRelativePath = Paths.get("");
|
341 |
String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
342 |
|
343 |
// Backup path
|
344 |
File file = new File(peerDir + "\\backup"); |
345 |
|
346 |
// FileIds backed up
|
347 |
File[] files = file.listFiles(File::isDirectory); |
348 |
|
349 |
// Peer didn't exist, just return
|
350 |
if (files == null) |
351 |
return;
|
352 |
|
353 |
// Each file directory
|
354 |
for (int i = 0; i < files.length; i++) { |
355 |
|
356 |
// FileId
|
357 |
String fileId = files[i].getName();
|
358 |
|
359 |
// Get Chunks inside
|
360 |
file = new File(peerDir + "\\backup" + "\\" + fileId); |
361 |
File[] chunks = file.listFiles(); |
362 |
|
363 |
// No chunks, just return
|
364 |
if (chunks == null) |
365 |
return;
|
366 |
|
367 |
// Each chunk
|
368 |
for (int j = 0; j < chunks.length; j++) { |
369 |
|
370 |
// Chunk name
|
371 |
String chunkNo = chunks[j].getName();
|
372 |
|
373 |
// Remove chk
|
374 |
chunkNo = chunkNo.replaceAll("\\.", ""); |
375 |
chunkNo = chunkNo.replaceAll("chk", ""); |
376 |
|
377 |
try {
|
378 |
|
379 |
// Read bytes
|
380 |
byte[] bytes = Files.readAllBytes(chunks[j].toPath()); |
381 |
|
382 |
// Updated used bytes
|
383 |
used += bytes.length; |
384 |
|
385 |
// Add to HashMap
|
386 |
this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0)); |
387 |
|
388 |
} catch (Exception e) { |
389 |
e.printStackTrace(); |
390 |
} |
391 |
|
392 |
} |
393 |
|
394 |
} |
395 |
|
396 |
} |
397 |
|
398 |
// Setup directory
|
399 |
public void setupDirectory() { |
400 |
|
401 |
// Base path
|
402 |
Path currentRelativePath = Paths.get("");
|
403 |
String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
404 |
|
405 |
// Backup path
|
406 |
File file = new File(peerDir + "\\backup"); |
407 |
file.mkdirs(); |
408 |
|
409 |
// Backup path
|
410 |
file = new File(peerDir + "\\restored"); |
411 |
file.mkdirs(); |
412 |
|
413 |
} |
414 |
|
415 |
// Default Socket Setup
|
416 |
public void setupSockets() throws IOException { |
417 |
|
418 |
pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
419 |
|
420 |
MC = new Socket(this, Socket.Type.MC); |
421 |
pool.execute(MC); |
422 |
|
423 |
MDB = new Socket(this, Socket.Type.MDB); |
424 |
pool.execute(MDB); |
425 |
|
426 |
MDR = new Socket(this, Socket.Type.MDR); |
427 |
pool.execute(MDR); |
428 |
|
429 |
} |
430 |
|
431 |
// Specific Socket Setup
|
432 |
public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
433 |
String addressMDR) throws IOException { |
434 |
|
435 |
pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
436 |
|
437 |
MC = new Socket(this, Socket.Type.MC, portMC, addressMC); |
438 |
pool.execute(MC); |
439 |
|
440 |
MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB); |
441 |
pool.execute(MDB); |
442 |
|
443 |
MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR); |
444 |
pool.execute(MDR); |
445 |
|
446 |
} |
447 |
|
448 |
public static void main(String args[]) { |
449 |
|
450 |
try {
|
451 |
|
452 |
// Server object
|
453 |
Server obj; |
454 |
|
455 |
// Basic (version id)
|
456 |
if (args.length == 2) { |
457 |
obj = new Server(args[1]); |
458 |
} |
459 |
// Full (version id access port mc port mdb port mdr)
|
460 |
else if (args.length == 9) { |
461 |
obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6], |
462 |
Integer.parseInt(args[7]), args[8]); |
463 |
} |
464 |
// Invalid arguments
|
465 |
else {
|
466 |
System.out.println("java Server <version> <server id>"); |
467 |
System.out.println(
|
468 |
"java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
|
469 |
return;
|
470 |
} |
471 |
|
472 |
// Bind the server as a registry
|
473 |
PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0); |
474 |
|
475 |
// Bind the remote object's stub in the registry
|
476 |
Registry registry = LocateRegistry.getRegistry(); |
477 |
registry.bind(obj.id, stub); |
478 |
|
479 |
System.err.println("Server ready"); |
480 |
|
481 |
} catch (Exception e) { |
482 |
|
483 |
System.err.println("Server exception: " + e.toString()); |
484 |
e.printStackTrace(); |
485 |
|
486 |
} |
487 |
} |
488 |
|
489 |
} |