Revision 1
added files
service/.project | ||
---|---|---|
1 |
<?xml version="1.0" encoding="UTF-8"?> |
|
2 |
<projectDescription> |
|
3 |
<name>service</name> |
|
4 |
<comment></comment> |
|
5 |
<projects> |
|
6 |
</projects> |
|
7 |
<buildSpec> |
|
8 |
<buildCommand> |
|
9 |
<name>org.eclipse.jdt.core.javabuilder</name> |
|
10 |
<arguments> |
|
11 |
</arguments> |
|
12 |
</buildCommand> |
|
13 |
</buildSpec> |
|
14 |
<natures> |
|
15 |
<nature>org.eclipse.jdt.core.javanature</nature> |
|
16 |
</natures> |
|
17 |
</projectDescription> |
service/Chunk.java | ||
---|---|---|
1 |
package service; |
|
2 |
|
|
3 |
import java.util.ArrayList; |
|
4 |
|
|
5 |
public class Chunk implements java.io.Serializable { |
|
6 |
|
|
7 |
private int chunkNr; |
|
8 |
private String fileID; |
|
9 |
private String filePathName = null; |
|
10 |
private int size; |
|
11 |
|
|
12 |
private int actual_replication_degree = 0; |
|
13 |
private int desired_replication_degree; |
|
14 |
|
|
15 |
private byte[] chunkContent; |
|
16 |
|
|
17 |
// each chunk is identified by the pair <fileID, chunkNo> |
|
18 |
public Chunk(int n, byte[] c, String s, int desiredRepDegree) throws Exception |
|
19 |
{ |
|
20 |
this.chunkNr = n; |
|
21 |
this.chunkContent = getFiltredData(c); |
|
22 |
|
|
23 |
if(chunkContent.length > Constants.MAX_CHUNK_SIZE) { |
|
24 |
throw new Exception("Invalid chunk size -> " + chunkContent.length); |
|
25 |
} |
|
26 |
|
|
27 |
this.size = chunkContent.length; |
|
28 |
|
|
29 |
desired_replication_degree = desiredRepDegree; |
|
30 |
|
|
31 |
fileID = s; |
|
32 |
} |
|
33 |
|
|
34 |
private byte[] getFiltredData(byte[] c) { |
|
35 |
ArrayList<Byte> aux = new ArrayList<Byte>(); |
|
36 |
boolean toAdd = false; |
|
37 |
|
|
38 |
for(byte b : c) { |
|
39 |
aux.add(b); |
|
40 |
} |
|
41 |
|
|
42 |
byte[] toReturn = new byte[aux.size()]; |
|
43 |
|
|
44 |
for(int i = aux.size() - 1, k = 0; i >= 0 ; i--) { |
|
45 |
if(aux.get(i) == 0) { |
|
46 |
if(!toAdd) { |
|
47 |
continue; |
|
48 |
} |
|
49 |
else { |
|
50 |
toReturn[k] = aux.get(i); |
|
51 |
k++; |
|
52 |
} |
|
53 |
} |
|
54 |
else { |
|
55 |
toReturn[k] = aux.get(i); |
|
56 |
toAdd = true; |
|
57 |
k++; |
|
58 |
} |
|
59 |
} |
|
60 |
|
|
61 |
return toReturn; |
|
62 |
} |
|
63 |
|
|
64 |
public int getReplicationDegree() { |
|
65 |
return desired_replication_degree; |
|
66 |
} |
|
67 |
|
|
68 |
public void setActualReplicationDegree(int rd) |
|
69 |
{ |
|
70 |
this.actual_replication_degree = rd; |
|
71 |
} |
|
72 |
|
|
73 |
|
|
74 |
public String getChunkNumber() |
|
75 |
{ |
|
76 |
return Integer.toString(this.chunkNr); |
|
77 |
} |
|
78 |
|
|
79 |
|
|
80 |
public String getChunkId() |
|
81 |
{ |
|
82 |
return this.fileID; |
|
83 |
} |
|
84 |
|
|
85 |
|
|
86 |
public int getChunkSize() |
|
87 |
{ |
|
88 |
return this.size; |
|
89 |
} |
|
90 |
|
|
91 |
|
|
92 |
public int getActualReplicationDegree() |
|
93 |
{ |
|
94 |
return this.actual_replication_degree; |
|
95 |
} |
|
96 |
|
|
97 |
|
|
98 |
public byte[] getChunkContent() |
|
99 |
{ |
|
100 |
return this.chunkContent; |
|
101 |
} |
|
102 |
|
|
103 |
public int compareTo(Chunk c) |
|
104 |
{ |
|
105 |
return(this.chunkNr - c.chunkNr); |
|
106 |
} |
|
107 |
|
|
108 |
@Override |
|
109 |
public boolean equals(Object obj) { |
|
110 |
if (this == obj) { |
|
111 |
return true; |
|
112 |
} |
|
113 |
if (obj == null) { |
|
114 |
return false; |
|
115 |
} |
|
116 |
if (!(obj instanceof Chunk)) { |
|
117 |
return false; |
|
118 |
} |
|
119 |
Chunk other = (Chunk) obj; |
|
120 |
if (chunkNr != other.chunkNr) { |
|
121 |
return false; |
|
122 |
} |
|
123 |
if (fileID == null) { |
|
124 |
if (other.fileID != null) { |
|
125 |
return false; |
|
126 |
} |
|
127 |
} else if (!fileID.equals(other.fileID)) { |
|
128 |
return false; |
|
129 |
} |
|
130 |
return true; |
|
131 |
} |
|
132 |
|
|
133 |
public String getFilePathName() { |
|
134 |
return filePathName; |
|
135 |
} |
|
136 |
|
|
137 |
public void setFilePathName(String filePathName) { |
|
138 |
this.filePathName = filePathName; |
|
139 |
} |
|
140 |
} |
service/Client.java | ||
---|---|---|
1 |
package service; |
|
2 |
|
|
3 |
import java.rmi.registry.LocateRegistry; |
|
4 |
import java.rmi.registry.Registry; |
|
5 |
|
|
6 |
public class Client { |
|
7 |
|
|
8 |
private Client() {} |
|
9 |
|
|
10 |
public static void main(String[] args) { |
|
11 |
|
|
12 |
System.out.println("CLIENT"); |
|
13 |
try |
|
14 |
{ |
|
15 |
int argc = args.length; |
|
16 |
|
|
17 |
if(argc < 2 || argc > 4) |
|
18 |
{ |
|
19 |
print_usage(); |
|
20 |
return; |
|
21 |
} |
|
22 |
|
|
23 |
|
|
24 |
String aux = args[0]; |
|
25 |
|
|
26 |
String[] acess_point = aux.split(":"); |
|
27 |
|
|
28 |
String ip = acess_point[0]; |
|
29 |
String id = acess_point[1]; |
|
30 |
|
|
31 |
String sub_protocol = args[1]; |
|
32 |
|
|
33 |
Registry registry = LocateRegistry.getRegistry(ip); |
|
34 |
RMI rmi = (RMI) registry.lookup(id); |
|
35 |
|
|
36 |
String file_path; |
|
37 |
|
|
38 |
String response; |
|
39 |
|
|
40 |
switch(sub_protocol) |
|
41 |
{ |
|
42 |
case "BACKUP": |
|
43 |
if(argc != 4) |
|
44 |
{ |
|
45 |
print_error_ops(sub_protocol); |
|
46 |
} |
|
47 |
|
|
48 |
|
|
49 |
file_path = args[2]; |
|
50 |
int replication_degree = Integer.parseInt(args[3]); |
|
51 |
|
|
52 |
|
|
53 |
response = rmi.backup(file_path, replication_degree); |
|
54 |
System.out.println("response: " + response); |
|
55 |
break; |
|
56 |
|
|
57 |
case "RESTORE": |
|
58 |
if(argc != 3) |
|
59 |
{ |
|
60 |
print_error_ops(sub_protocol); |
|
61 |
} |
|
62 |
|
|
63 |
file_path = args[2]; |
|
64 |
response = rmi.restore(file_path); |
|
65 |
System.out.println("response: " + response); |
|
66 |
break; |
|
67 |
|
|
68 |
case "DELETE": |
|
69 |
if(argc != 3) |
|
70 |
{ |
|
71 |
print_error_ops(sub_protocol); |
|
72 |
} |
|
73 |
|
|
74 |
|
|
75 |
file_path = args[2]; |
|
76 |
response = rmi.delete(file_path); |
|
77 |
System.out.println("response: " + response); |
|
78 |
break; |
|
79 |
|
|
80 |
case "RECLAIM": |
|
81 |
if(argc != 3) |
|
82 |
{ |
|
83 |
print_error_ops(sub_protocol); |
|
84 |
} |
|
85 |
|
|
86 |
long disk_space = Long.parseLong(args[2]); |
|
87 |
response = rmi.reclaim(disk_space); |
|
88 |
System.out.println("response: " + response); |
|
89 |
break; |
|
90 |
|
|
91 |
case "STATE": |
|
92 |
if(argc != 2) |
|
93 |
{ |
|
94 |
print_error_ops(sub_protocol); |
|
95 |
} |
|
96 |
|
|
97 |
response = rmi.state(); |
|
98 |
System.out.println("response: " + response); |
|
99 |
break; |
|
100 |
} |
|
101 |
|
|
102 |
} catch (Exception e) { |
|
103 |
System.err.println("Client exception: " + e.toString()); |
|
104 |
} |
|
105 |
} |
|
106 |
|
|
107 |
public static void print_usage() |
|
108 |
{ |
|
109 |
System.out.println("Usage: :\n"); |
|
110 |
System.out.println(" java Client <peer_ap> <operation> <opnd_1> <opnd_2> \n"); |
|
111 |
System.out.println(" - <peer_ap> -> peer's acess point\n"); |
|
112 |
System.out.println(" - <operation> -> Subprotocol ops: BACKUP, RESTORE, DELETE, RECLAIM; Enhancements: ops with ENH in the end. State: STATE \n"); |
|
113 |
System.out.println(" - <opnd_1> -> path name of the file or in case of op RECLAIM the max amout of disk space\n"); |
|
114 |
System.out.println(" - <opnd_2>* -> integer that specifies the desired replication degree\n"); |
|
115 |
} |
|
116 |
|
|
117 |
public static void print_error_ops(String error) |
|
118 |
{ |
|
119 |
switch(error) |
|
120 |
{ |
|
121 |
case "BACKUP": |
|
122 |
System.out.println("ERROR on BACKUP format. Must be:\n"); |
|
123 |
System.out.println("> BACKUP <file_path> <replication_degree>"); |
|
124 |
break; |
|
125 |
|
|
126 |
case "RESTORE": |
|
127 |
System.out.println("ERROR on RESTORE format. Must be:\n"); |
|
128 |
System.out.println("> RESTORE <file_path>"); |
|
129 |
break; |
|
130 |
|
|
131 |
case "DELETE": |
|
132 |
System.out.println("ERROR on DELETE format. Must be:\n"); |
|
133 |
System.out.println("> DELETE <file_path>"); |
|
134 |
break; |
|
135 |
|
|
136 |
case "RECLAIM": |
|
137 |
System.out.println("ERROR on RECLAIM format. Must be:\n"); |
|
138 |
System.out.println("> RECLAIM <disk_space>"); |
|
139 |
break; |
|
140 |
|
|
141 |
case "STATE": |
|
142 |
System.out.println("ERROR on STATE format. Must be:\n"); |
|
143 |
System.out.println("> RECLAIM <disk_space>"); |
|
144 |
break; |
|
145 |
|
|
146 |
default: |
|
147 |
System.out.println("ERROR. Unkown op"); |
|
148 |
} |
|
149 |
} |
|
150 |
} |
service/Cloud.java | ||
---|---|---|
1 |
package service; |
|
2 |
|
|
3 |
import channels.BackupChannel; |
|
4 |
import channels.ControlChannel; |
|
5 |
import channels.Message; |
|
6 |
import channels.MessageHeader; |
|
7 |
import channels.RestoreChannel; |
|
8 |
import protocols.SendChunkMessageProtocol; |
|
9 |
|
|
10 |
import java.io.*; |
|
11 |
import java.nio.file.Files; |
|
12 |
import java.nio.file.Path; |
|
13 |
import java.nio.file.Paths; |
|
14 |
import java.util.*; |
|
15 |
import java.util.concurrent.ConcurrentHashMap; |
|
16 |
import java.util.concurrent.ConcurrentHashMap.KeySetView; |
|
17 |
import java.util.concurrent.ConcurrentLinkedQueue; |
|
18 |
|
|
19 |
//https://www.geeksforgeeks.org/serialization-in-java/ |
|
20 |
|
|
21 |
public class Cloud implements java.io.Serializable |
|
22 |
{ |
|
23 |
private static final long serialVersionUID = 1L; |
|
24 |
|
|
25 |
/** |
|
26 |
* stores received V and backedUp chunks |
|
27 |
* key = <fileID> |
|
28 |
*/ |
|
29 |
private ConcurrentHashMap<String, Chunk> storedChunks; |
|
30 |
|
|
31 |
/** |
|
32 |
* stores the number of chunks taht a file that was backed up in peer was splited into |
|
33 |
* key = <fileID> |
|
34 |
*/ |
|
35 |
private ConcurrentHashMap<String, Integer> numberOfFileChunks; |
|
36 |
|
|
37 |
/** |
|
38 |
* registers the peers that have stored chunks sent by this peer |
|
39 |
* key = <fileID>:<ChunkNo> |
|
40 |
*/ |
|
41 |
private ConcurrentHashMap<String, HashSet<Integer>> colaborativePeers; |
|
42 |
|
|
43 |
/** |
|
44 |
* stores the number of times a chunk was stored |
|
45 |
* key = <fileID>:<ChunkNo> |
|
46 |
*/ |
|
47 |
private ConcurrentHashMap<String, Integer> numberOfChunksConfirmations; |
|
48 |
|
|
49 |
/** |
|
50 |
* key = <fileID>:<ChunkNo> |
|
51 |
*/ |
|
52 |
private ConcurrentHashMap<String, Integer> chunksToRestore; |
|
53 |
|
|
54 |
/** |
|
55 |
* Holds chucks restored by restore protocol |
|
56 |
* key = <fileID>:<ChunkNo> |
|
57 |
*/ |
|
58 |
private ConcurrentHashMap<String, Chunk> recoveredChunks; |
|
59 |
|
|
60 |
/** |
|
61 |
* stores the number of chunks a file was splited into |
|
62 |
* key = <fileID> |
|
63 |
*/ |
|
64 |
private ConcurrentLinkedQueue<String> numberOfFileChunksToBeRestored; |
|
65 |
|
|
66 |
private ArrayList<FileChunker> data; |
|
67 |
private int serverID; |
|
68 |
private String protocol_version; |
|
69 |
public ControlChannel controlRoom; |
|
70 |
public BackupChannel backupCh; |
|
71 |
public RestoreChannel restoreCh; |
|
72 |
|
|
73 |
long spaceRemaining; |
|
74 |
|
|
75 |
|
|
76 |
public Cloud(String id, String version) |
|
77 |
{ |
|
78 |
numberOfFileChunks = new ConcurrentHashMap<String, Integer>(); |
|
79 |
storedChunks = new ConcurrentHashMap<String, Chunk>(); |
|
80 |
colaborativePeers = new ConcurrentHashMap<String, HashSet<Integer>>(); |
|
81 |
numberOfChunksConfirmations = new ConcurrentHashMap<String, Integer>(); |
|
82 |
this.data = new ArrayList<>(); |
|
83 |
|
|
84 |
this.numberOfFileChunksToBeRestored = new ConcurrentLinkedQueue<String>(); |
|
85 |
this.recoveredChunks = new ConcurrentHashMap<String, Chunk>(); |
|
86 |
|
|
87 |
this.spaceRemaining = 1073741824; //1GB |
|
88 |
|
|
89 |
serverID = Integer.parseInt(id); |
|
90 |
protocol_version = version; |
|
91 |
} |
|
92 |
|
|
93 |
public synchronized boolean sendBackupMessage(Message m) { |
|
94 |
return backupCh.sendMessage(m); |
|
95 |
} |
|
96 |
|
|
97 |
public synchronized boolean sendRestoreMessage(Message m) { |
|
98 |
return restoreCh.sendMessage(m); |
|
99 |
} |
|
100 |
|
|
101 |
public Cloud getPeerTools(String id, String version) { |
|
102 |
return new Cloud(id, version); |
|
103 |
} |
|
104 |
|
|
105 |
public int getChunkCurrentRepDegree(String fileId, String chunkNo) { |
|
106 |
String key = fileId + ':' + chunkNo; |
|
107 |
|
|
108 |
if(numberOfChunksConfirmations.containsKey(key)) { |
|
109 |
return numberOfChunksConfirmations.get(key); |
|
110 |
} |
|
111 |
|
|
112 |
return -1; |
|
113 |
} |
|
114 |
|
|
115 |
public byte[] reverseList(byte[] a) { |
|
116 |
ArrayList<Byte> aux = new ArrayList<Byte>(a.length); |
|
117 |
|
|
118 |
for(byte b : a) { |
|
119 |
aux.add(0, b); |
|
120 |
} |
|
121 |
|
|
122 |
byte[] toReturn = new byte[a.length]; |
|
123 |
|
|
124 |
int i = 0; |
|
125 |
|
|
126 |
for(byte b : aux) { |
|
127 |
toReturn[i] = b; |
|
128 |
i++; |
|
129 |
} |
|
130 |
|
|
131 |
return toReturn; |
|
132 |
} |
|
133 |
|
|
134 |
public boolean buildFile(String file_id, int maxchunks, String fileName) throws IOException |
|
135 |
{ |
|
136 |
String restore_path = Constants.CHUNKS_DIR + this.serverID + "/restored/"; |
|
137 |
String key; |
|
138 |
Chunk aux; |
|
139 |
int total = 0; |
|
140 |
|
|
141 |
checkOrCreateDirectory(restore_path); |
|
142 |
|
|
143 |
File restoredFile = new File(restore_path + fileName); |
|
144 |
|
|
145 |
restoredFile.createNewFile(); |
|
146 |
|
|
147 |
FileOutputStream fos = new FileOutputStream(restoredFile); |
|
148 |
|
|
149 |
for(int i = 0; i < maxchunks; i++) |
|
150 |
{ |
|
151 |
key = file_id + ':' + Integer.toString(i); |
|
152 |
if(recoveredChunks.containsKey(key)) { |
|
153 |
aux = recoveredChunks.get(key); |
|
154 |
|
|
155 |
if(aux != null) { |
|
156 |
fos.write(reverseList(aux.getChunkContent())); |
|
157 |
total += aux.getChunkContent().length; |
|
158 |
} |
|
159 |
else { |
|
160 |
System.out.println("NULL chunk file might be corrupted"); |
|
161 |
} |
|
162 |
} |
|
163 |
} |
|
164 |
|
|
165 |
try |
|
166 |
{ |
|
167 |
fos.close(); |
|
168 |
} |
|
169 |
catch(IOException e) { |
|
170 |
recoveredChunks.clear(); |
|
171 |
System.out.println(e.getMessage()); |
|
172 |
} |
|
173 |
|
|
174 |
recoveredChunks.clear(); |
|
175 |
|
|
176 |
System.out.println("Restored file " + fileName + " with " + total + " bytes."); |
|
177 |
|
|
178 |
return false; |
|
179 |
} |
|
180 |
|
|
181 |
public ArrayList<FileChunker> getData() |
|
182 |
{ |
|
183 |
return this.data; |
|
184 |
} |
|
185 |
|
|
186 |
public ConcurrentHashMap<String, Integer> getChunksToRestore() |
|
187 |
{ |
|
188 |
return this.chunksToRestore; |
|
189 |
} |
|
190 |
|
|
191 |
void addFile(FileChunker fc) |
|
192 |
{ |
|
193 |
this.data.add(fc); |
|
194 |
} |
|
195 |
|
|
196 |
|
|
197 |
public void addChunk(Chunk c) { |
|
198 |
String key = c.getChunkId() + ":" + c.getChunkNumber(); |
|
199 |
storedChunks.put(key, c); |
|
200 |
} |
|
201 |
|
|
202 |
|
|
203 |
public void deleteChunks(FileChunker fc, ArrayList<FileChunker> fcs) |
|
204 |
{ |
|
205 |
|
|
206 |
String file_id = fc.getFileID(); |
|
207 |
|
|
208 |
String path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + file_id + '/'; |
|
209 |
|
|
210 |
File f = new File(path); |
|
211 |
|
|
212 |
String[] entries = f.list(); |
|
213 |
|
|
214 |
for(String s: entries) |
|
215 |
{ |
|
216 |
File currentFile = new File(f.getPath(), s); |
|
217 |
currentFile.delete(); |
|
218 |
} |
|
219 |
|
|
220 |
f = new File(path); |
|
221 |
f.delete(); |
|
222 |
} |
|
223 |
|
|
224 |
public boolean storeRestoreChunk(Message m) { |
|
225 |
Chunk toStore = null; |
|
226 |
byte[] chunkData; |
|
227 |
|
|
228 |
try { |
|
229 |
chunkData = m.getChunkContent(); |
|
230 |
toStore = new Chunk(Integer.parseInt(m.getHeader().getChunkNumber()), chunkData, m.getHeader().getFileId(), Integer.parseInt(m.getHeader().getDegree())); |
|
231 |
} catch (Exception e) { |
|
232 |
System.out.println("Error in chunk: " + e.getMessage()); |
|
233 |
return false; |
|
234 |
} |
|
235 |
|
|
236 |
addToRecoveredChunks(toStore); |
|
237 |
|
|
238 |
return true; |
|
239 |
} |
|
240 |
|
|
241 |
public boolean storeChunk(Message m) { |
|
242 |
|
|
243 |
Chunk toStore = null; |
|
244 |
String filename, path; |
|
245 |
FileOutputStream out = null; |
|
246 |
File dir = null; |
|
247 |
byte[] chunkData; |
|
248 |
|
|
249 |
try { |
|
250 |
chunkData = m.getChunkContent(); |
|
251 |
toStore = new Chunk(Integer.parseInt(m.getHeader().getChunkNumber()), chunkData, m.getHeader().getFileId(), Integer.parseInt(m.getHeader().getDegree())); |
|
252 |
} catch (Exception e) { |
|
253 |
System.out.println("Error in chunk: " + e.getMessage()); |
|
254 |
return false; |
|
255 |
} |
|
256 |
|
|
257 |
if(toStore.getChunkContent().length > spaceRemaining) { |
|
258 |
System.out.println("There is no available space"); |
|
259 |
return false; |
|
260 |
} |
|
261 |
|
|
262 |
this.addChunk(toStore); |
|
263 |
spaceRemaining -= toStore.getChunkSize(); |
|
264 |
|
|
265 |
path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + toStore.getChunkId() + "/"; |
|
266 |
filename = getFileName(Integer.parseInt(toStore.getChunkNumber())); |
|
267 |
|
|
268 |
if(!doesDirExists(path)) { |
|
269 |
dir = new File(path); |
|
270 |
if(!dir.mkdirs()) { |
|
271 |
return false; |
|
272 |
} |
|
273 |
} |
|
274 |
|
|
275 |
try { |
|
276 |
dir = new File(path + filename); |
|
277 |
dir.createNewFile(); |
|
278 |
out = new FileOutputStream(dir); |
|
279 |
out.write(toStore.getChunkContent()); |
|
280 |
out.close(); |
|
281 |
} catch (IOException e) { |
|
282 |
System.out.println("Error writing file to disk: " + e.getMessage()); |
|
283 |
return false; |
|
284 |
} |
|
285 |
|
|
286 |
return true; |
|
287 |
} |
|
288 |
|
|
289 |
public boolean checkIfChunkWasRestored(String fileid, String chunknr) |
|
290 |
{ |
|
291 |
return this.recoveredChunks.containsKey(fileid + ':' + chunknr); |
|
292 |
} |
|
293 |
|
|
294 |
public void addToRecoveredChunks(Chunk c) { |
|
295 |
String key = c.getChunkId() + ":" + c.getChunkNumber(); |
|
296 |
if(!recoveredChunks.containsKey(key)) { |
|
297 |
recoveredChunks.put(key, c); |
|
298 |
} |
|
299 |
} |
|
300 |
|
|
301 |
public void registerFileChunkToRestore(String file_id, String chunknr) |
|
302 |
{ |
|
303 |
|
|
304 |
if(!this.numberOfFileChunksToBeRestored.contains(file_id + ':' + chunknr)) |
|
305 |
{ |
|
306 |
this.numberOfFileChunksToBeRestored.add(file_id + ':' + chunknr); |
|
307 |
} |
|
308 |
} |
|
309 |
|
|
310 |
public boolean isFileChunkRegistered(String fileId, String chunknr) { |
|
311 |
return this.numberOfFileChunksToBeRestored.contains(fileId + ':' + chunknr); |
|
312 |
} |
|
313 |
|
|
314 |
public void createDirectoryFile(FileChunker fc) throws IOException |
|
315 |
{ |
|
316 |
String file_id = fc.getFileID(); |
|
317 |
|
|
318 |
String path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + file_id + '/'; |
|
319 |
|
|
320 |
if(checkOrCreateDirectory(path) == -1) |
|
321 |
{ |
|
322 |
System.out.println("Couldn't create peer backup chunks dir for that file!"); |
|
323 |
} |
|
324 |
|
|
325 |
for(int i = 0; i < fc.getChunks().size(); i++) |
|
326 |
{ |
|
327 |
int nr = i + 1; |
|
328 |
String filename = "chk" + Integer.toString(nr); |
|
329 |
File dir = new File(path + filename); |
|
330 |
|
|
331 |
|
|
332 |
dir.createNewFile(); |
|
333 |
|
|
334 |
try(FileOutputStream out = new FileOutputStream(dir)) |
|
335 |
{ |
|
336 |
out.write(fc.getChunks().get(i).getChunkContent()); |
|
337 |
} |
|
338 |
} |
|
339 |
} |
|
340 |
|
|
341 |
private static String getFileName(int chunkNumber) { |
|
342 |
return "chk" + Integer.toString(chunkNumber); |
|
343 |
} |
|
344 |
|
|
345 |
private static boolean doesDirExists(String path) { |
|
346 |
File f = new File(path); |
|
347 |
|
|
348 |
return f.exists(); |
|
349 |
} |
|
350 |
|
|
351 |
public boolean checkIfPeerStoredFile(String filename) { |
|
352 |
return numberOfFileChunks.containsKey(filename); |
|
353 |
} |
|
354 |
|
|
355 |
public static int checkOrCreateDirectory(String dirName) |
|
356 |
{ |
|
357 |
|
|
358 |
File f = new File(dirName); |
|
359 |
|
|
360 |
int result = -1; |
|
361 |
|
|
362 |
if(!f.exists()) |
|
363 |
{ |
|
364 |
// System.out.println("Creating directory " + f.getName()); |
|
365 |
|
|
366 |
try |
|
367 |
{ |
|
368 |
f.mkdirs(); |
|
369 |
result = 1; |
|
370 |
} |
|
371 |
catch(SecurityException se) |
|
372 |
{ |
|
373 |
// System.err.println("Dir exception: " + se.toString()); |
|
374 |
se.printStackTrace(); |
|
375 |
result = -1; |
|
376 |
} |
|
377 |
|
|
378 |
if(result == 1) |
|
379 |
{ |
|
380 |
// System.out.println("DIR " + f.getName() + " created"); |
|
381 |
} |
|
382 |
} |
|
383 |
else if(f.exists()) |
|
384 |
{ |
|
385 |
// System.out.println("Directory already exists, proceeding"); |
|
386 |
result = 2; |
|
387 |
} |
|
388 |
else |
|
389 |
{ |
|
390 |
result = -1; |
|
391 |
} |
|
392 |
|
|
393 |
return result; |
|
394 |
} |
|
395 |
|
|
396 |
public void createControlRoom(String ip, String port) throws IOException { |
|
397 |
controlRoom = new ControlChannel(ip, port, this); |
|
398 |
} |
|
399 |
|
|
400 |
public void createBackupChannel(String ip, String port) throws IOException { |
|
401 |
backupCh = new BackupChannel(ip, port, this); |
|
402 |
} |
|
403 |
|
|
404 |
public void createRestoreChannel(String ip, String port) throws IOException { |
|
405 |
restoreCh = new RestoreChannel(ip, port, this); |
|
406 |
} |
|
407 |
|
|
408 |
public void activateChannels() { |
|
409 |
controlRoom.start(); |
|
410 |
backupCh.start(); |
|
411 |
restoreCh.start(); |
|
412 |
} |
|
413 |
|
|
414 |
|
|
415 |
public int getID() { |
|
416 |
return serverID; |
|
417 |
} |
|
418 |
|
|
419 |
public boolean storeChunk(Chunk c) |
|
420 |
{ |
|
421 |
if(!this.storedChunks.containsKey(c.getChunkId() + ":" + c.getChunkNumber())) { |
|
422 |
addChunk(c); |
|
423 |
spaceRemaining -= c.getChunkSize(); |
|
424 |
} |
|
425 |
|
|
426 |
return true; |
|
427 |
} |
|
428 |
|
|
429 |
public String getProtocolVersion() { |
|
430 |
return protocol_version; |
|
431 |
} |
|
432 |
|
|
433 |
public ConcurrentHashMap<String, Chunk> getStoredChunks() |
|
434 |
{ |
|
435 |
return this.storedChunks; |
|
436 |
} |
|
437 |
|
|
438 |
|
|
439 |
public synchronized void registerNumberOfFileChunks(String fileID, Integer NFileChunks) { |
|
440 |
System.out.println(fileID + " registered."); |
|
441 |
numberOfFileChunks.put(fileID, NFileChunks); |
|
442 |
} |
|
443 |
|
|
444 |
public synchronized boolean insertAwaitingStoreFile(MessageHeader h) { |
|
445 |
|
|
446 |
String key = h.getFileId() + ':' + h.getChunkNumber(); |
|
447 |
if(numberOfFileChunks.containsKey(h.getFileId())) { |
|
448 |
colaborativePeers.put(key, new HashSet<Integer>()); |
|
449 |
numberOfChunksConfirmations.put(key, 0); |
|
450 |
return true; |
|
451 |
} |
|
452 |
else { |
|
453 |
System.out.println("Cloud hasn't got " + h.getFileId() + " registered."); |
|
454 |
return false; |
|
455 |
} |
|
456 |
} |
|
457 |
|
|
458 |
public boolean wasTheRepDegreeMatched(Chunk c) { |
|
459 |
return c.getReplicationDegree() <= numberOfChunksConfirmations.get(c.getChunkId() + ':' + c.getChunkNumber()); |
|
460 |
} |
|
461 |
|
|
462 |
|
|
463 |
|
|
464 |
public synchronized boolean intrepertStoredMessage(MessageHeader h) { |
|
465 |
String key = h.getFileId() + ':' + h.getChunkNumber(); |
|
466 |
|
|
467 |
if(numberOfFileChunks.containsKey(h.getFileId())) { |
|
468 |
if(colaborativePeers.get(key).add(Integer.parseInt(h.getSenderId()))) { |
|
469 |
numberOfChunksConfirmations.put(key, numberOfChunksConfirmations.get(key) + 1); |
|
470 |
} |
|
471 |
return true; |
|
472 |
} |
|
473 |
else { |
|
474 |
if(storedChunks.containsKey(key)) { |
|
475 |
if(!colaborativePeers.containsKey(key)) { |
|
476 |
colaborativePeers.put(key, new HashSet<Integer>()); |
|
477 |
colaborativePeers.get(key).add(getID()); |
|
478 |
} |
|
479 |
colaborativePeers.get(key).add(Integer.parseInt(h.getSenderId())); |
|
480 |
storedChunks.get(key).setActualReplicationDegree(colaborativePeers.get(key).size()); |
|
481 |
|
|
482 |
return true; |
|
483 |
} |
|
484 |
return false; |
|
485 |
} |
|
486 |
} |
|
487 |
|
|
488 |
public synchronized boolean deleteFile(MessageHeader h) { |
|
489 |
String fileId = h.getFileId(); |
|
490 |
String path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + fileId + "/"; |
|
491 |
File dir = new File(path); |
|
492 |
boolean somethingDeleted = false; |
|
493 |
|
|
494 |
if(dir.isDirectory()) { |
|
495 |
for(File f : dir.listFiles()) { |
|
496 |
f.delete(); |
|
497 |
somethingDeleted = true; |
|
498 |
} |
|
499 |
|
|
500 |
if(!dir.delete()) { |
|
501 |
System.out.println("Couldn't erase " + h.getFileId() + " from file system"); |
|
502 |
return false; |
|
503 |
} |
|
504 |
} |
|
505 |
|
|
506 |
for(String key : storedChunks.keySet()) { |
|
507 |
Chunk c = storedChunks.get(key); |
|
508 |
if(c.getChunkId().equals(fileId)) { |
|
509 |
storedChunks.remove(key); |
|
510 |
spaceRemaining += c.getChunkSize(); |
|
511 |
somethingDeleted = true; |
|
512 |
} |
|
513 |
} |
|
514 |
|
|
515 |
if(somethingDeleted) { |
|
516 |
System.out.println(h.getFileId() + " erased from file system."); |
|
517 |
} |
|
518 |
|
|
519 |
return true; |
|
520 |
} |
|
521 |
|
|
522 |
public KeySetView<String, Integer> getBackedUpFileNames() { |
|
523 |
return numberOfFileChunks.keySet(); |
|
524 |
} |
|
525 |
|
|
526 |
public String getPathNameFromFileId(String fileId) { |
|
527 |
if(storedChunks.containsKey(fileId + ":" + "0")) { |
|
528 |
return storedChunks.get(fileId + ":" + "0").getFilePathName(); |
|
529 |
} |
|
530 |
|
|
531 |
return null; |
|
532 |
} |
|
533 |
|
|
534 |
public int getBackedUpChunkNecessaryRepDegree(String fileId) { |
|
535 |
if(storedChunks.containsKey(fileId + ":" + "0")) { |
|
536 |
return storedChunks.get(fileId + ":" + "0").getReplicationDegree(); |
|
537 |
} |
|
538 |
|
|
539 |
return -1; |
|
540 |
} |
|
541 |
|
|
542 |
public int getBackepUpChunkPerceivedRepDegree(String fileId, int id) { |
|
543 |
if(colaborativePeers.containsKey(fileId + ":" + Integer.toString(id))) { |
|
544 |
return colaborativePeers.get(fileId + ":" + Integer.toString(id)).size(); |
|
545 |
} |
|
546 |
|
|
547 |
return 0; |
|
548 |
} |
|
549 |
|
|
550 |
public long getAvailableSpace() { |
|
551 |
return this.spaceRemaining; |
|
552 |
} |
|
553 |
|
|
554 |
public int getNumberOfStoredChunks() { |
|
555 |
return storedChunks.size(); |
|
556 |
} |
|
557 |
|
|
558 |
public long getChunkSpace() { |
|
559 |
long total = 0; |
|
560 |
|
|
561 |
for(String key : storedChunks.keySet()) { |
|
562 |
Chunk c = storedChunks.get(key); |
|
563 |
total += c.getChunkSize(); |
|
564 |
} |
|
565 |
|
|
566 |
return total; |
|
567 |
} |
|
568 |
|
|
569 |
public long getTotalSpace() { |
|
570 |
return this.spaceRemaining + getChunkSpace(); |
|
571 |
} |
|
572 |
|
|
573 |
public void setAvailableSpace(long s) { |
|
574 |
this.spaceRemaining = s-getChunkSpace(); |
|
575 |
} |
|
576 |
|
|
577 |
public void sendRemovedMessage(String fileId, String chunkNumber) { |
|
578 |
String[] headerElements = new String[Constants.REMOVED_N_ARGS]; |
|
579 |
MessageHeader h; |
|
580 |
headerElements[0] = Constants.REMOVED; |
|
581 |
headerElements[1] = getProtocolVersion(); |
|
582 |
headerElements[2] = Integer.toString(getID()); |
|
583 |
headerElements[3] = fileId; |
|
584 |
headerElements[4] = chunkNumber; |
|
585 |
|
|
586 |
h = new MessageHeader(headerElements); |
|
587 |
|
|
588 |
controlRoom.sendHeader(h); |
|
589 |
} |
|
590 |
|
|
591 |
public void sendHeader(MessageHeader mh) |
|
592 |
{ |
|
593 |
controlRoom.sendHeader(mh); |
|
594 |
} |
|
595 |
|
|
596 |
public boolean freeUpSpace(long desiredSpace) { |
|
597 |
|
|
598 |
String fileId, chunkNo, path; |
|
599 |
File file; |
|
600 |
|
|
601 |
for(String key : storedChunks.keySet()) { |
|
602 |
Chunk c = storedChunks.get(key); |
|
603 |
if(getChunkSpace() <= desiredSpace) { |
|
604 |
break; |
|
605 |
} |
|
606 |
|
|
607 |
fileId = c.getChunkId(); |
|
608 |
chunkNo = c.getChunkNumber(); |
|
609 |
|
|
610 |
path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + fileId + "/chk" + chunkNo; |
|
611 |
|
|
612 |
file = new File(path); |
|
613 |
|
|
614 |
if(file.delete()) { |
|
615 |
sendRemovedMessage(fileId, chunkNo); |
|
616 |
storedChunks.remove(key); |
|
617 |
} |
|
618 |
} |
|
619 |
|
|
620 |
setAvailableSpace(desiredSpace); |
|
621 |
|
|
622 |
return true; |
|
623 |
} |
|
624 |
|
|
625 |
public boolean intrepertRemovedMessage(MessageHeader h) { |
|
626 |
String fileId, chunkNo, senderId, key; |
|
627 |
|
|
628 |
fileId = h.getFileId(); |
|
629 |
chunkNo = h.getChunkNumber(); |
|
630 |
senderId = h.getSenderId(); |
|
631 |
key = fileId + ":" + chunkNo; |
|
632 |
|
|
633 |
if(checkIfPeerStoredFile(fileId)) { |
|
634 |
if(numberOfChunksConfirmations.containsKey(key)) { |
|
635 |
numberOfChunksConfirmations.put(key, numberOfChunksConfirmations.get(key) - 1); |
|
636 |
} |
|
637 |
if(this.colaborativePeers.containsKey(key)) { |
|
638 |
this.colaborativePeers.get(key).remove(Integer.parseInt(senderId)); |
|
639 |
} |
|
640 |
if(storedChunks.containsKey(key)) { |
|
641 |
return storedChunks.get(key).getReplicationDegree() < numberOfChunksConfirmations.get(key); |
|
642 |
} |
|
643 |
} |
|
644 |
|
|
645 |
return true; |
|
646 |
} |
|
647 |
|
|
648 |
public int getNumberOfChunks(String file_id) |
|
649 |
{ |
|
650 |
if(numberOfFileChunks.containsKey(file_id)) { |
|
651 |
return numberOfFileChunks.get(file_id); |
|
652 |
} |
|
653 |
|
|
654 |
return -1; |
|
655 |
} |
|
656 |
|
|
657 |
public Chunk getFileChunkRepDegree(String fileId, String chunkNo) { |
|
658 |
|
|
659 |
if(storedChunks.containsKey(fileId + ":" + chunkNo)) { |
|
660 |
return storedChunks.get(fileId + ":" + chunkNo); |
|
661 |
} |
|
662 |
|
|
663 |
return null; |
|
664 |
} |
|
665 |
|
|
666 |
public void intrepertGetChunkMessage(MessageHeader header) { |
|
667 |
|
|
668 |
String key = header.getFileId() + ":" + header.getChunkNumber(); |
|
669 |
Chunk toSend; |
|
670 |
SendChunkMessageProtocol p; |
|
671 |
|
|
672 |
if(storedChunks.containsKey(key)) { |
|
673 |
System.out.println("Going to send chunk no " + header.getChunkNumber()); |
|
674 |
toSend = storedChunks.get(key); |
|
675 |
|
|
676 |
p = new SendChunkMessageProtocol(toSend, this); |
|
677 |
|
|
678 |
p.run(); |
|
679 |
} |
|
680 |
else { |
|
681 |
System.out.println("I dont have that chunk"); |
|
682 |
} |
|
683 |
} |
|
684 |
|
|
685 |
|
|
686 |
@SuppressWarnings({ "unchecked" }) |
|
687 |
public void loadFromDisk() throws IOException, Exception |
|
688 |
{ |
|
689 |
String path = Constants.DATABASE_DIR + "peer" + this.serverID + '/'; |
|
690 |
|
|
691 |
|
|
692 |
String path1 = path + "storedChunks.ser"; |
|
693 |
Path p = Paths.get(path1); |
|
694 |
if (Files.exists(p)) { |
|
695 |
FileInputStream fis = new FileInputStream(path1); |
|
696 |
ObjectInputStream ois = new ObjectInputStream(fis); |
|
697 |
this.setStoredChunks( (ConcurrentHashMap<String, Chunk>)ois.readObject()); |
|
698 |
fis.close(); |
|
699 |
ois.close(); |
|
700 |
} |
|
701 |
|
|
702 |
|
|
703 |
|
|
704 |
|
|
705 |
String path2 = path + "numberOfFileChunks.ser"; |
|
706 |
Path p2 = Paths.get(path2); |
|
707 |
if (Files.exists(p2)) { |
|
708 |
FileInputStream fis2 = new FileInputStream(path2); |
|
709 |
ObjectInputStream ois2 = new ObjectInputStream(fis2); |
|
710 |
this.setNumberOfFileChunks((ConcurrentHashMap<String, Integer>)ois2.readObject()); |
|
711 |
fis2.close(); |
|
712 |
ois2.close(); |
|
713 |
} |
|
714 |
|
|
715 |
|
|
716 |
String path3 = path + "colaborativePeers.ser"; |
|
717 |
Path p3 = Paths.get(path3); |
|
718 |
if (Files.exists(p3)) { |
|
719 |
FileInputStream fis3 = new FileInputStream(path3); |
|
720 |
ObjectInputStream ois3 = new ObjectInputStream(fis3); |
|
721 |
this.setColaborativePeers((ConcurrentHashMap<String, HashSet<Integer>>)ois3.readObject()); |
|
722 |
fis3.close(); |
|
723 |
ois3.close(); |
|
724 |
} |
|
725 |
|
|
726 |
|
|
727 |
String path4 = path + "numberOfChunksConfirmations.ser"; |
|
728 |
Path p4 = Paths.get(path4); |
|
729 |
if (Files.exists(p4)) { |
|
730 |
FileInputStream fis4 = new FileInputStream(path4); |
|
731 |
ObjectInputStream ois4 = new ObjectInputStream(fis4); |
|
732 |
this.setNumberOfChunksConfirmations((ConcurrentHashMap<String, Integer>)ois4.readObject()); |
|
733 |
fis4.close(); |
|
734 |
ois4.close(); |
|
735 |
} |
|
736 |
|
|
737 |
|
|
738 |
String path5 = path + "chunksToRestore.ser"; |
|
739 |
Path p5 = Paths.get(path5); |
|
740 |
if (Files.exists(p5)) { |
|
741 |
FileInputStream fis5 = new FileInputStream(path5); |
|
742 |
ObjectInputStream ois5 = new ObjectInputStream(fis5); |
|
743 |
this.setChunksToRestore((ConcurrentHashMap<String, Integer>)ois5.readObject()); |
|
744 |
fis5.close(); |
|
745 |
ois5.close(); |
|
746 |
} |
|
747 |
|
|
748 |
String path6 = path + "recoveredChunks.ser"; |
|
749 |
Path p6 = Paths.get(path6); |
|
750 |
if (Files.exists(p6)) { |
|
751 |
FileInputStream fis6 = new FileInputStream(path6); |
|
752 |
ObjectInputStream ois6 = new ObjectInputStream(fis6); |
|
753 |
this.setRecoveredChunks((ConcurrentHashMap<String, Chunk>)ois6.readObject()); |
|
754 |
fis6.close(); |
|
755 |
ois6.close(); |
|
756 |
} |
|
757 |
|
|
758 |
String path7 = path + "numberOfFileChunksToBeRestored.ser"; |
|
759 |
Path p7 = Paths.get(path7); |
|
760 |
if (Files.exists(p7)) { |
|
761 |
FileInputStream fis7 = new FileInputStream(path7); |
|
762 |
ObjectInputStream ois7 = new ObjectInputStream(fis7); |
|
763 |
this.setNumberOfFileChunksToBeRestored((ConcurrentLinkedQueue<String>)ois7.readObject()); |
|
764 |
fis7.close(); |
|
765 |
ois7.close(); |
|
766 |
} |
|
767 |
} |
|
768 |
|
|
769 |
@SuppressWarnings("resource") |
|
770 |
public synchronized void saveToDisk() throws IOException |
|
771 |
{ |
|
772 |
String path = Constants.DATABASE_DIR + "peer" + this.serverID + '/'; |
|
773 |
|
|
774 |
checkOrCreateDirectory(path); |
|
775 |
|
|
776 |
String path1 = path + "storedChunks.ser"; |
|
777 |
FileOutputStream fos1 = new FileOutputStream(path1); |
|
778 |
ObjectOutputStream oos1 = new ObjectOutputStream(fos1); |
|
779 |
oos1.writeObject(storedChunks); |
|
780 |
|
|
781 |
String path2 = path + "numberOfFileChunks.ser"; |
|
782 |
FileOutputStream fos2 = new FileOutputStream(path2); |
|
783 |
ObjectOutputStream oos2 = new ObjectOutputStream(fos2); |
|
784 |
oos2.writeObject(numberOfFileChunks); |
|
785 |
|
|
786 |
String path3 = path + "colaborativePeers.ser"; |
|
787 |
FileOutputStream fos3 = new FileOutputStream(path3); |
|
788 |
ObjectOutputStream oos3 = new ObjectOutputStream(fos3); |
|
789 |
oos3.writeObject(colaborativePeers); |
|
790 |
|
|
791 |
String path4 = path + "numberOfChunksConfirmations.ser"; |
|
792 |
FileOutputStream fos4 = new FileOutputStream(path4); |
|
793 |
ObjectOutputStream oos4 = new ObjectOutputStream(fos4); |
|
794 |
oos4.writeObject(numberOfChunksConfirmations); |
|
795 |
|
|
796 |
String path5 = path + "chunksToRestore.ser"; |
|
797 |
FileOutputStream fos5 = new FileOutputStream(path5); |
|
798 |
ObjectOutputStream oos5 = new ObjectOutputStream(fos5); |
|
799 |
oos5.writeObject(chunksToRestore); |
|
800 |
|
|
801 |
String path6 = path + "recoveredChunks.ser"; |
|
802 |
FileOutputStream fos6 = new FileOutputStream(path6); |
|
803 |
ObjectOutputStream oos6 = new ObjectOutputStream(fos6); |
|
804 |
oos6.writeObject(recoveredChunks); |
|
805 |
|
|
806 |
String path7 = path + "numberOfFileChunksToBeRestored.ser"; |
|
807 |
FileOutputStream fos7 = new FileOutputStream(path7); |
|
808 |
ObjectOutputStream oos7 = new ObjectOutputStream(fos7); |
|
809 |
oos7.writeObject(numberOfFileChunksToBeRestored); |
|
810 |
} |
|
811 |
|
|
812 |
public void setStoredChunks(ConcurrentHashMap<String, Chunk> sc) |
|
813 |
{ |
|
814 |
storedChunks = sc; |
|
815 |
} |
|
816 |
|
|
817 |
public void setNumberOfFileChunks(ConcurrentHashMap<String, Integer> si) |
|
818 |
{ |
|
819 |
numberOfFileChunks = si; |
|
820 |
} |
|
821 |
|
|
822 |
public void setColaborativePeers(ConcurrentHashMap<String, HashSet<Integer>> sh) |
|
823 |
{ |
|
824 |
colaborativePeers = sh; |
|
825 |
} |
|
826 |
|
|
827 |
public void setNumberOfChunksConfirmations(ConcurrentHashMap<String, Integer> si) |
|
828 |
{ |
|
829 |
numberOfChunksConfirmations = si; |
|
830 |
} |
|
831 |
|
|
832 |
public void setChunksToRestore(ConcurrentHashMap<String, Integer> si) |
|
833 |
{ |
|
834 |
chunksToRestore = si; |
|
835 |
} |
|
836 |
|
|
837 |
public void setRecoveredChunks(ConcurrentHashMap<String, Chunk> sc) |
|
838 |
{ |
|
839 |
recoveredChunks = sc; |
|
840 |
} |
|
841 |
|
|
842 |
public void setNumberOfFileChunksToBeRestored(ConcurrentLinkedQueue<String> s) |
|
843 |
{ |
|
844 |
numberOfFileChunksToBeRestored = s; |
|
845 |
} |
|
846 |
} |
channels/BackupChannel.java | ||
---|---|---|
1 |
package channels; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
|
|
5 |
import protocols.SendSoredMessageProtocol; |
|
6 |
import service.Cloud; |
|
7 |
import service.Constants; |
|
8 |
|
|
9 |
public class BackupChannel extends Channel { |
|
10 |
private Message receivedMessage; |
|
11 |
|
|
12 |
public BackupChannel(String addr, String p, Cloud f) throws IOException { |
|
13 |
super(addr, p, f.getID(), f); |
|
14 |
} |
|
15 |
|
|
16 |
public void printLog(String msg) { |
|
17 |
System.out.println("MDB " + serverId + msg); |
|
18 |
} |
|
19 |
|
|
20 |
// This message is used to ensure that the chunk is backed up with the desired replication degree as follows. The initiator-peer collects the confirmation messages during a time interval of one second. If the number of confirmation messages it received up to the end of that interval is lower than the desired replication degree, it retransmits the backup message on the MDB channel, and doubles the time interval for receiving confirmation messages. This procedure is repeated up to a maximum number of five times, i.e. the initiator will send at most 5 PUTCHUNK messages per chunk. |
|
21 |
// |
|
22 |
// Hint: Because UDP is not reliable, a peer that has stored a chunk must reply with a STORED message to every PUTCHUNK message it receives. Furthermore, the initiator-peer needs to keep track of which peers have responded. |
|
23 |
// |
|
24 |
// A peer should also count the number of confirmation messages for each of the chunks it has stored and keep that count in non-volatile memory. This information can be useful if the peer runs out of disk space: in that event, the peer may try to free some space by evicting chunks whose actual replication degree is higher than the desired replication degree. |
|
25 |
|
|
26 |
@Override |
|
27 |
public void run () { |
|
28 |
printLog(": UP"); |
|
29 |
int messageSenderId = -1; |
|
30 |
String messageType; |
|
31 |
|
|
32 |
try { |
|
33 |
joinChannelGroup(); |
|
34 |
} catch (IOException e1) { |
|
35 |
e1.printStackTrace(); |
|
36 |
} |
|
37 |
|
|
38 |
do { |
|
39 |
try { |
|
40 |
|
|
41 |
|
|
42 |
receivedMessage = captureData(); |
|
43 |
|
|
44 |
if(receivedMessage == null) { |
|
45 |
printLog(" passing message."); |
|
46 |
continue; |
|
47 |
} |
|
48 |
|
|
49 |
messageType = receivedMessage.getHeader().getMessageType(); |
|
50 |
|
|
51 |
messageSenderId = Integer.parseInt(receivedMessage.getHeader().getSenderId()); |
|
52 |
|
|
53 |
if(messageType.equals(Constants.PUTCHUNK) && messageSenderId != getServerId()) { |
|
54 |
//printLog(" received: " + receivedMessage.getHeader().toString()); |
|
55 |
|
|
56 |
if(father.storeChunk(receivedMessage)) { |
|
57 |
SendSoredMessageProtocol mp = new SendSoredMessageProtocol(buildMessage(), father); |
|
58 |
|
|
59 |
mp.start(); |
|
60 |
} |
|
61 |
else { |
|
62 |
printLog(" ERROR: couldn't store chunk"); |
|
63 |
} |
|
64 |
} |
|
65 |
else { |
|
66 |
//printLog(" ERROR: Invalid message header received -> " + receivedMessage.getHeader().toString()); |
|
67 |
} |
|
68 |
|
|
69 |
// leaveGroupChannel(); |
|
70 |
} |
|
71 |
catch (IOException e) { |
|
72 |
printLog(" ERROR: " + "IOException -> " + e.getMessage()); |
|
73 |
} |
|
74 |
} while(true); |
|
75 |
} |
|
76 |
|
|
77 |
public MessageHeader buildMessage() { |
|
78 |
MessageHeader response; |
|
79 |
|
|
80 |
response = buildResponseMessage(); |
|
81 |
|
|
82 |
return response; |
|
83 |
} |
|
84 |
|
|
85 |
public MessageHeader buildResponseMessage() { |
|
86 |
MessageHeader response; |
|
87 |
String[] headerElements = new String[Constants.STORED_N_ARGS]; |
|
88 |
|
|
89 |
headerElements[0] = Constants.STORED; |
|
90 |
headerElements[1] = father.getProtocolVersion(); |
|
91 |
headerElements[2] = Integer.toString(father.getID()); |
|
92 |
headerElements[3] = receivedMessage.getHeader().getFileId(); |
|
93 |
headerElements[4] = receivedMessage.getHeader().getChunkNumber(); |
|
94 |
|
|
95 |
response = new MessageHeader(headerElements); |
|
96 |
|
|
97 |
return response; |
|
98 |
} |
|
99 |
} |
service/Constants.java | ||
---|---|---|
1 |
package service; |
|
2 |
|
|
3 |
public class Constants { |
|
4 |
public static final int MAX_CHUNK_SIZE = 64 * 1000; |
|
5 |
public static final int MAX_MESSAGE_SIZE = MAX_CHUNK_SIZE + 1024; |
|
6 |
public static final String CHUNKS_DIR = "../Project 1 -- Distributed Backup Service/peers/peer"; |
|
7 |
public static final String DATABASE_DIR = "../Project 1 -- Distributed Backup Service/database/"; |
|
8 |
public static final int MAX_NUMBER_OF_THREADS = 8; |
|
9 |
public static final String NORMAL_VERSION = "1.0"; |
|
10 |
|
|
11 |
// Available header types |
|
12 |
public static final String PUTCHUNK = "PUTCHUNK"; |
|
13 |
public static final String STORED = "STORED"; |
|
14 |
public static final String GETCHUNK = "GETCHUNK"; |
|
15 |
public static final String CHUNK = "CHUNK"; |
|
16 |
public static final String DELETE = "DELETE"; |
|
17 |
public static final String REMOVED = "REMOVED"; |
|
18 |
|
|
19 |
|
|
20 |
// headers necessary arguments |
|
21 |
|
|
22 |
public static final int PUTCHUNK_N_ARGS = 6; |
|
23 |
public static final int STORED_N_ARGS = 5; |
|
24 |
public static final int GETCHUNK_N_ARGS = 5; |
|
25 |
public static final int CHUNK_N_ARGS = 5; |
|
26 |
public static final int DELETE_N_ARGS = 4; |
|
27 |
public static final int REMOVED_N_ARGS = 5; |
|
28 |
|
|
29 |
//'0xD''0xA' |
|
30 |
public static final byte CR = 0xD; |
|
31 |
public static final byte LF = 0xA; |
|
32 |
|
|
33 |
public static final int MAX_PUTCHUNK_MESSAGES = 5; |
|
34 |
} |
service/Peer.java | ||
---|---|---|
1 |
package service; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.nio.file.Files; |
|
5 |
import java.nio.file.Path; |
|
6 |
import java.nio.file.Paths; |
|
7 |
import java.rmi.registry.Registry; |
|
8 |
import java.rmi.registry.LocateRegistry; |
|
9 |
import java.rmi.server.UnicastRemoteObject; |
|
10 |
import java.util.ArrayList; |
|
11 |
import java.util.List; |
|
12 |
import java.util.concurrent.Callable; |
|
13 |
import java.util.concurrent.ExecutionException; |
|
14 |
import java.util.concurrent.ExecutorService; |
|
15 |
import java.util.concurrent.Executors; |
|
16 |
import java.util.concurrent.Future; |
|
17 |
import java.util.concurrent.ScheduledThreadPoolExecutor; |
|
18 |
import java.util.concurrent.TimeUnit; |
|
19 |
|
|
20 |
import channels.Message; |
|
21 |
import channels.MessageHeader; |
|
22 |
import protocols.BackupChunkProtocol; |
|
23 |
import protocols.RestoreChunksProtocol; |
|
24 |
|
|
25 |
public class Peer implements RMI { |
|
26 |
|
|
27 |
ScheduledThreadPoolExecutor exec; |
|
28 |
private static Cloud cloud; |
|
29 |
|
|
30 |
public Peer() {} |
|
31 |
|
|
32 |
public static void main(String[] args) { |
|
33 |
|
|
34 |
try |
|
35 |
{ |
|
36 |
Peer peer = new Peer(); |
|
37 |
RMI rmi = (RMI) UnicastRemoteObject.exportObject(peer, 0); |
|
38 |
String acess_point = args[2]; |
|
39 |
Registry registry = LocateRegistry.getRegistry(); |
|
40 |
registry.bind(acess_point, rmi); |
|
41 |
System.err.println("Peer ready"); |
|
42 |
} |
|
43 |
catch (Exception e) { |
|
44 |
System.err.println("Peer exception: " + e.toString()); |
|
45 |
return; |
|
46 |
} |
|
47 |
|
|
48 |
if(args.length != 9) { |
|
49 |
printHelp(); |
|
50 |
return; |
|
51 |
} |
|
52 |
|
|
53 |
try { |
|
54 |
Cloud aux = new Cloud(args[0], args[1]); |
|
55 |
cloud = aux.getPeerTools(args[0], args[1]); |
|
56 |
cloud.createControlRoom(args[3], args[4]); |
|
57 |
cloud.createBackupChannel(args[5], args[6]); |
|
58 |
cloud.createRestoreChannel(args[7], args[8]); |
|
59 |
|
|
60 |
String path = Constants.DATABASE_DIR; |
|
61 |
Path p = Paths.get(path); |
|
62 |
if (Files.exists(p)) |
|
63 |
{ |
|
64 |
try { |
|
65 |
cloud.loadFromDisk(); |
|
66 |
} catch (Exception e) { |
|
67 |
e.printStackTrace(); |
|
68 |
} |
|
69 |
|
|
70 |
} |
|
71 |
|
|
72 |
// System.out.println(cloud.getStoredChunks().size()); |
|
73 |
|
|
74 |
} catch (NumberFormatException | IOException e) { |
|
75 |
System.out.println(e.getMessage()); |
|
76 |
return; |
|
77 |
} |
|
78 |
|
|
79 |
if(Cloud.checkOrCreateDirectory(Constants.CHUNKS_DIR + args[0] + "/") == -1) { |
|
80 |
System.out.println("Couldn't create peer chunks dir!"); |
|
81 |
} |
|
82 |
|
|
83 |
cloud.activateChannels(); |
|
84 |
System.out.println("Channels activated"); |
|
85 |
} |
|
86 |
|
|
87 |
// the name of each multicast channel consists of the ip multicast address and |
|
88 |
// port, passed as cmd line arguments, followed by protocol version, server id |
|
89 |
// and service access point |
|
90 |
private static void printHelp() { |
|
91 |
System.out.println("java service.Peer SERVER_id VERSION ACCESS_POINT MC_ip MC_port MDB_ip MDB_port MDR_ip MDR_port"); |
|
92 |
} |
|
93 |
|
|
94 |
public void saveState() { |
|
95 |
try { |
|
96 |
cloud.saveToDisk(); |
|
97 |
} catch (IOException e) { |
|
98 |
e.printStackTrace(); |
|
99 |
} |
|
100 |
} |
|
101 |
|
|
102 |
|
|
103 |
public synchronized String backup(String file_path, int replication_degree) |
|
104 |
{ |
|
105 |
FileChunker fc; |
|
106 |
Message m; |
|
107 |
MessageHeader h; |
|
108 |
int numberOfChunks; |
|
109 |
ExecutorService WORKER_THREAD_POOL; |
|
110 |
List<Callable<Object>> callables = new ArrayList<Callable<Object>>(); |
|
111 |
List<Future<Object>> futures = null; |
|
112 |
ArrayList<Chunk> chunkHolder = null; |
|
113 |
String[] headerElements = new String[Constants.PUTCHUNK_N_ARGS]; |
|
114 |
Chunk aux; |
|
115 |
int numberOfSteps; |
|
116 |
|
|
117 |
System.out.println("BACKUP request received"); |
|
118 |
|
|
119 |
try { |
|
120 |
fc = new FileChunker(file_path, replication_degree); |
|
121 |
// cloud.createDirectoryFile(fc); |
|
122 |
} |
|
123 |
catch (IOException e) { |
|
124 |
System.out.println("ERROR chunking file -> " + e.getMessage()); |
|
125 |
return "ERROR chunking file -> " + e.getMessage(); |
|
126 |
} |
|
127 |
|
|
128 |
chunkHolder = fc.getChunks(); |
|
129 |
numberOfChunks = chunkHolder.size(); |
|
130 |
|
|
131 |
cloud.registerNumberOfFileChunks(fc.getFileID(), numberOfChunks); |
|
132 |
|
|
133 |
System.out.println("File splited into " + numberOfChunks + " chunks."); |
|
134 |
|
|
135 |
headerElements[0] = Constants.PUTCHUNK; |
Also available in: Unified diff