root / Server.java @ 1
History | View | Annotate | Download (13.8 KB)
1 | 1 | up20160792 | import java.rmi.registry.Registry; |
---|---|---|---|
2 | import java.rmi.registry.LocateRegistry; |
||
3 | import java.rmi.RemoteException; |
||
4 | import java.rmi.server.UnicastRemoteObject; |
||
5 | |||
6 | import java.util.Map; |
||
7 | |||
8 | import java.util.concurrent.ConcurrentHashMap; |
||
9 | import java.util.concurrent.ScheduledThreadPoolExecutor; |
||
10 | import java.util.concurrent.TimeUnit; |
||
11 | import java.util.concurrent.atomic.AtomicBoolean; |
||
12 | import java.util.concurrent.Executors; |
||
13 | import java.util.concurrent.CopyOnWriteArrayList;; |
||
14 | import java.util.ArrayList; |
||
15 | import java.util.List; |
||
16 | import java.util.Comparator; |
||
17 | import java.util.HashSet; |
||
18 | import java.util.Collections; |
||
19 | |||
20 | import java.io.*; |
||
21 | |||
22 | import java.net.DatagramPacket; |
||
23 | |||
24 | import java.nio.file.*; |
||
25 | |||
26 | //Server class essentially a Peer
|
||
27 | public class Server implements PeerInterface { |
||
28 | |||
29 | // Unique peer id
|
||
30 | public String id; |
||
31 | |||
32 | // Size controls
|
||
33 | public static final int DEFAULT_MAX_SIZE = 1000000000; |
||
34 | public int limit; |
||
35 | public int used; |
||
36 | |||
37 | // Thread Pool
|
||
38 | public static ScheduledThreadPoolExecutor pool; |
||
39 | |||
40 | // Sockets
|
||
41 | public static Socket MC; |
||
42 | public static Socket MDB; |
||
43 | public static Socket MDR; |
||
44 | |||
45 | // State Backup
|
||
46 | public boolean doingBackUp; |
||
47 | public String file; |
||
48 | public int replications; |
||
49 | |||
50 | // State Restore
|
||
51 | public boolean restoring; |
||
52 | public AtomicBoolean waitRestoreReply = new AtomicBoolean(false); |
||
53 | public ConcurrentHashMap<String, Chunk> recoveredChunks = new ConcurrentHashMap<String, Chunk>(); // ChunkNo -> Chunk |
||
54 | |||
55 | // Service Management Information
|
||
56 | public ConcurrentHashMap<String, Chunk> chunks = new ConcurrentHashMap<String, Chunk>(); // fileId-ChunkNo |
||
57 | // -> Chunk
|
||
58 | public ConcurrentHashMap<String, String> files = new ConcurrentHashMap<String, String>(); // fileId -> // file name |
||
59 | |||
60 | // Empty Constructor
|
||
61 | public Server() {
|
||
62 | } |
||
63 | |||
64 | // Simplified Constructor(Default values)
|
||
65 | public Server(String id) throws IOException { |
||
66 | this.used = 0; |
||
67 | this.id = id;
|
||
68 | this.limit = DEFAULT_MAX_SIZE;
|
||
69 | this.setupSockets();
|
||
70 | this.loadLocal();
|
||
71 | this.setupDirectory();
|
||
72 | } |
||
73 | |||
74 | // Full constructor
|
||
75 | public Server(String id, int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
||
76 | String addressMDR) throws IOException { |
||
77 | this.used = 0; |
||
78 | this.id = id;
|
||
79 | this.limit = DEFAULT_MAX_SIZE;
|
||
80 | this.setupSockets(portMC, addressMC, portMDB, addressMDB, portMDR, addressMDR);
|
||
81 | this.loadLocal();
|
||
82 | this.setupDirectory();
|
||
83 | } |
||
84 | |||
85 | // Backup Protocol
|
||
86 | public void backup(String path, int replications) throws RemoteException { |
||
87 | |||
88 | // TODO is file backed up already?
|
||
89 | |||
90 | // Start backup
|
||
91 | this.doingBackUp = true; |
||
92 | |||
93 | // Extract file name
|
||
94 | int index = path.lastIndexOf("\\"); |
||
95 | String fileName = path.substring(index + 1); |
||
96 | |||
97 | // Generate Hash
|
||
98 | // TODO dont use just file name for hashing(multiple file versions with same
|
||
99 | // name problem)
|
||
100 | String fileId = FileManager.hash(fileName);
|
||
101 | |||
102 | // Add to file list
|
||
103 | this.files.put(fileId, fileName);
|
||
104 | |||
105 | try {
|
||
106 | |||
107 | // Get file bytes
|
||
108 | byte[] bytes = FileManager.readFile(path); |
||
109 | |||
110 | // Get file in chunks
|
||
111 | byte[][] chunks = FileManager.split(bytes); |
||
112 | |||
113 | // Send each chunk
|
||
114 | for (int i = 0; i < chunks.length; i++) { |
||
115 | |||
116 | // Base receive time of 1 second
|
||
117 | int receiveTime = 1000; |
||
118 | |||
119 | // PUTCHUNK tries
|
||
120 | for (int j = 0; j < 5; j++) { |
||
121 | |||
122 | // Prepare message
|
||
123 | Message msg = new Message(Message.MessageType.PUTCHUNK, id, fileId, i, replications, chunks[i]);
|
||
124 | |||
125 | // Pack in datagram
|
||
126 | DatagramPacket packet = msg.packit(MDB.address, MDB.port);
|
||
127 | |||
128 | // Send message
|
||
129 | MDB.socket.send(packet); |
||
130 | |||
131 | // Wait for replies and check Replication level
|
||
132 | this.replications = 0; |
||
133 | this.file = fileId;
|
||
134 | Thread.sleep(receiveTime);
|
||
135 | if (this.replications >= replications) { |
||
136 | break;
|
||
137 | } |
||
138 | |||
139 | // Double receive time
|
||
140 | receiveTime *= 2;
|
||
141 | |||
142 | } |
||
143 | |||
144 | } |
||
145 | |||
146 | } catch (Exception e) { |
||
147 | e.printStackTrace(); |
||
148 | } |
||
149 | |||
150 | // Terminate backup
|
||
151 | this.doingBackUp = false; |
||
152 | |||
153 | } |
||
154 | |||
155 | // Restore Protocol
|
||
156 | public void restore(String path) throws RemoteException { |
||
157 | |||
158 | this.restoring = true; |
||
159 | |||
160 | // File name
|
||
161 | int index = path.lastIndexOf("\\"); |
||
162 | String fileName = path.substring(index + 1); |
||
163 | |||
164 | // Generate Hash
|
||
165 | // TODO dont use just file name for hashing(multiple file versions with same
|
||
166 | // name problem)
|
||
167 | String fileId = FileManager.hash(fileName);
|
||
168 | |||
169 | // Find in list of backed up
|
||
170 | String name = this.files.get(fileId); |
||
171 | |||
172 | // File not backed up yet, just return
|
||
173 | if (name == null) { |
||
174 | return;
|
||
175 | } |
||
176 | |||
177 | try {
|
||
178 | |||
179 | // Get byte count (probably better than actually fetching the bytes array)
|
||
180 | File file = new File(path); |
||
181 | int bytes = (int) file.length(); |
||
182 | |||
183 | // How many chunks to expect
|
||
184 | int chunks = 1 + bytes / FileManager.chunkSize; |
||
185 | |||
186 | // Reset list of Chunks recovered
|
||
187 | this.recoveredChunks.clear();
|
||
188 | |||
189 | // Request chunks
|
||
190 | for (int i = 0; i < chunks; i++) { |
||
191 | |||
192 | // Prepare message
|
||
193 | Message msg = new Message(Message.MessageType.GETCHUNK, id, fileId, i, 0, new byte[0]); |
||
194 | |||
195 | // Pack in datagram
|
||
196 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
197 | |||
198 | // Send message
|
||
199 | MC.socket.send(packet); |
||
200 | |||
201 | // Wait for response
|
||
202 | this.waitRestoreReply.set(true); |
||
203 | while (this.waitRestoreReply.get()) { |
||
204 | } |
||
205 | |||
206 | } |
||
207 | |||
208 | //Get simple list from recovered chunks
|
||
209 | List<Chunk> list = new ArrayList<Chunk>(this.recoveredChunks.values()); |
||
210 | |||
211 | // Sort chunks
|
||
212 | Collections.sort(list, new Comparator<Chunk>() { |
||
213 | @Override
|
||
214 | public int compare(Chunk p1, Chunk p2) { |
||
215 | return p1.chunkN - p2.chunkN; // Ascending |
||
216 | } |
||
217 | }); |
||
218 | |||
219 | // Pack chunks
|
||
220 | byte[][] allBytes = new byte[list.size()][]; |
||
221 | for (int i = 0; i < list.size(); i++) { |
||
222 | allBytes[i] = list.get(i).bytes; |
||
223 | } |
||
224 | |||
225 | // Merge chunks
|
||
226 | byte[] fileBytes = FileManager.merge(allBytes); |
||
227 | |||
228 | // Save file
|
||
229 | Path current = Paths.get("");
|
||
230 | String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\restored\\" + fileName; |
||
231 | try (FileOutputStream stream = new FileOutputStream(place)) { |
||
232 | stream.write(fileBytes); |
||
233 | stream.close(); |
||
234 | } |
||
235 | |||
236 | } catch (Exception e) { |
||
237 | e.printStackTrace(); |
||
238 | } |
||
239 | |||
240 | this.restoring = false; |
||
241 | |||
242 | } |
||
243 | |||
244 | // Delete chunks associated to this file
|
||
245 | public void delete(String path) throws RemoteException { |
||
246 | |||
247 | // File name
|
||
248 | int index = path.lastIndexOf("\\"); |
||
249 | String fileName = path.substring(index + 1); |
||
250 | |||
251 | // Generate Hash
|
||
252 | // TODO dont use just file name for hashing(multiple file versions with same
|
||
253 | // name problem)
|
||
254 | String fileId = FileManager.hash(fileName);
|
||
255 | |||
256 | try {
|
||
257 | |||
258 | // Prepare message
|
||
259 | Message msg = new Message(Message.MessageType.DELETE, id, fileId, 0, 0, new byte[0]); |
||
260 | |||
261 | // Pack in datagram
|
||
262 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
263 | |||
264 | // Send message
|
||
265 | MC.socket.send(packet); |
||
266 | |||
267 | } catch (Exception e) { |
||
268 | e.printStackTrace(); |
||
269 | } |
||
270 | |||
271 | } |
||
272 | |||
273 | public void reclaim(int memory) throws RemoteException { |
||
274 | |||
275 | // Memory can't be lower than 0
|
||
276 | if (memory < 0) { |
||
277 | return;
|
||
278 | } |
||
279 | |||
280 | // Assign new limit
|
||
281 | limit = memory; |
||
282 | |||
283 | try {
|
||
284 | |||
285 | // Check need to delete chunks
|
||
286 | // Going to delete by order in the map
|
||
287 | while (used > limit) {
|
||
288 | |||
289 | // Pick an entry
|
||
290 | Map.Entry<String, Chunk> entry = chunks.entrySet().iterator().next(); |
||
291 | |||
292 | // Remove from used space
|
||
293 | used -= entry.getValue().bytes.length; |
||
294 | |||
295 | // Prepare message
|
||
296 | Message msg = new Message(Message.MessageType.REMOVED, id, entry.getValue().fileId,
|
||
297 | entry.getValue().chunkN, 0, new byte[0]); |
||
298 | |||
299 | // Pack in datagram
|
||
300 | DatagramPacket packet = msg.packit(MC.address, MC.port);
|
||
301 | |||
302 | // Send message
|
||
303 | MC.socket.send(packet); |
||
304 | |||
305 | // Remove from map
|
||
306 | chunks.remove(entry.getKey()); |
||
307 | |||
308 | // Remove from file system
|
||
309 | Path current = Paths.get("");
|
||
310 | String place = current.toAbsolutePath().toString() + "\\peer" + id + "\\backup\\" + entry.getValue().fileId + "\\chk" + entry.getValue().chunkN + ".chk"; |
||
311 | File toDelete = new File(place); |
||
312 | toDelete.delete(); |
||
313 | |||
314 | } |
||
315 | |||
316 | } catch (Exception e) { |
||
317 | e.printStackTrace(); |
||
318 | } |
||
319 | |||
320 | } |
||
321 | |||
322 | public void state() throws RemoteException { |
||
323 | |||
324 | } |
||
325 | |||
326 | // Get information of previous peer life from file system
|
||
327 | public void loadLocal() { |
||
328 | |||
329 | // Base path
|
||
330 | Path currentRelativePath = Paths.get("");
|
||
331 | String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
||
332 | |||
333 | // Backup path
|
||
334 | File file = new File(peerDir + "\\backup"); |
||
335 | |||
336 | // FileIds backed up
|
||
337 | File[] files = file.listFiles(File::isDirectory); |
||
338 | |||
339 | // Peer didn't exist, just return
|
||
340 | if (files == null) |
||
341 | return;
|
||
342 | |||
343 | // Each file directory
|
||
344 | for (int i = 0; i < files.length; i++) { |
||
345 | |||
346 | // FileId
|
||
347 | String fileId = files[i].getName();
|
||
348 | |||
349 | // Get Chunks inside
|
||
350 | file = new File(peerDir + "\\backup" + "\\" + fileId); |
||
351 | File[] chunks = file.listFiles(); |
||
352 | |||
353 | // No chunks, just return
|
||
354 | if (chunks == null) |
||
355 | return;
|
||
356 | |||
357 | // Each chunk
|
||
358 | for (int j = 0; j < chunks.length; j++) { |
||
359 | |||
360 | // Chunk name
|
||
361 | String chunkNo = chunks[j].getName();
|
||
362 | |||
363 | // Remove chk
|
||
364 | chunkNo = chunkNo.replaceAll("\\.", ""); |
||
365 | chunkNo = chunkNo.replaceAll("chk", ""); |
||
366 | |||
367 | try {
|
||
368 | |||
369 | // Read bytes
|
||
370 | byte[] bytes = Files.readAllBytes(chunks[j].toPath()); |
||
371 | |||
372 | // Updated used bytes
|
||
373 | used += bytes.length; |
||
374 | |||
375 | // Add to HashMap
|
||
376 | this.chunks.put(fileId + "-" + chunkNo, new Chunk(fileId, Integer.parseInt(chunkNo), bytes, 0)); |
||
377 | |||
378 | } catch (Exception e) { |
||
379 | e.printStackTrace(); |
||
380 | } |
||
381 | |||
382 | } |
||
383 | |||
384 | } |
||
385 | |||
386 | } |
||
387 | |||
388 | // Setup directory
|
||
389 | public void setupDirectory() { |
||
390 | |||
391 | // Base path
|
||
392 | Path currentRelativePath = Paths.get("");
|
||
393 | String peerDir = currentRelativePath.toAbsolutePath().toString() + "\\peer" + id; |
||
394 | |||
395 | // Backup path
|
||
396 | File file = new File(peerDir + "\\backup"); |
||
397 | file.mkdirs(); |
||
398 | |||
399 | // Backup path
|
||
400 | file = new File(peerDir + "\\restored"); |
||
401 | file.mkdirs(); |
||
402 | |||
403 | } |
||
404 | |||
405 | // Default Socket Setup
|
||
406 | public void setupSockets() throws IOException { |
||
407 | |||
408 | pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
||
409 | |||
410 | MC = new Socket(this, Socket.Type.MC); |
||
411 | pool.execute(MC); |
||
412 | |||
413 | MDB = new Socket(this, Socket.Type.MDB); |
||
414 | pool.execute(MDB); |
||
415 | |||
416 | MDR = new Socket(this, Socket.Type.MDR); |
||
417 | pool.execute(MDR); |
||
418 | |||
419 | } |
||
420 | |||
421 | // Specifc Socket Setup
|
||
422 | public void setupSockets(int portMC, String addressMC, int portMDB, String addressMDB, int portMDR, |
||
423 | String addressMDR) throws IOException { |
||
424 | |||
425 | pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10000); |
||
426 | |||
427 | MC = new Socket(this, Socket.Type.MC, portMC, addressMC); |
||
428 | pool.execute(MC); |
||
429 | |||
430 | MDB = new Socket(this, Socket.Type.MDB, portMDB, addressMDB); |
||
431 | pool.execute(MDB); |
||
432 | |||
433 | MDR = new Socket(this, Socket.Type.MDR, portMDR, addressMDR); |
||
434 | pool.execute(MDR); |
||
435 | |||
436 | } |
||
437 | |||
438 | public static void main(String args[]) { |
||
439 | |||
440 | // TODO Arguments are not correct, need to receive all the specific socket
|
||
441 | // arguments too, version and server(RMI)
|
||
442 | |||
443 | // Check arguments
|
||
444 | if (args.length < 1) { |
||
445 | System.err.println("Invalid number of arguments"); |
||
446 | return;
|
||
447 | } |
||
448 | |||
449 | try {
|
||
450 | |||
451 | // Create Server
|
||
452 | Server obj = new Server(args[0]); |
||
453 | |||
454 | // Bind the server as a registry
|
||
455 | PeerInterface stub = (PeerInterface) UnicastRemoteObject.exportObject(obj, 0); |
||
456 | |||
457 | // Bind the remote object's stub in the registry
|
||
458 | |||
459 | // TODO Get registry what port?
|
||
460 | Registry registry = LocateRegistry.getRegistry(); |
||
461 | registry.bind(args[0], stub);
|
||
462 | |||
463 | System.err.println("Server ready"); |
||
464 | |||
465 | } catch (Exception e) { |
||
466 | |||
467 | System.err.println("Server exception: " + e.toString()); |
||
468 | e.printStackTrace(); |
||
469 | |||
470 | } |
||
471 | } |
||
472 | |||
473 | } |