root / Server.java @ 2
History | View | Annotate | Download (15 KB)
1 | 1 | up20160792 | import java.rmi.registry.Registry; |
---|---|---|---|
2 | import java.rmi.registry.LocateRegistry; |
||
3 | import java.rmi.RemoteException; |
||
4 | import java.rmi.server.UnicastRemoteObject; |
||
5 | |||
6 | import java.util.Map; |
||
7 | |||
8 | import java.util.concurrent.ConcurrentHashMap; |
||
9 | import java.util.concurrent.ScheduledThreadPoolExecutor; |
||
10 | import java.util.concurrent.TimeUnit; |
||
11 | import java.util.concurrent.atomic.AtomicBoolean; |
||
12 | import java.util.concurrent.Executors; |
||
13 | import java.util.concurrent.CopyOnWriteArrayList;; |
||
14 | import java.util.ArrayList; |
||
15 | import java.util.List; |
||
16 | import java.util.Comparator; |
||
17 | import java.util.HashSet; |
||
18 | import java.util.Collections; |
||
19 | |||
20 | import java.io.*; |
||
21 | |||
22 | import java.net.DatagramPacket; |
||
23 | |||
24 | import java.nio.file.*; |
||
25 | |||
26 | //Server class essentially a Peer
|
||
27 | public class Server implements PeerInterface { |
||
28 | |||
29 | // Unique peer id
|
||
30 | public String id; |
||
31 | |||
32 | // Size controls
|
||
33 | public static final int DEFAULT_MAX_SIZE = 1000000000; |
||
34 | public int limit; |
||
35 | public int used; |
||
36 | |||
37 | // Thread Pool
|
||
38 | public static ScheduledThreadPoolExecutor pool; |
||
39 | |||
40 | // Sockets
|
||
41 | public static Socket MC; |
||
42 | public static Socket MDB; |
||
43 | public static Socket MDR; |
||
44 | |||
45 | // State Backup
|
||
46 | public boolean doingBackUp; |
||
47 | public String file; |
||
48 | public int replications; |
||
49 | |||
50 | // State Restore
|
||
51 | public boolean restoring; |
||
52 | public AtomicBoolean waitRestoreReply = new AtomicBoolean(false); |
||
53 | 2 | up20160792 | public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> |
54 | // Chunk
|
||
55 | 1 | up20160792 | |
56 | // Service Management Information
|
||
57 | public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo |
||
58 | // -> Chunk
|
||
59 | public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name |
||
60 | |||
61 | // Empty Constructor
|
||
62 | public Server() {
|
||
63 | } |
||
64 | |||
65 | // Simplified Constructor(Default values)
|
||
66 | public Server(String id) throws IOException { |
||
67 | this.used = 0; |
||
68 | this.id = id;
|
||
69 | this.limit = DEFAULT_MAX_SIZE;
|
||
70 | this.setupSockets();
|
||
71 | this.loadLocal();
|
||
72 | this.setupDirectory();
|
||
73 | } |
||
74 | |||
75 | // Full constructor
|
||
76 | public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
||
77 | String addressMDR) throws IOException { |
||
78 | this.used = 0; |
||
79 | this.id = id;
|
||
80 | this.limit = DEFAULT_MAX_SIZE;
|
||
81 | this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
|
||
82 | this.loadLocal();
|
||
83 | this.setupDirectory();
|
||
84 | } |
||
85 | |||
86 | // Backup Protocol
|
||
87 | public void backup(String path, int replications) throws RemoteException { |
||
88 | |||
89 | // Start backup
|
||
90 | this.doingBackUp = true; |
||
91 | |||
92 | // Extract file name
|
||
93 | int index = path.lastIndexOf("\\"); |
||
94 | String fileName = path.substring(index + 1); |
||
95 | |||
96 | 2 | up20160792 | // Generate Hash from file name and date
|
97 | String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
||
98 | 1 | up20160792 | |
99 | 2 | up20160792 | // Is file backed up already?
|
100 | if (this.files.get(fileId) != null) { |
||
101 | System.out.println("File already backed up"); |
||
102 | return;
|
||
103 | } |
||
104 | |||
105 | 1 | up20160792 | // Add to file list
|
106 | this.files.put(fileId, fileName);
|
||
107 | |||
108 | try {
|
||
109 | |||
110 | // Get file bytes
|
||
111 | byte[] bytes = FileManager.readFile(path); |
||
112 | |||
113 | // Get file in chunks
|
||
114 | byte[][] chunks = FileManager.split(bytes); |
||
115 | |||
116 | // Send each chunk
|
||
117 | for (int i = 0; i < chunks.length; i++) { |
||
118 | |||
119 | // Base receive time of 1 second
|
||
120 | int receiveTime = 1000; |
||
121 | |||
122 | // PUTCHUNK tries
|
||
123 | for (int j = 0; j < 5; j++) { |
||
124 | |||
125 | // Prepare message
|
||
126 | Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
|
||
127 | |||
128 | // Pack in datagram
|
||
129 | DatagramPacket packet = msg.packit(MDB.address, MDB.port);
|
||
130 | |||
131 | // Send message
|
||
132 | MDB.socket.send(packet); |
||
133 | |||
134 | // Wait for replies and check Replication level
|
||
135 | this.replications = 0; |
||
136 | this.file = fileId;
|
||
137 | Thread.sleep(receiveTime);
|
||
138 | if (this.replications >= replications) { |
||
139 | break;
|
||
140 | } |
||
141 | |||
142 | // Double receive time
|
||
143 | receiveTime *= 2;
|
||
144 | |||
145 | } |
||
146 | |||
147 | } |
||
148 | |||
149 | } catch (Exception e) { |
||
150 | e.printStackTrace(); |
||
151 | } |
||
152 | |||
153 | // Terminate backup
|
||
154 | this.doingBackUp = false; |
||
155 | |||
156 | } |
||
157 | |||
158 | // Restore Protocol
|
||
159 | public void restore(String path) throws RemoteException { |
||
160 | |||
161 | this.restoring = true; |
||
162 | |||
163 | // File name
|
||
164 | int index = path.lastIndexOf("\\"); |
||
165 | String fileName = path.substring(index + 1); |
||
166 | |||
167 | 2 | up20160792 | // Generate Hash from file name and date
|
168 | String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
||
169 | 1 | up20160792 | |
170 | // Find in list of backed up
|
||
171 | String name = this.files.get(fileId); |
||
172 | |||
173 | // File not backed up yet, just return
|
||
174 | if (name == null) { |
||
175 | 2 | up20160792 | System.out.println("File not backed up by this peer"); |
176 | 1 | up20160792 | return;
|
177 | } |
||
178 | |||
179 | try {
|
||
180 | |||
181 | // Get byte count (probably better than actually fetching the bytes array)
|
||
182 | File file = new File(path); |
||
183 | int bytes = (int) file.length(); |
||
184 | |||
185 | // How many chunks to expect
|
||
186 | int chunks = 1 + bytes / FileManager.chunkSize; |
||
187 | |||
188 | // Reset list of Chunks recovered
|
||
189 | this.recoveredChunks.clear();
|
||
190 | |||
191 | // Request chunks
|
||
192 | for (int i = 0; i < chunks; i++) { |
||
193 | |||
194 | // Prepare message
|
||
195 | Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]); |
||
196 | |||
197 | // Pack in datagram
|
||
198 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
199 | |||
200 | // Send message
|
||
201 | MC.socket.send(packet); |
||
202 | |||
203 | // Wait for response
|
||
204 | this.waitRestoreReply.set(true); |
||
205 | 2 | up20160792 | long time = System.nanoTime(); |
206 | 1 | up20160792 | while (this.waitRestoreReply.get()) { |
207 | 2 | up20160792 | // Restore timeout
|
208 | if (TimeUnit.NANOSECONDS.toMillis(time) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) > 1000) { |
||
209 | System.out.println("Restore timeout, missing chunks"); |
||
210 | return;
|
||
211 | } |
||
212 | 1 | up20160792 | } |
213 | |||
214 | 2 | up20160792 | } |
215 | 1 | up20160792 | |
216 | 2 | up20160792 | // Get simple list from recovered chunks
|
217 | 1 | up20160792 | List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values()); |
218 | |||
219 | // Sort chunks
|
||
220 | Collections.sort(list, new Comparator<Chunk>() { |
||
221 | @Override
|
||
222 | public int compare(Chunk p1, Chunk p2) { |
||
223 | return p1.chunkN - p2.chunkN; // Ascending |
||
224 | } |
||
225 | }); |
||
226 | |||
227 | // Pack chunks
|
||
228 | byte[][] allBytes = new byte[list.size()][]; |
||
229 | for (int i = 0; i < list.size(); i++) { |
||
230 | allBytes[i] = list.get(i).bytes; |
||
231 | } |
||
232 | |||
233 | // Merge chunks
|
||
234 | byte[] fileBytes = FileManager.merge(allBytes); |
||
235 | |||
236 | // Save file
|
||
237 | Path current = Paths.get("");
|
||
238 | String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName; |
||
239 | try (FileOutputStream stream = new FileOutputStream(place)) { |
||
240 | stream.write(fileBytes); |
||
241 | stream.close(); |
||
242 | } |
||
243 | |||
244 | } catch (Exception e) { |
||
245 | e.printStackTrace(); |
||
246 | } |
||
247 | |||
248 | this.restoring = false; |
||
249 | |||
250 | } |
||
251 | |||
252 | // Delete chunks associated to this file
|
||
253 | public void delete(String path) throws RemoteException { |
||
254 | |||
255 | // File name
|
||
256 | int index = path.lastIndexOf("\\"); |
||
257 | String fileName = path.substring(index + 1); |
||
258 | |||
259 | 2 | up20160792 | // Generate Hash from file name and date
|
260 | String fileId = FileManager.hash(fileName + Long.toString(new File(path).lastModified())); |
||
261 | 1 | up20160792 | |
262 | 2 | up20160792 | // Remove from list of backed up
|
263 | String name = this.files.get(fileId); |
||
264 | if (name != null) { |
||
265 | this.files.remove(fileId);
|
||
266 | } |
||
267 | |||
268 | 1 | up20160792 | try {
|
269 | |||
270 | // Prepare message
|
||
271 | Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]); |
||
272 | |||
273 | // Pack in datagram
|
||
274 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
275 | |||
276 | // Send message
|
||
277 | MC.socket.send(packet); |
||
278 | |||
279 | } catch (Exception e) { |
||
280 | e.printStackTrace(); |
||
281 | } |
||
282 | |||
283 | } |
||
284 | |||
285 | public void reclaim(int memory) throws RemoteException { |
||
286 | |||
287 | // Memory can't be lower than 0
|
||
288 | if (memory < 0) { |
||
289 | return;
|
||
290 | } |
||
291 | |||
292 | // Assign new limit
|
||
293 | limit = memory; |
||
294 | |||
295 | try {
|
||
296 | |||
297 | // Check need to delete chunks
|
||
298 | // Going to delete by order in the map
|
||
299 | while (used > limit) {
|
||
300 | |||
301 | // Pick an entry
|
||
302 | Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next(); |
||
303 | |||
304 | // Remove from used space
|
||
305 | used -= entry.getValue().bytes.length; |
||
306 | |||
307 | // Prepare message
|
||
308 | Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
|
||
309 | entry.getValue().chunkN, 0, new byte[0]); |
||
310 | |||
311 | // Pack in datagram
|
||
312 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
313 | |||
314 | // Send message
|
||
315 | MC.socket.send(packet); |
||
316 | |||
317 | // Remove from map
|
||
318 | chunks.remove(entry.getKey()); |
||
319 | |||
320 | // Remove from file system
|
||
321 | Path current = Paths.get("");
|
||
322 | 2 | up20160792 | String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" |
323 | + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk"; |
||
324 | 1 | up20160792 | File toDelete = new File(place); |
325 | toDelete.delete(); |
||
326 | |||
327 | } |
||
328 | |||
329 | } catch (Exception e) { |
||
330 | e.printStackTrace(); |
||
331 | } |
||
332 | |||
333 | } |
||
334 | |||
335 | public void state() throws RemoteException { |
||
336 | |||
337 | } |
||
338 | |||
339 | // Get information of previous peer life from file system
|
||
340 | public void loadLocal() { |
||
341 | |||
342 | // Base path
|
||
343 | Path currentRelativePath = Paths.get("");
|
||
344 | String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
||
345 | |||
346 | // Backup path
|
||
347 | File file = new File(peerDir + "\\backup"); |
||
348 | |||
349 | // FileIds backed up
|
||
350 | File[] files = file.listFiles(File::isDirectory); |
||
351 | |||
352 | // Peer didn't exist, just return
|
||
353 | if (files == null) |
||
354 | return;
|
||
355 | |||
356 | // Each file directory
|
||
357 | for (int i = 0; i < files.length; i++) { |
||
358 | |||
359 | // FileId
|
||
360 | String fileId = files[i].getName();
|
||
361 | |||
362 | // Get Chunks inside
|
||
363 | file = new File(peerDir + "\\backup" + "\\" + fileId); |
||
364 | File[] chunks = file.listFiles(); |
||
365 | |||
366 | // No chunks, just return
|
||
367 | if (chunks == null) |
||
368 | return;
|
||
369 | |||
370 | // Each chunk
|
||
371 | for (int j = 0; j < chunks.length; j++) { |
||
372 | |||
373 | // Chunk name
|
||
374 | String chunkNo = chunks[j].getName();
|
||
375 | |||
376 | // Remove chk
|
||
377 | chunkNo = chunkNo.replaceAll("\\.", ""); |
||
378 | chunkNo = chunkNo.replaceAll("chk", ""); |
||
379 | |||
380 | try {
|
||
381 | |||
382 | // Read bytes
|
||
383 | byte[] bytes = Files.readAllBytes(chunks[j].toPath()); |
||
384 | |||
385 | // Updated used bytes
|
||
386 | used += bytes.length; |
||
387 | |||
388 | // Add to HashMap
|
||
389 | this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0)); |
||
390 | |||
391 | } catch (Exception e) { |
||
392 | e.printStackTrace(); |
||
393 | } |
||
394 | |||
395 | } |
||
396 | |||
397 | } |
||
398 | |||
399 | } |
||
400 | |||
401 | // Setup directory
|
||
402 | public void setupDirectory() { |
||
403 | |||
404 | // Base path
|
||
405 | Path currentRelativePath = Paths.get("");
|
||
406 | String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
||
407 | |||
408 | // Backup path
|
||
409 | File file = new File(peerDir + "\\backup"); |
||
410 | file.mkdirs(); |
||
411 | |||
412 | // Backup path
|
||
413 | file = new File(peerDir + "\\restored"); |
||
414 | file.mkdirs(); |
||
415 | |||
416 | } |
||
417 | |||
418 | // Default Socket Setup
|
||
419 | public void setupSockets() throws IOException { |
||
420 | |||
421 | pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
||
422 | |||
423 | MC = new Socket(this, Socket.Type.MC); |
||
424 | pool.execute(MC); |
||
425 | |||
426 | MDB = new Socket(this, Socket.Type.MDB); |
||
427 | pool.execute(MDB); |
||
428 | |||
429 | MDR = new Socket(this, Socket.Type.MDR); |
||
430 | pool.execute(MDR); |
||
431 | |||
432 | } |
||
433 | |||
434 | // Specifc Socket Setup
|
||
435 | public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
||
436 | String addressMDR) throws IOException { |
||
437 | |||
438 | pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
||
439 | |||
440 | MC = new Socket(this, Socket.Type.MC, portMC, addressMC); |
||
441 | pool.execute(MC); |
||
442 | |||
443 | MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB); |
||
444 | pool.execute(MDB); |
||
445 | |||
446 | MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR); |
||
447 | pool.execute(MDR); |
||
448 | |||
449 | } |
||
450 | |||
451 | public static void main(String args[]) { |
||
452 | |||
453 | 2 | up20160792 | try {
|
454 | 1 | up20160792 | |
455 | 2 | up20160792 | // Server object
|
456 | Server obj; |
||
457 | 1 | up20160792 | |
458 | 2 | up20160792 | // Basic (version id)
|
459 | if (args.length == 2) { |
||
460 | obj = new Server(args[1]); |
||
461 | } |
||
462 | // Full (version id access port mc port mdb port mdr)
|
||
463 | else if (args.length == 9) { |
||
464 | obj = new Server(args[1], Integer.parseInt(args[3]), args[4], Integer.parseInt(args[5]), args[6], |
||
465 | Integer.parseInt(args[7]), args[8]); |
||
466 | } |
||
467 | // Invalid arguments
|
||
468 | else {
|
||
469 | System.out.println("java Server <version> <server id>"); |
||
470 | System.out.println(
|
||
471 | "java Server <version> <server id> <access_point> <MC_port> <MC_IP_address> <MDB_port> <MDB_IP_address> <MDR_port> <MDR_IP_address>");
|
||
472 | return;
|
||
473 | } |
||
474 | 1 | up20160792 | |
475 | // Bind the server as a registry
|
||
476 | PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0); |
||
477 | |||
478 | // Bind the remote object's stub in the registry
|
||
479 | Registry registry = LocateRegistry.getRegistry(); |
||
480 | 2 | up20160792 | registry.bind(obj.id, stub); |
481 | 1 | up20160792 | |
482 | System.err.println("Server ready"); |
||
483 | |||
484 | } catch (Exception e) { |
||
485 | |||
486 | System.err.println("Server exception: " + e.toString()); |
||
487 | e.printStackTrace(); |
||
488 | |||
489 | } |
||
490 | } |
||
491 | |||
492 | } |