sdis1819-t7g02 / service / Peer.java @ 2
package service;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.rmi.registry.Registry;
import java.rmi.registry.LocateRegistry;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import channels.Message;
import channels.MessageHeader;
import protocols.BackupChunkProtocol;
import protocols.RestoreChunksProtocol;

public class Peer implements RMI {

    ScheduledThreadPoolExecutor exec;

    private static Cloud cloud;

    public Peer() {}

    public static void main(String[] args) {

        // validate the argument count before anything else, so a bad invocation
        // prints the usage instead of failing with an out-of-bounds access
        if (args.length != 9) {
            printHelp();
            return;
        }

        try
        {
            Peer peer = new Peer();
            RMI rmi = (RMI) UnicastRemoteObject.exportObject(peer, 0);
            String access_point = args[2];
            Registry registry = LocateRegistry.getRegistry();
            registry.bind(access_point, rmi);
            System.err.println("Peer ready");
        }
        catch (Exception e) {
            System.err.println("Peer exception: " + e.toString());
            return;
        }

        try {
            Cloud aux = new Cloud(args[0], args[1]);
            cloud = aux.getPeerTools(args[0], args[1]);
            cloud.createControlRoom(args[3], args[4]);
            cloud.createBackupChannel(args[5], args[6]);
            cloud.createRestoreChannel(args[7], args[8]);

            String path = Constants.DATABASE_DIR;
            Path p = Paths.get(path);
            if (Files.exists(p))
            {
                try {
                    cloud.loadFromDisk();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // System.out.println(cloud.getStoredChunks().size());

        } catch (NumberFormatException | IOException e) {
            System.out.println(e.getMessage());
            return;
        }

        if (Cloud.checkOrCreateDirectory(Constants.CHUNKS_DIR + args[0] + "/") == -1) {
            System.out.println("Couldn't create peer chunks dir!");
        }

        cloud.activateChannels();
        System.out.println("Channels activated");
    }
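
    // A minimal client-side sketch (not part of the original file) of how another
    // process could reach this peer once it is bound in the RMI registry. The access
    // point name "peer1" and the file path are hypothetical; the remote methods are
    // the ones declared in the RMI interface implemented by this class, and the
    // RemoteException / NotBoundException handling is omitted for brevity:
    //
    //   Registry registry = LocateRegistry.getRegistry();       // local registry, default port 1099
    //   RMI stub = (RMI) registry.lookup("peer1");               // same name passed as ACCESS_POINT
    //   System.out.println(stub.backup("/tmp/example.txt", 2));  // back up a file with replication degree 2
    //   System.out.println(stub.state());                        // print this peer's state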

    // Each multicast channel is identified by its IP multicast address and port,
    // passed as command line arguments after the server id, protocol version and
    // service access point.
    private static void printHelp() {
        System.out.println("java service.Peer SERVER_id VERSION ACCESS_POINT MC_ip MC_port MDB_ip MDB_port MDR_ip MDR_port");
    }
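
    // Example invocation (assumed values, for illustration only) matching the usage
    // string above: peer id 1, protocol version 1.0, access point "peer1", and the
    // control, backup and restore multicast channels on 225.0.0.1-3:
    //
    //   java service.Peer 1 1.0 peer1 225.0.0.1 8001 225.0.0.2 8002 225.0.0.3 8003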

    public void saveState() {
        try {
            cloud.saveToDisk();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public synchronized String backup(String file_path, int replication_degree)
    {
        FileChunker fc;
        Message m;
        MessageHeader h;
        int numberOfChunks;
        ExecutorService WORKER_THREAD_POOL;
        List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
        List<Future<Object>> futures = null;
        ArrayList<Chunk> chunkHolder = null;
        String[] headerElements = new String[Constants.PUTCHUNK_N_ARGS];
        Chunk aux;
        int numberOfSteps;

        System.out.println("BACKUP request received");

        try {
            fc = new FileChunker(file_path, replication_degree);
            // cloud.createDirectoryFile(fc);
        }
        catch (IOException e) {
            System.out.println("ERROR chunking file -> " + e.getMessage());
            return "ERROR chunking file -> " + e.getMessage();
        }

        chunkHolder = fc.getChunks();
        numberOfChunks = chunkHolder.size();

        cloud.registerNumberOfFileChunks(fc.getFileID(), numberOfChunks);

        System.out.println("File split into " + numberOfChunks + " chunks.");

        headerElements[0] = Constants.PUTCHUNK;
        headerElements[1] = cloud.getProtocolVersion();
        headerElements[2] = Integer.toString(cloud.getID());

        // chunks are sent in batches of at most MAX_NUMBER_OF_THREADS (ceiling division)
        numberOfSteps = numberOfChunks % Constants.MAX_NUMBER_OF_THREADS == 0 ? (int) (numberOfChunks / Constants.MAX_NUMBER_OF_THREADS) : (int) (numberOfChunks / Constants.MAX_NUMBER_OF_THREADS) + 1;

        for (int step = 0, first, last; step < numberOfSteps; step++, futures.clear(), callables.clear()) {
            first = step * Constants.MAX_NUMBER_OF_THREADS;
            last = numberOfChunks - first < Constants.MAX_NUMBER_OF_THREADS ? numberOfChunks : first + Constants.MAX_NUMBER_OF_THREADS;

            System.out.println("Sending from " + first + " to " + last);

            WORKER_THREAD_POOL = Executors.newFixedThreadPool(last - first);

            for (int chunkNo = first; chunkNo < last; chunkNo++) {
                aux = chunkHolder.get(chunkNo);
                headerElements[3] = fc.getFileID();
                headerElements[4] = aux.getChunkNumber();
                headerElements[5] = Integer.toString(replication_degree);
                try {
                    h = new MessageHeader(headerElements.clone());

                    m = new Message(h, aux.getChunkContent());

                    callables.add(new BackupChunkProtocol(cloud, aux, m, replication_degree));
                }
                catch (IllegalArgumentException e) {
                    System.out.println(e.getMessage());
                    return "Backup failed preparing chunks";
                }
            }

            try {
                futures = WORKER_THREAD_POOL.invokeAll(callables);

                WORKER_THREAD_POOL.shutdown();

                if (WORKER_THREAD_POOL.awaitTermination(35, TimeUnit.SECONDS)) {
                    for (Future<Object> p : futures) {
                        if (!(boolean) p.get()) {
                            System.out.println("One of the chunks could not be stored.");

                            delete(file_path); // order successfully stored chunks to be deleted

                            return "Backup failed: a chunk could not be stored";
                        }
                    }
                }
            }
            catch (InterruptedException | ExecutionException e) {
                return "Error in peer: " + e.getMessage();
            }
        }

        System.out.println("backup peer has " + cloud.getNumberOfStoredChunks());

        saveState();

        return "Backup successful.";
    }

    public synchronized String restore(String file_path)
    {
        System.out.println("RESTORE request received");

        FileChunker fc;
        List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
        List<Future<Object>> futures = null;
        int numberOfChunks;
        MessageHeader h;
        int numberOfSteps;
        ExecutorService WORKER_THREAD_POOL;
        String[] headerElements = new String[Constants.GETCHUNK_N_ARGS];

        headerElements[0] = Constants.GETCHUNK;
        headerElements[1] = cloud.getProtocolVersion();
        headerElements[2] = Integer.toString(cloud.getID());

        try
        {
            fc = new FileChunker(file_path);
        }
        catch (IOException e)
        {
            System.out.println("ERROR restoring file -> " + e.getMessage());
            return "ERROR restoring file -> " + e.getMessage();
        }

        headerElements[3] = fc.getFileID();

        String file_id = fc.getFileID();

        numberOfChunks = cloud.getNumberOfChunks(file_id);

        if (numberOfChunks < 0) {
            return file_path + " wasn't backed up by this peer.";
        }

        // chunks are requested in batches of at most MAX_NUMBER_OF_THREADS (ceiling division)
        numberOfSteps = numberOfChunks % Constants.MAX_NUMBER_OF_THREADS == 0 ? (int) (numberOfChunks / Constants.MAX_NUMBER_OF_THREADS) : (int) (numberOfChunks / Constants.MAX_NUMBER_OF_THREADS) + 1;

        for (int step = 0, first, last; step < numberOfSteps; step++, futures.clear(), callables.clear()) {
            first = step * Constants.MAX_NUMBER_OF_THREADS;
            last = numberOfChunks - first < Constants.MAX_NUMBER_OF_THREADS ? numberOfChunks : first + Constants.MAX_NUMBER_OF_THREADS;

            System.out.println("Sending from " + first + " to " + last);

            WORKER_THREAD_POOL = Executors.newFixedThreadPool(last - first);

            for (int chunkNo = first; chunkNo < last; chunkNo++)
            {
                headerElements[4] = Integer.toString(chunkNo);

                try
                {
                    h = new MessageHeader(headerElements.clone());

                    cloud.registerFileChunkToRestore(file_id, headerElements[4]);

                    callables.add(new RestoreChunksProtocol(cloud, h));

                }
                catch (IllegalArgumentException e) {
                    System.out.println(e.getMessage());
                    return "Restore failed preparing chunks";
                }
            }

            try {
                futures = WORKER_THREAD_POOL.invokeAll(callables);

                WORKER_THREAD_POOL.shutdown();

                if (WORKER_THREAD_POOL.awaitTermination(35, TimeUnit.SECONDS)) {
                    for (Future<Object> p : futures) {
                        if (!(boolean) p.get()) {
                            System.out.println("One of the chunks could not be restored.");

                            return "Restore failed: a chunk could not be retrieved";
                        }
                    }
                }
            }
            catch (InterruptedException | ExecutionException e) {
                return "Error in peer: " + e.getMessage();
            }

        }

        System.out.println("Collected all file parts. Building file.");

        try {
            cloud.buildFile(file_id, numberOfChunks, Paths.get(file_path).getFileName().toString());
        }
        catch (IOException e) {
            System.out.println("ERROR building file -> " + e.getMessage());
        }

        saveState();

        return "Restored file successfully.";
    }

    public synchronized String delete(String file_path)
    {
        FileChunker fc;
        MessageHeader h;
        String[] headerElements = new String[Constants.DELETE_N_ARGS];

        System.out.println("DELETE request.");

        try {
            fc = new FileChunker(file_path);
            headerElements[0] = Constants.DELETE;
            headerElements[1] = cloud.getProtocolVersion();
            headerElements[2] = Integer.toString(cloud.getID());
            headerElements[3] = fc.getFileID();
            h = new MessageHeader(headerElements.clone());
            if (cloud.checkIfPeerStoredFile(headerElements[3])) {
                // send the DELETE header three times, one second apart
                for (int i = 0; i < 3; i++) {
                    cloud.controlRoom.sendHeader(h);
                    Thread.sleep(1000);
                }
            }
            else {
                return "File was not uploaded here. Delete failed.";
            }

        }
        catch (Exception e) {
            System.out.println("ERROR deleting file -> " + e.getMessage());
            return "ERROR deleting file -> " + e.getMessage();
        }

        saveState();

        return "Delete successful.";
    }

    public synchronized String reclaim(long disk_space)
    {
        if (disk_space < 0) {
            return "disk_space can't be lower than 0!";
        }

        System.out.println("Space reclaim request to " + disk_space + " bytes.");

        if (disk_space >= cloud.getChunkSpace()) {
            cloud.setAvailableSpace(disk_space);
        }
        else {
            cloud.freeUpSpace(disk_space);
        }

        saveState();

        return "Space reclaim successful";
    }

    public synchronized String state()
    {
        System.out.println("STATE OPERATION RECEIVED");

        String answer = "\n\nPeer " + cloud.getID() + " state: \n";

        answer += "\tBacked-up files:\n";

        String pathName;
        int repDegree, nChunks;
        Chunk c;

        System.out.println("Number of files = " + cloud.getBackedUpFileNames().size());

        for (String key : cloud.getBackedUpFileNames()) {

            pathName = cloud.getPathNameFromFileId(key);
            repDegree = cloud.getBackedUpChunkNecessaryRepDegree(key);

            if (pathName != null && repDegree > -1) {
                answer += "\t\tFILE PATHNAME: " + pathName + " FILE ID: " + key + " DESIRED REPLICATION DEGREE: " + repDegree + "\n";
            }
            else {
                System.out.println("Error in key: " + key + " pathname " + pathName + " degree " + repDegree);
                continue;
            }

            nChunks = cloud.getNumberOfChunks(key);

            if (nChunks < 0) {
                System.out.println("Error getting number of chunks in key: " + key + " pathname " + pathName + " degree " + repDegree);
                continue;
            }

            answer += "\t\t\tChunks:\n";

            for (int i = 0; i < nChunks; i++) {
                answer += "\t\t\t\tchk" + i + " REP. DEGREE " + cloud.getBackepUpChunkPerceivedRepDegree(key, i) + "\n";
            }

            answer += "\n";
        }

        answer += "\tStored chunks:\n";

        for (String key : cloud.getStoredChunks().keySet()) {
            c = cloud.getStoredChunks().get(key);

            if (c.getFilePathName() == null) { // only backed-up files have a non-null file path name
                answer += "\t\t" + c.getChunkId() + ":" + c.getChunkNumber() + " " + "SIZE: " + c.getChunkSize() / 1024 + " KBytes " + " PERCEIVED REP. DEGREE: " + c.getActualReplicationDegree() + "\n";
            }
        }

        answer += "\n\n\t Total peer disk space: " + cloud.getTotalSpace() / 1024 + " KBytes Space used by chunks : " + cloud.getChunkSpace() / 1024 + " KBytes Available space: " + cloud.getAvailableSpace() / 1024 + " KBytes.\n";

        return answer;
    }
}