root / PutChunkTask.java @ 1
History | View | Annotate | Download (4.19 KB)
1 |
import java.io.File; |
---|---|
2 |
import java.io.FileOutputStream; |
3 |
import java.io.IOException; |
4 |
import java.net.DatagramPacket; |
5 |
import java.net.DatagramSocket; |
6 |
import java.net.InetAddress; |
7 |
import java.util.Random; |
8 |
import java.util.concurrent.ConcurrentHashMap; |
9 |
|
10 |
/**
|
11 |
* PutChunkTask
|
12 |
*/
|
13 |
public class PutChunkTask implements Runnable { |
14 |
private static final String CRLF = Integer.toHexString(0xD) + Integer.toHexString(0xA); |
15 |
|
16 |
private String version; |
17 |
private int senderId; |
18 |
private String fileId; |
19 |
private int chunkNo; |
20 |
private int replicationDegree; |
21 |
private byte[] body; |
22 |
|
23 |
private InetAddress MCaddress; |
24 |
private int MCport; |
25 |
|
26 |
private MDBListener mdb;
|
27 |
|
28 |
private Peer peer;
|
29 |
|
30 |
PutChunkTask(String[] args, InetAddress MCaddress, int MCport, MDBListener mdb, Peer peer, byte[] body) { |
31 |
this.version = args[1]; |
32 |
this.senderId = Integer.parseInt(args[2]); |
33 |
this.fileId = args[3]; |
34 |
this.chunkNo = Integer.parseInt(args[4]); |
35 |
this.replicationDegree = Integer.parseInt(args[5]); |
36 |
this.body = body;
|
37 |
|
38 |
this.MCaddress = MCaddress;
|
39 |
this.MCport = MCport;
|
40 |
this.mdb = mdb;
|
41 |
|
42 |
this.peer = peer;
|
43 |
} |
44 |
|
45 |
@Override
|
46 |
public void run() { |
47 |
File dir = new File("Peer-" + this.peer.getSenderId() + File.separator + "Chunks" + File.separator + this.fileId); |
48 |
File file = new File(dir.getPath() + File.separator + this.chunkNo); |
49 |
dir.mkdirs(); |
50 |
try {
|
51 |
//STORED <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
|
52 |
String message = "STORED "+this.version+" "+this.senderId+" "+this.fileId+" "+this.chunkNo+" "+CRLF+CRLF; |
53 |
|
54 |
DatagramSocket socket = new DatagramSocket(); |
55 |
DatagramPacket packet = new DatagramPacket(message.getBytes(), message.length(), this.MCaddress, this.MCport); |
56 |
Random random = new Random(); |
57 |
|
58 |
//Random delay between [0-400] ms to avoid overlapping messages
|
59 |
Thread.sleep(random.nextInt(401)); |
60 |
|
61 |
//Protocol enhancement - Only store the chunk if the desired replication degree hasn't been achieved yet to avoid using unnecessary storage space
|
62 |
if (this.version.equals("2.0")) { |
63 |
if (!this.peer.repDegMap.containsKey(this.fileId)) |
64 |
this.peer.repDegMap.put(this.fileId, new ConcurrentHashMap<>()); |
65 |
|
66 |
if(!this.peer.repDegMap.get(this.fileId).containsKey(this.chunkNo)) |
67 |
this.peer.repDegMap.get(this.fileId).put(this.chunkNo, 0); |
68 |
|
69 |
this.mdb.print("RepDegMap("+this.chunkNo+") = "+this.peer.repDegMap.get(this.fileId).get(this.chunkNo)); |
70 |
if (this.peer.repDegMap.get(this.fileId).get(this.chunkNo) < this.replicationDegree){ |
71 |
FileOutputStream stream = new FileOutputStream(file); |
72 |
stream.write(this.body);
|
73 |
this.peer.addStorage(this.body.length); |
74 |
stream.close(); |
75 |
|
76 |
if (this.peer.chunksMap.get(this.fileId) == null) |
77 |
this.peer.chunksMap.put(this.fileId, new BackupFile(this.fileId)); |
78 |
|
79 |
this.peer.chunksMap.get(this.fileId).addChunk(this.chunkNo, new Chunk(this.peer.chunksMap.get(this.fileId), this.chunkNo, this.body)); |
80 |
|
81 |
socket.send(packet); |
82 |
} |
83 |
} |
84 |
//Normal protocol - Always store the chunk, even if the replication degree might have been achieved already
|
85 |
else {
|
86 |
FileOutputStream stream = new FileOutputStream(file); |
87 |
stream.write(this.body);
|
88 |
this.peer.addStorage(this.body.length); |
89 |
stream.close(); |
90 |
|
91 |
if (this.peer.chunksMap.get(this.fileId) == null) |
92 |
this.peer.chunksMap.put(this.fileId, new BackupFile(this.fileId)); |
93 |
|
94 |
this.peer.chunksMap.get(this.fileId).getChunksMap().put(this.chunkNo, new Chunk(this.peer.chunksMap.get(this.fileId), this.chunkNo, this.body)); |
95 |
|
96 |
socket.send(packet); |
97 |
} |
98 |
|
99 |
socket.close(); |
100 |
} catch (IOException | InterruptedException e) { |
101 |
e.printStackTrace(); |
102 |
return;
|
103 |
} |
104 |
this.mdb.print("PUTCHUNK protocol finished"); |
105 |
} |
106 |
} |