root / src / Server.java
History | View | Annotate | Download (14.8 KB)
1 | 3 | up20160792 | import java.rmi.registry.Registry; |
---|---|---|---|
2 | import java.rmi.registry.LocateRegistry; |
||
3 | import java.rmi.RemoteException; |
||
4 | import java.rmi.server.UnicastRemoteObject; |
||
5 | |||
6 | import java.util.Map; |
||
7 | |||
8 | import java.util.concurrent.ConcurrentHashMap; |
||
9 | import java.util.concurrent.ScheduledThreadPoolExecutor; |
||
10 | import java.util.concurrent.TimeUnit; |
||
11 | import java.util.concurrent.atomic.AtomicBoolean; |
||
12 | import java.util.concurrent.Executors; |
||
13 | import java.util.concurrent.CopyOnWriteArrayList;; |
||
14 | import java.util.ArrayList; |
||
15 | import java.util.List; |
||
16 | import java.util.Comparator; |
||
17 | import java.util.HashSet; |
||
18 | import java.util.Collections; |
||
19 | |||
20 | import java.io.*; |
||
21 | |||
22 | import java.net.DatagramPacket; |
||
23 | |||
24 | import java.nio.file.*; |
||
25 | |||
26 | //Server class essentially a Peer
|
||
27 | public class Server implements PeerInterface { |
||
28 | |||
29 | // Unique peer id
|
||
30 | public String id; |
||
31 | |||
32 | // Size controls
|
||
33 | public static final int DEFAULT_MAX_SIZE = 1000000000; |
||
34 | public int limit; |
||
35 | public int used; |
||
36 | |||
37 | // Thread Pool
|
||
38 | public static ScheduledThreadPoolExecutor pool; |
||
39 | |||
40 | // Sockets
|
||
41 | public static Socket MC; |
||
42 | public static Socket MDB; |
||
43 | public static Socket MDR; |
||
44 | |||
45 | // State Backup
|
||
46 | public boolean doingBackUp; |
||
47 | public String file; |
||
48 | public int replications; |
||
49 | |||
50 | // State Restore
|
||
51 | public boolean restoring; |
||
52 | public AtomicBoolean waitRestoreReply = new AtomicBoolean(false); |
||
53 | public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> |
||
54 | // Chunk
|
||
55 | |||
56 | // Service Management Information
|
||
57 | public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo |
||
58 | // -> Chunk
|
||
59 | public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name |
||
60 | |||
61 | // Empty Constructor
|
||
/**
 * Creates a peer without initialising any state: no id, no sockets, no
 * directories.
 */
public Server() {
}

/**
 * Simplified constructor: default storage limit and default channel settings.
 *
 * @param id unique peer id
 * @throws IOException if the channel sockets cannot be set up
 */
public Server(String id) throws IOException {
    this.id = id;
    this.used = 0;
    this.limit = DEFAULT_MAX_SIZE;

    // Load the chunks left on disk by a previous run and create the working
    // directories BEFORE the channels are started: setupSockets() hands
    // `this` to listener tasks that begin running immediately, so they must
    // not observe a half-initialised peer.
    this.loadLocal();
    this.setupDirectory();
    this.setupSockets();
}

/**
 * Full constructor: default storage limit, explicit port and address for
 * each of the three channels.
 *
 * @param id         unique peer id
 * @param portMC     port of the MC channel
 * @param addressMC  address of the MC channel
 * @param portMDB    port of the MDB channel
 * @param addressMDB address of the MDB channel
 * @param portMDR    port of the MDR channel
 * @param addressMDR address of the MDR channel
 * @throws IOException if the channel sockets cannot be set up
 */
public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR,
        String addressMDR) throws IOException {
    this.id = id;
    this.used = 0;
    this.limit = DEFAULT_MAX_SIZE;

    // Same ordering as the simplified constructor: local state first,
    // listeners last, so no listener runs against an unloaded peer.
    this.loadLocal();
    this.setupDirectory();
    this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
}
||
85 | |||
86 | // Backup Protocol
|
||
87 | public void backup(String path, int replications) throws RemoteException { |
||
88 | |||
89 | // Start backup
|
||
90 | this.doingBackUp = true; |
||
91 | |||
92 | // Extract file name
|
||
93 | String fileName = new File(path).getName(); |
||
94 | |||
95 | // Generate Hash from file name and date
|
||
96 | String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
||
97 | |||
98 | // Is file backed up already?
|
||
99 | if (this.files.get(fileId) != null) { |
||
100 | System.out.println("File already backed up"); |
||
101 | return;
|
||
102 | } |
||
103 | |||
104 | // Add to file list
|
||
105 | this.files.put(fileId, fileName);
|
||
106 | |||
107 | try {
|
||
108 | |||
109 | // Get file bytes
|
||
110 | byte[] bytes = FileManager.readFile(path); |
||
111 | |||
112 | // Get file in chunks
|
||
113 | byte[][] chunks = FileManager.split(bytes); |
||
114 | |||
115 | // Send each chunk
|
||
116 | for (int i = 0; i < chunks.length; i++) { |
||
117 | |||
118 | // Base receive time of 1 second
|
||
119 | int receiveTime = 1000; |
||
120 | |||
121 | // PUTCHUNK tries
|
||
122 | for (int j = 0; j < 5; j++) { |
||
123 | |||
124 | // Prepare message
|
||
125 | Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
|
||
126 | |||
127 | // Pack in datagram
|
||
128 | DatagramPacket packet = msg.packit(MDB.address, MDB.port);
|
||
129 | |||
130 | // Send message
|
||
131 | MDB.socket.send(packet); |
||
132 | |||
133 | // Wait for replies and check Replication level
|
||
134 | this.replications = 0; |
||
135 | this.file = fileId;
|
||
136 | Thread.sleep(receiveTime);
|
||
137 | if (this.replications >= replications) { |
||
138 | break;
|
||
139 | } |
||
140 | |||
141 | // Double receive time
|
||
142 | receiveTime *= 2;
|
||
143 | |||
144 | } |
||
145 | |||
146 | } |
||
147 | |||
148 | } catch (Exception e) { |
||
149 | e.printStackTrace(); |
||
150 | } |
||
151 | |||
152 | // Terminate backup
|
||
153 | this.doingBackUp = false; |
||
154 | |||
155 | } |
||
156 | |||
157 | // Restore Protocol
|
||
158 | public void restore(String path) throws RemoteException { |
||
159 | |||
160 | this.restoring = true; |
||
161 | |||
162 | // File name
|
||
163 | String fileName = new File(path).getName(); |
||
164 | |||
165 | // Generate Hash from file name and date
|
||
166 | String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
||
167 | |||
168 | // Find in list of backed up
|
||
169 | String name = this.files.get(fileId); |
||
170 | |||
171 | // File not backed up yet, just return
|
||
172 | if (name == null) { |
||
173 | System.out.println("File not backed up by this peer"); |
||
174 | return;
|
||
175 | } |
||
176 | |||
177 | try {
|
||
178 | |||
179 | // Get byte count (probably better than actually fetching the bytes array)
|
||
180 | File file = new File(path); |
||
181 | int bytes = (int) file.length(); |
||
182 | |||
183 | // How many chunks to expect
|
||
184 | int chunks = 1 + bytes / FileManager.chunkSize; |
||
185 | |||
186 | // Reset list of Chunks recovered
|
||
187 | this.recoveredChunks.clear();
|
||
188 | |||
189 | // Request chunks
|
||
190 | for (int i = 0; i < chunks; i++) { |
||
191 | |||
192 | // Prepare message
|
||
193 | Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]); |
||
194 | |||
195 | // Pack in datagram
|
||
196 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
197 | |||
198 | // Send message
|
||
199 | MC.socket.send(packet); |
||
200 | |||
201 | // Wait for response
|
||
202 | this.waitRestoreReply.set(true); |
||
203 | long time = System.nanoTime(); |
||
204 | while (this.waitRestoreReply.get()) { |
||
205 | // Restore timeout
|
||
206 | if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) { |
||
207 | System.out.println("Restore timeout, missing chunks"); |
||
208 | return;
|
||
209 | } |
||
210 | } |
||
211 | |||
212 | } |
||
213 | |||
214 | // Get simple list from recovered chunks
|
||
215 | List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values()); |
||
216 | |||
217 | // Sort chunks
|
||
218 | Collections.sort(list, new Comparator<Chunk>() { |
||
219 | @Override
|
||
220 | public int compare(Chunk p1, Chunk p2) { |
||
221 | return p1.chunkN - p2.chunkN; // Ascending |
||
222 | } |
||
223 | }); |
||
224 | |||
225 | // Pack chunks
|
||
226 | byte[][] allBytes = new byte[list.size()][]; |
||
227 | for (int i = 0; i < list.size(); i++) { |
||
228 | allBytes[i] = list.get(i).bytes; |
||
229 | } |
||
230 | |||
231 | // Merge chunks
|
||
232 | byte[] fileBytes = FileManager.merge(allBytes); |
||
233 | |||
234 | // Save file
|
||
235 | Path current = Paths.get("");
|
||
236 | String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName; |
||
237 | try (FileOutputStream stream = new FileOutputStream(place)) { |
||
238 | stream.write(fileBytes); |
||
239 | stream.close(); |
||
240 | } |
||
241 | |||
242 | } catch (Exception e) { |
||
243 | e.printStackTrace(); |
||
244 | } |
||
245 | |||
246 | this.restoring = false; |
||
247 | |||
248 | } |
||
249 | |||
250 | // Delete chunks associated to this file
|
||
251 | public void delete(String path) throws RemoteException { |
||
252 | |||
253 | // File name
|
||
254 | String fileName = new File(path).getName(); |
||
255 | |||
256 | // Generate Hash from file name and date
|
||
257 | String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
||
258 | |||
259 | // Remove from list of backed up
|
||
260 | String name = this.files.get(fileId); |
||
261 | if (name != null) { |
||
262 | this.files.remove(fileId);
|
||
263 | } |
||
264 | |||
265 | try {
|
||
266 | |||
267 | // Prepare message
|
||
268 | Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]); |
||
269 | |||
270 | // Pack in datagram
|
||
271 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
272 | |||
273 | // Send message
|
||
274 | MC.socket.send(packet); |
||
275 | |||
276 | } catch (Exception e) { |
||
277 | e.printStackTrace(); |
||
278 | } |
||
279 | |||
280 | } |
||
281 | |||
282 | public void reclaim(int memory) throws RemoteException { |
||
283 | |||
284 | // Memory can't be lower than 0
|
||
285 | if (memory < 0) { |
||
286 | return;
|
||
287 | } |
||
288 | |||
289 | // Assign new limit
|
||
290 | limit = memory; |
||
291 | |||
292 | try {
|
||
293 | |||
294 | // Check need to delete chunks
|
||
295 | // Going to delete by order in the map
|
||
296 | while (used > limit) {
|
||
297 | |||
298 | // Pick an entry
|
||
299 | Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next(); |
||
300 | |||
301 | // Remove from used space
|
||
302 | used -= entry.getValue().bytes.length; |
||
303 | |||
304 | // Prepare message
|
||
305 | Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
|
||
306 | entry.getValue().chunkN, 0, new byte[0]); |
||
307 | |||
308 | // Pack in datagram
|
||
309 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
310 | |||
311 | // Send message
|
||
312 | MC.socket.send(packet); |
||
313 | |||
314 | // Remove from map
|
||
315 | chunks.remove(entry.getKey()); |
||
316 | |||
317 | // Remove from file system
|
||
318 | Path current = Paths.get("");
|
||
319 | String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" |
||
320 | + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk"; |
||
321 | File toDelete = new File(place); |
||
322 | toDelete.delete(); |
||
323 | |||
324 | } |
||
325 | |||
326 | } catch (Exception e) { |
||
327 | e.printStackTrace(); |
||
328 | } |
||
329 | |||
330 | } |
||
331 | |||
332 | public void state() throws RemoteException { |
||
333 | |||
334 | } |
||
335 | |||
336 | // Get information of previous peer life from file system
|
||
337 | public void loadLocal() { |
||
338 | |||
339 | // Base path
|
||
340 | Path currentRelativePath = Paths.get("");
|
||
341 | String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
||
342 | |||
343 | // Backup path
|
||
344 | File file = new File(peerDir + "\\backup"); |
||
345 | |||
346 | // FileIds backed up
|
||
347 | File[] files = file.listFiles(File::isDirectory); |
||
348 | |||
349 | // Peer didn't exist, just return
|
||
350 | if (files == null) |
||
351 | return;
|
||
352 | |||
353 | // Each file directory
|
||
354 | for (int i = 0; i < files.length; i++) { |
||
355 | |||
356 | // FileId
|
||
357 | String fileId = files[i].getName();
|
||
358 | |||
359 | // Get Chunks inside
|
||
360 | file = new File(peerDir + "\\backup" + "\\" + fileId); |
||
361 | File[] chunks = file.listFiles(); |
||
362 | |||
363 | // No chunks, just return
|
||
364 | if (chunks == null) |
||
365 | return;
|
||
366 | |||
367 | // Each chunk
|
||
368 | for (int j = 0; j < chunks.length; j++) { |
||
369 | |||
370 | // Chunk name
|
||
371 | String chunkNo = chunks[j].getName();
|
||
372 | |||
373 | // Remove chk
|
||
374 | chunkNo = chunkNo.replaceAll("\\.", ""); |
||
375 | chunkNo = chunkNo.replaceAll("chk", ""); |
||
376 | |||
377 | try {
|
||
378 | |||
379 | // Read bytes
|
||
380 | byte[] bytes = Files.readAllBytes(chunks[j].toPath()); |
||
381 | |||
382 | // Updated used bytes
|
||
383 | used += bytes.length; |
||
384 | |||
385 | // Add to HashMap
|
||
386 | this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0)); |
||
387 | |||
388 | } catch (Exception e) { |
||
389 | e.printStackTrace(); |
||
390 | } |
||
391 | |||
392 | } |
||
393 | |||
394 | } |
||
395 | |||
396 | } |
||
397 | |||
398 | // Setup directory
|
||
399 | public void setupDirectory() { |
||
400 | |||
401 | // Base path
|
||
402 | Path currentRelativePath = Paths.get("");
|
||
403 | String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
||
404 | |||
405 | // Backup path
|
||
406 | File file = new File(peerDir + "\\backup"); |
||
407 | file.mkdirs(); |
||
408 | |||
409 | // Backup path
|
||
410 | file = new File(peerDir + "\\restored"); |
||
411 | file.mkdirs(); |
||
412 | |||
413 | } |
||
414 | |||
415 | // Default Socket Setup
|
||
416 | public void setupSockets() throws IOException { |
||
417 | |||
418 | pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
||
419 | |||
420 | MC = new Socket(this, Socket.Type.MC); |
||
421 | pool.execute(MC); |
||
422 | |||
423 | MDB = new Socket(this, Socket.Type.MDB); |
||
424 | pool.execute(MDB); |
||
425 | |||
426 | MDR = new Socket(this, Socket.Type.MDR); |
||
427 | pool.execute(MDR); |
||
428 | |||
429 | } |
||
430 | |||
431 | // Specifc Socket Setup
|
||
432 | public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
||
433 | String addressMDR) throws IOException { |
||
434 | |||
435 | pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
||
436 | |||
437 | MC = new Socket(this, Socket.Type.MC, portMC, addressMC); |
||
438 | pool.execute(MC); |
||
439 | |||
440 | MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB); |
||
441 | pool.execute(MDB); |
||
442 | |||
443 | MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR); |
||
444 | pool.execute(MDR); |
||
445 | |||
446 | } |
||
447 | |||
448 | public static void main(String args[]) { |
||
449 | |||
450 | try {
|
||
451 | |||
452 | // Server object
|
||
453 | Server obj; |
||
454 | |||
455 | // Basic (version id)
|
||
456 | if (args.length == 2) { |
||
457 | obj = new Server(args[1]); |
||
458 | } |
||
459 | // Full (version id access port mc port mdb port mdr)
|
||
460 | else if (args.length == 9) { |
||
461 | obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6], |
||
462 | Integer.parseInt(args[7]), args[8]); |
||
463 | } |
||
464 | // Invalid arguments
|
||
465 | else {
|
||
466 | System.out.println("java Server <version> <server id>"); |
||
467 | System.out.println(
|
||
468 | "java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
|
||
469 | return;
|
||
470 | } |
||
471 | |||
472 | // Bind the server as a registry
|
||
473 | PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0); |
||
474 | |||
475 | // Bind the remote object's stub in the registry
|
||
476 | Registry registry = LocateRegistry.getRegistry(); |
||
477 | registry.bind(obj.id, stub); |
||
478 | |||
479 | System.err.println("Server ready"); |
||
480 | |||
481 | } catch (Exception e) { |
||
482 | |||
483 | System.err.println("Server exception: " + e.toString()); |
||
484 | e.printStackTrace(); |
||
485 | |||
486 | } |
||
487 | } |
||
488 | |||
489 | } |