root / Server.java @ 2
History | View | Annotate | Download (15 KB)
1 |
import java.rmi.registry.Registry; |
---|---|
2 |
import java.rmi.registry.LocateRegistry; |
3 |
import java.rmi.RemoteException; |
4 |
import java.rmi.server.UnicastRemoteObject; |
5 |
|
6 |
import java.util.Map; |
7 |
|
8 |
import java.util.concurrent.ConcurrentHashMap; |
9 |
import java.util.concurrent.ScheduledThreadPoolExecutor; |
10 |
import java.util.concurrent.TimeUnit; |
11 |
import java.util.concurrent.atomic.AtomicBoolean; |
12 |
import java.util.concurrent.Executors; |
13 |
import java.util.concurrent.CopyOnWriteArrayList;; |
14 |
import java.util.ArrayList; |
15 |
import java.util.List; |
16 |
import java.util.Comparator; |
17 |
import java.util.HashSet; |
18 |
import java.util.Collections; |
19 |
|
20 |
import java.io.*; |
21 |
|
22 |
import java.net.DatagramPacket; |
23 |
|
24 |
import java.nio.file.*; |
25 |
|
26 |
//Server class essentially a Peer
|
27 |
public class Server implements PeerInterface { |
28 |
|
29 |
// Unique peer id
|
30 |
public String id; |
31 |
|
32 |
// Size controls
|
33 |
public static final int DEFAULT_MAX_SIZE = 1000000000; |
34 |
public int limit; |
35 |
public int used; |
36 |
|
37 |
// Thread Pool
|
38 |
public static ScheduledThreadPoolExecutor pool; |
39 |
|
40 |
// Sockets
|
41 |
public static Socket MC; |
42 |
public static Socket MDB; |
43 |
public static Socket MDR; |
44 |
|
45 |
// State Backup
|
46 |
public boolean doingBackUp; |
47 |
public String file; |
48 |
public int replications; |
49 |
|
50 |
// State Restore
|
51 |
public boolean restoring; |
52 |
public AtomicBoolean waitRestoreReply = new AtomicBoolean(false); |
53 |
public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> |
54 |
// Chunk
|
55 |
|
56 |
// Service Management Information
|
57 |
public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo |
58 |
// -> Chunk
|
59 |
public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name |
60 |
|
61 |
// Empty Constructor
|
62 |
public Server() {
|
63 |
} |
64 |
|
65 |
// Simplified Constructor(Default values)
|
66 |
public Server(String id) throws IOException { |
67 |
this.used = 0; |
68 |
this.id = id;
|
69 |
this.limit = DEFAULT_MAX_SIZE;
|
70 |
this.setupSockets();
|
71 |
this.loadLocal();
|
72 |
this.setupDirectory();
|
73 |
} |
74 |
|
75 |
// Full constructor
|
76 |
public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
77 |
String addressMDR) throws IOException { |
78 |
this.used = 0; |
79 |
this.id = id;
|
80 |
this.limit = DEFAULT_MAX_SIZE;
|
81 |
this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
|
82 |
this.loadLocal();
|
83 |
this.setupDirectory();
|
84 |
} |
85 |
|
86 |
// Backup Protocol
|
87 |
public void backup(String path, int replications) throws RemoteException { |
88 |
|
89 |
// Start backup
|
90 |
this.doingBackUp = true; |
91 |
|
92 |
// Extract file name
|
93 |
int index = path.lastIndexOf("\\"); |
94 |
String fileName = path.substring(index + 1); |
95 |
|
96 |
// Generate Hash from file name and date
|
97 |
String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
98 |
|
99 |
// Is file backed up already?
|
100 |
if (this.files.get(fileId) != null) { |
101 |
System.out.println("File already backed up"); |
102 |
return;
|
103 |
} |
104 |
|
105 |
// Add to file list
|
106 |
this.files.put(fileId, fileName);
|
107 |
|
108 |
try {
|
109 |
|
110 |
// Get file bytes
|
111 |
byte[] bytes = FileManager.readFile(path); |
112 |
|
113 |
// Get file in chunks
|
114 |
byte[][] chunks = FileManager.split(bytes); |
115 |
|
116 |
// Send each chunk
|
117 |
for (int i = 0; i < chunks.length; i++) { |
118 |
|
119 |
// Base receive time of 1 second
|
120 |
int receiveTime = 1000; |
121 |
|
122 |
// PUTCHUNK tries
|
123 |
for (int j = 0; j < 5; j++) { |
124 |
|
125 |
// Prepare message
|
126 |
Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
|
127 |
|
128 |
// Pack in datagram
|
129 |
DatagramPacket packet = msg.packit(MDB.address, MDB.port);
|
130 |
|
131 |
// Send message
|
132 |
MDB.socket.send(packet); |
133 |
|
134 |
// Wait for replies and check Replication level
|
135 |
this.replications = 0; |
136 |
this.file = fileId;
|
137 |
Thread.sleep(receiveTime);
|
138 |
if (this.replications >= replications) { |
139 |
break;
|
140 |
} |
141 |
|
142 |
// Double receive time
|
143 |
receiveTime *= 2;
|
144 |
|
145 |
} |
146 |
|
147 |
} |
148 |
|
149 |
} catch (Exception e) { |
150 |
e.printStackTrace(); |
151 |
} |
152 |
|
153 |
// Terminate backup
|
154 |
this.doingBackUp = false; |
155 |
|
156 |
} |
157 |
|
158 |
// Restore Protocol
|
159 |
public void restore(String path) throws RemoteException { |
160 |
|
161 |
this.restoring = true; |
162 |
|
163 |
// File name
|
164 |
int index = path.lastIndexOf("\\"); |
165 |
String fileName = path.substring(index + 1); |
166 |
|
167 |
// Generate Hash from file name and date
|
168 |
String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
169 |
|
170 |
// Find in list of backed up
|
171 |
String name = this.files.get(fileId); |
172 |
|
173 |
// File not backed up yet, just return
|
174 |
if (name == null) { |
175 |
System.out.println("File not backed up by this peer"); |
176 |
return;
|
177 |
} |
178 |
|
179 |
try {
|
180 |
|
181 |
// Get byte count (probably better than actually fetching the bytes array)
|
182 |
File file = new File(path); |
183 |
int bytes = (int) file.length(); |
184 |
|
185 |
// How many chunks to expect
|
186 |
int chunks = 1 + bytes / FileManager.chunkSize; |
187 |
|
188 |
// Reset list of Chunks recovered
|
189 |
this.recoveredChunks.clear();
|
190 |
|
191 |
// Request chunks
|
192 |
for (int i = 0; i < chunks; i++) { |
193 |
|
194 |
// Prepare message
|
195 |
Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]); |
196 |
|
197 |
// Pack in datagram
|
198 |
DatagramPacket packet = msg.packit(MC.address, MC.port);
|
199 |
|
200 |
// Send message
|
201 |
MC.socket.send(packet); |
202 |
|
203 |
// Wait for response
|
204 |
this.waitRestoreReply.set(true); |
205 |
long time = System.nanoTime(); |
206 |
while (this.waitRestoreReply.get()) { |
207 |
// Restore timeout
|
208 |
if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) { |
209 |
System.out.println("Restore timeout, missing chunks"); |
210 |
return;
|
211 |
} |
212 |
} |
213 |
|
214 |
} |
215 |
|
216 |
// Get simple list from recovered chunks
|
217 |
List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values()); |
218 |
|
219 |
// Sort chunks
|
220 |
Collections.sort(list, new Comparator<Chunk>() { |
221 |
@Override
|
222 |
public int compare(Chunk p1, Chunk p2) { |
223 |
return p1.chunkN - p2.chunkN; // Ascending |
224 |
} |
225 |
}); |
226 |
|
227 |
// Pack chunks
|
228 |
byte[][] allBytes = new byte[list.size()][]; |
229 |
for (int i = 0; i < list.size(); i++) { |
230 |
allBytes[i] = list.get(i).bytes; |
231 |
} |
232 |
|
233 |
// Merge chunks
|
234 |
byte[] fileBytes = FileManager.merge(allBytes); |
235 |
|
236 |
// Save file
|
237 |
Path current = Paths.get("");
|
238 |
String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName; |
239 |
try (FileOutputStream stream = new FileOutputStream(place)) { |
240 |
stream.write(fileBytes); |
241 |
stream.close(); |
242 |
} |
243 |
|
244 |
} catch (Exception e) { |
245 |
e.printStackTrace(); |
246 |
} |
247 |
|
248 |
this.restoring = false; |
249 |
|
250 |
} |
251 |
|
252 |
// Delete chunks associated to this file
|
253 |
public void delete(String path) throws RemoteException { |
254 |
|
255 |
// File name
|
256 |
int index = path.lastIndexOf("\\"); |
257 |
String fileName = path.substring(index + 1); |
258 |
|
259 |
// Generate Hash from file name and date
|
260 |
String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
261 |
|
262 |
// Remove from list of backed up
|
263 |
String name = this.files.get(fileId); |
264 |
if (name != null) { |
265 |
this.files.remove(fileId);
|
266 |
} |
267 |
|
268 |
try {
|
269 |
|
270 |
// Prepare message
|
271 |
Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]); |
272 |
|
273 |
// Pack in datagram
|
274 |
DatagramPacket packet = msg.packit(MC.address, MC.port);
|
275 |
|
276 |
// Send message
|
277 |
MC.socket.send(packet); |
278 |
|
279 |
} catch (Exception e) { |
280 |
e.printStackTrace(); |
281 |
} |
282 |
|
283 |
} |
284 |
|
285 |
public void reclaim(int memory) throws RemoteException { |
286 |
|
287 |
// Memory can't be lower than 0
|
288 |
if (memory < 0) { |
289 |
return;
|
290 |
} |
291 |
|
292 |
// Assign new limit
|
293 |
limit = memory; |
294 |
|
295 |
try {
|
296 |
|
297 |
// Check need to delete chunks
|
298 |
// Going to delete by order in the map
|
299 |
while (used > limit) {
|
300 |
|
301 |
// Pick an entry
|
302 |
Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next(); |
303 |
|
304 |
// Remove from used space
|
305 |
used -= entry.getValue().bytes.length; |
306 |
|
307 |
// Prepare message
|
308 |
Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
|
309 |
entry.getValue().chunkN, 0, new byte[0]); |
310 |
|
311 |
// Pack in datagram
|
312 |
DatagramPacket packet = msg.packit(MC.address, MC.port);
|
313 |
|
314 |
// Send message
|
315 |
MC.socket.send(packet); |
316 |
|
317 |
// Remove from map
|
318 |
chunks.remove(entry.getKey()); |
319 |
|
320 |
// Remove from file system
|
321 |
Path current = Paths.get("");
|
322 |
String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" |
323 |
+ entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk"; |
324 |
File toDelete = new File(place); |
325 |
toDelete.delete(); |
326 |
|
327 |
} |
328 |
|
329 |
} catch (Exception e) { |
330 |
e.printStackTrace(); |
331 |
} |
332 |
|
333 |
} |
334 |
|
335 |
public void state() throws RemoteException { |
336 |
|
337 |
} |
338 |
|
339 |
// Get information of previous peer life from file system
|
340 |
public void loadLocal() { |
341 |
|
342 |
// Base path
|
343 |
Path currentRelativePath = Paths.get("");
|
344 |
String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
345 |
|
346 |
// Backup path
|
347 |
File file = new File(peerDir + "\\backup"); |
348 |
|
349 |
// FileIds backed up
|
350 |
File[] files = file.listFiles(File::isDirectory); |
351 |
|
352 |
// Peer didn't exist, just return
|
353 |
if (files == null) |
354 |
return;
|
355 |
|
356 |
// Each file directory
|
357 |
for (int i = 0; i < files.length; i++) { |
358 |
|
359 |
// FileId
|
360 |
String fileId = files[i].getName();
|
361 |
|
362 |
// Get Chunks inside
|
363 |
file = new File(peerDir + "\\backup" + "\\" + fileId); |
364 |
File[] chunks = file.listFiles(); |
365 |
|
366 |
// No chunks, just return
|
367 |
if (chunks == null) |
368 |
return;
|
369 |
|
370 |
// Each chunk
|
371 |
for (int j = 0; j < chunks.length; j++) { |
372 |
|
373 |
// Chunk name
|
374 |
String chunkNo = chunks[j].getName();
|
375 |
|
376 |
// Remove chk
|
377 |
chunkNo = chunkNo.replaceAll("\\.", ""); |
378 |
chunkNo = chunkNo.replaceAll("chk", ""); |
379 |
|
380 |
try {
|
381 |
|
382 |
// Read bytes
|
383 |
byte[] bytes = Files.readAllBytes(chunks[j].toPath()); |
384 |
|
385 |
// Updated used bytes
|
386 |
used += bytes.length; |
387 |
|
388 |
// Add to HashMap
|
389 |
this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0)); |
390 |
|
391 |
} catch (Exception e) { |
392 |
e.printStackTrace(); |
393 |
} |
394 |
|
395 |
} |
396 |
|
397 |
} |
398 |
|
399 |
} |
400 |
|
401 |
// Setup directory
|
402 |
public void setupDirectory() { |
403 |
|
404 |
// Base path
|
405 |
Path currentRelativePath = Paths.get("");
|
406 |
String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
407 |
|
408 |
// Backup path
|
409 |
File file = new File(peerDir + "\\backup"); |
410 |
file.mkdirs(); |
411 |
|
412 |
// Backup path
|
413 |
file = new File(peerDir + "\\restored"); |
414 |
file.mkdirs(); |
415 |
|
416 |
} |
417 |
|
418 |
// Default Socket Setup
|
419 |
public void setupSockets() throws IOException { |
420 |
|
421 |
pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
422 |
|
423 |
MC = new Socket(this, Socket.Type.MC); |
424 |
pool.execute(MC); |
425 |
|
426 |
MDB = new Socket(this, Socket.Type.MDB); |
427 |
pool.execute(MDB); |
428 |
|
429 |
MDR = new Socket(this, Socket.Type.MDR); |
430 |
pool.execute(MDR); |
431 |
|
432 |
} |
433 |
|
434 |
// Specifc Socket Setup
|
435 |
public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
436 |
String addressMDR) throws IOException { |
437 |
|
438 |
pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
439 |
|
440 |
MC = new Socket(this, Socket.Type.MC, portMC, addressMC); |
441 |
pool.execute(MC); |
442 |
|
443 |
MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB); |
444 |
pool.execute(MDB); |
445 |
|
446 |
MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR); |
447 |
pool.execute(MDR); |
448 |
|
449 |
} |
450 |
|
451 |
public static void main(String args[]) { |
452 |
|
453 |
try {
|
454 |
|
455 |
// Server object
|
456 |
Server obj; |
457 |
|
458 |
// Basic (version id)
|
459 |
if (args.length == 2) { |
460 |
obj = new Server(args[1]); |
461 |
} |
462 |
// Full (version id access port mc port mdb port mdr)
|
463 |
else if (args.length == 9) { |
464 |
obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6], |
465 |
Integer.parseInt(args[7]), args[8]); |
466 |
} |
467 |
// Invalid arguments
|
468 |
else {
|
469 |
System.out.println("java Server <version> <server id>"); |
470 |
System.out.println(
|
471 |
"java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
|
472 |
return;
|
473 |
} |
474 |
|
475 |
// Bind the server as a registry
|
476 |
PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0); |
477 |
|
478 |
// Bind the remote object's stub in the registry
|
479 |
Registry registry = LocateRegistry.getRegistry(); |
480 |
registry.bind(obj.id, stub); |
481 |
|
482 |
System.err.println("Server ready"); |
483 |
|
484 |
} catch (Exception e) { |
485 |
|
486 |
System.err.println("Server exception: " + e.toString()); |
487 |
e.printStackTrace(); |
488 |
|
489 |
} |
490 |
} |
491 |
|
492 |
} |