Revision 15
threads/ManagePutchunk.java | ||
---|---|---|
1 |
package threads; |
|
2 |
|
|
3 |
|
|
4 |
import java.io.IOException; |
|
5 |
import java.util.concurrent.TimeUnit; |
|
6 |
import server.Server; |
|
7 |
|
|
8 |
public class ManagePutchunk implements Runnable { |
|
9 |
|
|
10 |
private byte[] message; |
|
11 |
private int time; |
|
12 |
private String key; |
|
13 |
private int replicationDegree; |
|
14 |
private int tries; |
|
15 |
|
|
16 |
public ManagePutchunk(byte[] message, int time, String fileID, int chunkNr, int replicationDegree) { |
|
17 |
this.message = message; |
|
18 |
this.time = time; |
|
19 |
this.key = fileID + '.' + chunkNr; |
|
20 |
this.replicationDegree = replicationDegree; |
|
21 |
this.tries = 1; |
|
22 |
} |
|
23 |
|
|
24 |
@Override |
|
25 |
public void run() { |
|
26 |
|
|
27 |
int occurrences = Server.getStorage().getStoredTimes().get(this.key); |
|
28 |
|
|
29 |
if (occurrences < replicationDegree) { |
|
30 |
try { |
|
31 |
Server.getmdb().sendMessage(message); |
|
32 |
} catch (IOException e) { |
|
33 |
e.printStackTrace(); |
|
34 |
} |
|
35 |
System.out.println("Sent PUTCHUNK try: " + tries); |
|
36 |
System.out.println(key); |
|
37 |
this.time = 2 * this.time; |
|
38 |
this.tries++; |
|
39 |
|
|
40 |
if (this.tries < 5) |
|
41 |
Server.getThreadLauncher().schedule(this, this.time, TimeUnit.SECONDS); |
|
42 |
|
|
43 |
|
|
44 |
|
|
45 |
|
|
46 |
} |
|
47 |
|
|
48 |
|
|
49 |
|
|
50 |
} |
|
51 |
} |
threads/ManageRestored.java | ||
---|---|---|
1 |
package threads; |
|
2 |
import java.io.*; |
|
3 |
import java.util.*; |
|
4 |
import server.Server; |
|
5 |
|
|
6 |
public class ManageRestored implements Runnable { |
|
7 |
|
|
8 |
private String fileName; |
|
9 |
|
|
10 |
public ManageRestored(String fileName) { |
|
11 |
this.fileName = fileName; |
|
12 |
} |
|
13 |
|
|
14 |
@Override |
|
15 |
public void run() { |
|
16 |
|
|
17 |
if (!Server.getStorage().getRemainingChunks().containsValue("false")) { |
|
18 |
if (restoreFile()) |
|
19 |
System.out.println("> File restored!\n"); |
|
20 |
else System.out.println("ERROR: File not restored.\n"); |
|
21 |
} else System.out.println("ERROR: File not restored, chunks missing.\n"); |
|
22 |
|
|
23 |
Server.getStorage().getRemainingChunks().clear(); |
|
24 |
} |
|
25 |
|
|
26 |
private boolean restoreFile() { |
|
27 |
String filePath = "database/" + Server.getServerId() + "/" + this.fileName; |
|
28 |
File file = new File(filePath); |
|
29 |
byte[] body; |
|
30 |
|
|
31 |
try { |
|
32 |
FileOutputStream fos = new FileOutputStream(file, true); |
|
33 |
|
|
34 |
if (!file.exists()) { |
|
35 |
file.getParentFile().mkdirs(); |
|
36 |
file.createNewFile(); |
|
37 |
} |
|
38 |
|
|
39 |
List<String> sortedChunkKeys = new ArrayList<>(Server.getStorage().getRemainingChunks().keySet()); |
|
40 |
|
|
41 |
sortedChunkKeys.sort((o1, o2) -> { |
|
42 |
int chunkNr1 = Integer.valueOf(o1.split(".")[1]); |
|
43 |
int chunkNr2 = Integer.valueOf(o2.split(".")[1]); |
|
44 |
return Integer.compare(chunkNr1, chunkNr2); |
|
45 |
}); |
|
46 |
|
|
47 |
for (String key : sortedChunkKeys) { |
|
48 |
String chunkPath = Server.getServerId() + "/" + key; |
|
49 |
|
|
50 |
File chunkFile = new File(chunkPath); |
|
51 |
|
|
52 |
if (!chunkFile.exists()) { |
|
53 |
return false; |
|
54 |
} |
|
55 |
|
|
56 |
body = new byte[(int) chunkFile.length()]; |
|
57 |
FileInputStream in = new FileInputStream(chunkFile); |
|
58 |
|
|
59 |
in.read(body); |
|
60 |
fos.write(body); |
|
61 |
|
|
62 |
chunkFile.delete(); |
|
63 |
in.close(); |
|
64 |
} |
|
65 |
|
|
66 |
fos.close(); |
|
67 |
return true; |
|
68 |
} catch (IOException e) { |
|
69 |
e.printStackTrace(); |
|
70 |
} |
|
71 |
return false; |
|
72 |
} |
|
73 |
} |
threads/ReceiveGetChunk.java | ||
---|---|---|
1 |
|
|
2 |
package threads; |
|
3 |
|
|
4 |
import java.io.File; |
|
5 |
import java.io.FileInputStream; |
|
6 |
import java.io.FileOutputStream; |
|
7 |
import java.io.IOException; |
|
8 |
import java.nio.charset.StandardCharsets; |
|
9 |
import java.util.Random; |
|
10 |
import java.util.concurrent.TimeUnit; |
|
11 |
|
|
12 |
import chunk.*; |
|
13 |
import server.*; |
|
14 |
|
|
15 |
public class ReceiveGetChunk implements Runnable { |
|
16 |
|
|
17 |
private String fileId; |
|
18 |
private int chunkNum; |
|
19 |
|
|
20 |
|
|
21 |
public ReceiveGetChunk( String fileId, int chunkNr) { |
|
22 |
this.fileId = fileId; |
|
23 |
this.chunkNum = chunkNr; |
|
24 |
} |
|
25 |
|
|
26 |
@Override |
|
27 |
public void run() { |
|
28 |
|
|
29 |
for (int i = 0; i < Server.getStorage().getSavedChunks().size(); i++) { |
|
30 |
if (isSameChunk(Server.getStorage().getSavedChunks().get(i).getfileId(), Server.getStorage().getSavedChunks().get(i).getNum()) && !isAbortSend()) { |
|
31 |
String header = "CHUNK " + "1.0" + " " + Server.getServerId() + " " + this.fileId + " " + this.chunkNum + "\r\n\r\n"; |
|
32 |
|
|
33 |
try { |
|
34 |
byte[] asciiHeader = header.getBytes(StandardCharsets.US_ASCII); |
|
35 |
|
|
36 |
String chunkPath = "database/" + Server.getServerId() + "/" + fileId + "." + chunkNum; |
|
37 |
|
|
38 |
File file = new File(chunkPath); |
|
39 |
byte[] body = new byte[(int) file.length()]; |
|
40 |
FileInputStream in = new FileInputStream(file); |
|
41 |
in.read(body); |
|
42 |
|
|
43 |
byte[] message = new byte[asciiHeader.length + body.length]; |
|
44 |
System.arraycopy(asciiHeader, 0, message, 0, asciiHeader.length); |
|
45 |
System.arraycopy(body, 0, message, asciiHeader.length, body.length); |
|
46 |
|
|
47 |
SendMessage sendThread = new SendMessage(message, "mdr"); |
|
48 |
System.out.println("Sent" + "CHUNK " + "1.0" + " " + Server.getServerId() + " " + this.fileId + " " + this.chunkNum); |
|
49 |
Random random = new Random(); |
|
50 |
Server.getThreadLauncher().schedule(sendThread, random.nextInt(401), TimeUnit.MILLISECONDS); |
|
51 |
} catch (IOException e) { |
|
52 |
e.printStackTrace(); |
|
53 |
} |
|
54 |
|
|
55 |
} |
|
56 |
|
|
57 |
} |
|
58 |
} |
|
59 |
private boolean isSameChunk(String fileId, int chunkNr) { |
|
60 |
return fileId.equals(this.fileId) && chunkNr == this.chunkNum; |
|
61 |
} |
|
62 |
|
|
63 |
private boolean isAbortSend() { |
|
64 |
for (int i = 0; i < Server.getStorage().getReceivedChunks().size(); i++) { |
|
65 |
if (isSameChunk(Server.getStorage().getReceivedChunks().get(i).getfileId(), Server.getStorage().getReceivedChunks().get(i).getNum())) |
|
66 |
return true; |
|
67 |
} |
|
68 |
return false; |
|
69 |
} |
|
70 |
} |
threads/ReceivePutchunk.java | ||
---|---|---|
1 |
package threads; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.io.FileOutputStream; |
|
5 |
import java.io.IOException; |
|
6 |
import chunk.*; |
|
7 |
import server.*; |
|
8 |
|
|
9 |
public class ReceivePutchunk implements Runnable { |
|
10 |
|
|
11 |
private Double version; |
|
12 |
private String fileId; |
|
13 |
private int chunkNr; |
|
14 |
private int replicationDegree; |
|
15 |
private byte[] content; |
|
16 |
|
|
17 |
public ReceivePutchunk(double version, String fileId, int chunkNr, int replicationDegree, byte[] content) { |
|
18 |
this.version = version; |
|
19 |
this.fileId = fileId; |
|
20 |
this.chunkNr = chunkNr; |
|
21 |
this.replicationDegree = replicationDegree; |
|
22 |
this.content = content; |
|
23 |
} |
|
24 |
|
|
25 |
@Override |
|
26 |
public void run() { |
|
27 |
|
|
28 |
String key = fileId + "." + chunkNr; |
|
29 |
|
|
30 |
for (int i = 0; i < Server.getStorage().getFiles().size(); i++) { |
|
31 |
if (Server.getStorage().getFiles().get(i).getFileId().equals(fileId)) |
|
32 |
return; |
|
33 |
} |
|
34 |
|
|
35 |
if (version == 2.0) { |
|
36 |
if (Server.getStorage().getStoredTimes().get(key) >= replicationDegree) |
|
37 |
return; |
|
38 |
|
|
39 |
if (Server.getStorage().getSpaceAvailable() >= content.length) { |
|
40 |
Chunk chunk = null; |
|
41 |
try { |
|
42 |
chunk = new Chunk(chunkNr, fileId, replicationDegree, content.length); |
|
43 |
} catch (IOException e1) { |
|
44 |
e1.printStackTrace(); |
|
45 |
} |
|
46 |
|
|
47 |
if (!Server.getStorage().addSavedChunk(chunk)) { |
|
48 |
return; |
|
49 |
} |
|
50 |
|
|
51 |
Server.getStorage().decSpaceAvailable(content.length); |
|
52 |
|
|
53 |
//creates the file and saves the chunk |
|
54 |
try { |
|
55 |
String filename = Server.getServerId() + "/" + fileId + "." + chunkNr; |
|
56 |
|
|
57 |
File file = new File(filename); |
|
58 |
if (!file.exists()) { |
|
59 |
file.getParentFile().mkdirs(); |
|
60 |
file.createNewFile(); |
|
61 |
} |
|
62 |
|
|
63 |
try (FileOutputStream fos = new FileOutputStream(filename)) { |
|
64 |
fos.write(content); |
|
65 |
} |
|
66 |
|
|
67 |
} catch (IOException e) { |
|
68 |
e.printStackTrace(); |
|
69 |
} |
|
70 |
|
|
71 |
Server.getStorage().incStoredOccurrences(fileId, chunkNr); |
|
72 |
String header = "STORED " + "1.0" + " " + Server.getServerId() + " " + fileId + " " + chunkNr |
|
73 |
+ "\r\n\r\n"; |
|
74 |
System.out.println( |
|
75 |
"Sent " + "STORED " + "1.0" + " " + Server.getServerId() + " " + fileId + " " + chunkNr); |
|
76 |
try { |
|
77 |
Server.getmc().sendMessage(header.getBytes()); |
|
78 |
} catch (IOException e) { |
|
79 |
e.printStackTrace(); |
|
80 |
} |
|
81 |
} else { |
|
82 |
System.out.println("Not enough space in this peer " + fileId + "_" + chunkNr); |
|
83 |
} |
|
84 |
|
|
85 |
} |
|
86 |
} |
|
87 |
} |
threads/SendMessage.java | ||
---|---|---|
1 |
package threads; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import server.Server; |
|
5 |
import multicastchannels.*; |
|
6 |
|
|
7 |
public class SendMessage implements Runnable { |
|
8 |
private byte[] message; |
|
9 |
private String channel; |
|
10 |
|
|
11 |
public SendMessage(byte[] message, String channel) { |
|
12 |
this.message = message; |
|
13 |
this.channel = channel; |
|
14 |
|
|
15 |
} |
|
16 |
|
|
17 |
@Override |
|
18 |
public void run() { |
|
19 |
switch (channel) { |
|
20 |
|
|
21 |
case "mc": |
|
22 |
try { |
|
23 |
Server.getmc().sendMessage(message); |
|
24 |
} catch (IOException e) { |
|
25 |
e.printStackTrace(); |
|
26 |
} |
|
27 |
break; |
|
28 |
|
|
29 |
case "mdb": |
|
30 |
try { |
|
31 |
Server.getmdb().sendMessage(message); |
|
32 |
} catch (IOException e) { |
|
33 |
e.printStackTrace(); |
|
34 |
} |
|
35 |
break; |
|
36 |
|
|
37 |
case "mdr": |
|
38 |
try { |
|
39 |
Server.getmdr().sendMessage(message); |
|
40 |
} catch (IOException e) { |
|
41 |
e.printStackTrace(); |
|
42 |
} |
|
43 |
break; |
|
44 |
|
|
45 |
} |
|
46 |
} |
|
47 |
} |
threads/messageManagement.java | ||
---|---|---|
1 |
package threads; |
|
2 |
|
|
3 |
import java.util.ArrayList; |
|
4 |
import server.Server; |
|
5 |
import java.util.Arrays; |
|
6 |
import java.util.List; |
|
7 |
import java.util.Random; |
|
8 |
import java.util.concurrent.TimeUnit; |
|
9 |
|
|
10 |
public class messageManagement implements Runnable{ |
|
11 |
private byte[] msg; |
|
12 |
|
|
13 |
public messageManagement(byte[] msg) { |
|
14 |
this.msg = msg; |
|
15 |
|
|
16 |
} |
|
17 |
|
|
18 |
@Override |
|
19 |
public void run() { |
|
20 |
|
|
21 |
List<byte[]> HB = getHeaderNBody(); |
|
22 |
byte[] header = HB.get(0); |
|
23 |
byte[] body = HB.get(1); |
|
24 |
|
|
25 |
String message = new String(this.msg, 0, this.msg.length); |
|
26 |
String[] newMessage = message.split(" "); |
|
27 |
|
|
28 |
String messageType = newMessage[0]; |
|
29 |
Double version = Double.parseDouble(newMessage[1]); |
|
30 |
int senderID = Integer.parseInt(newMessage[2]); |
|
31 |
String fileID = newMessage[3]; |
|
32 |
int chunkNr = Integer.parseInt(newMessage[4]); |
|
33 |
int replicationDegree = Integer.parseInt(newMessage[5]); |
|
34 |
|
|
35 |
|
|
36 |
//removeBlanks(newMessage); |
|
37 |
|
|
38 |
|
|
39 |
|
|
40 |
switch (newMessage[0]) { |
|
41 |
case "PUTCHUNK": |
|
42 |
putchunkOperation(version, fileID, chunkNr, replicationDegree, body); |
|
43 |
break; |
|
44 |
case "GETCHUNK": |
|
45 |
getchunkOperation(version, senderID, fileID, chunkNr); |
|
46 |
break; |
|
47 |
case "DELETE": |
|
48 |
deleteOperation(version, senderID, fileID); |
|
49 |
break; |
|
50 |
case "REMOVED": |
|
51 |
removedOperation(version, senderID, fileID, chunkNr); |
|
52 |
break; |
|
53 |
case "STORED": |
|
54 |
storedOperation(version, senderID, fileID, chunkNr); |
|
55 |
break; |
|
56 |
|
|
57 |
} |
|
58 |
|
|
59 |
} |
|
60 |
|
|
61 |
private void deleteOperation(Double version, int senderID, String fileID) { |
|
62 |
if (Integer.parseInt(fileID) != senderID) { |
|
63 |
Server.getStorage().deleteStoredChunks(fileID); |
|
64 |
System.out.println("Received DELETE " + version + " " + senderID + " " + fileID); |
|
65 |
} |
|
66 |
} |
|
67 |
|
|
68 |
private void removedOperation(Double version, int senderID, String fileID, int chunkNr) { |
|
69 |
} |
|
70 |
|
|
71 |
private void getchunkOperation(Double version, int senderID, String fileID, int chunkNr) { |
|
72 |
if (Integer.parseInt(fileID) != senderID) { |
|
73 |
Random random = new Random(); |
|
74 |
System.out.println("Received GETCHUNK " + version + " " + senderID + " " + fileID + " " + chunkNr); |
|
75 |
Server.getThreadLauncher().schedule(new ReceiveGetChunk(fileID, chunkNr), random.nextInt(401), TimeUnit.MILLISECONDS); |
|
76 |
} |
|
77 |
|
|
78 |
} |
|
79 |
|
|
80 |
private void storedOperation(Double version, int senderID, String fileID, int chunkNr) { |
|
81 |
if (Integer.parseInt(fileID) != senderID) { |
|
82 |
Server.getStorage().incStoredOccurrences(fileID, chunkNr); |
|
83 |
System.out.println("Received STORED " + version + " " + senderID + " " + fileID + " " + chunkNr); |
|
84 |
} |
|
85 |
|
|
86 |
} |
|
87 |
|
|
88 |
private void putchunkOperation(Double version, String fileID, int chunkNr, int replicationDegree, byte[] body) { |
|
89 |
Random r = new Random(); |
|
90 |
System.out.println("Received PUTCHUNK " + version + " " + fileID + " " + chunkNr); |
|
91 |
Server.getThreadLauncher().schedule(new ReceivePutchunk(version, fileID, chunkNr, replicationDegree, body), r.nextInt(401), TimeUnit.MILLISECONDS); |
|
92 |
|
|
93 |
} |
|
94 |
|
|
95 |
private void removeBlanks(String[] newMessage) { |
|
96 |
for(int i=0; i<newMessage.length; i++) { |
|
97 |
newMessage[i] = newMessage[i].replaceAll("\\s","" ); |
|
98 |
} |
|
99 |
|
|
100 |
} |
|
101 |
|
|
102 |
private List<byte[]> getHeaderNBody(){ |
|
103 |
|
|
104 |
int h = this.msg.length -4; |
|
105 |
|
|
106 |
byte[] header = Arrays.copyOfRange(this.msg, 0 ,h); |
|
107 |
byte[] body = Arrays.copyOfRange(this.msg, h, this.msg.length); |
|
108 |
|
|
109 |
List<byte[]> headerNBody = new ArrayList<>(); |
|
110 |
|
|
111 |
headerNBody.add(header); |
|
112 |
headerNBody.add(body); |
|
113 |
|
|
114 |
return headerNBody; |
|
115 |
} |
|
116 |
|
|
117 |
} |
Also available in: Unified diff