root / PutChunkTask.java @ 1
History | View | Annotate | Download (4.19 KB)
1 | 1 | up20150476 | import java.io.File; |
---|---|---|---|
2 | import java.io.FileOutputStream; |
||
3 | import java.io.IOException; |
||
4 | import java.net.DatagramPacket; |
||
5 | import java.net.DatagramSocket; |
||
6 | import java.net.InetAddress; |
||
7 | import java.util.Random; |
||
8 | import java.util.concurrent.ConcurrentHashMap; |
||
9 | |||
10 | /**
|
||
11 | * PutChunkTask
|
||
12 | */
|
||
13 | public class PutChunkTask implements Runnable { |
||
14 | private static final String CRLF = Integer.toHexString(0xD) + Integer.toHexString(0xA); |
||
15 | |||
16 | private String version; |
||
17 | private int senderId; |
||
18 | private String fileId; |
||
19 | private int chunkNo; |
||
20 | private int replicationDegree; |
||
21 | private byte[] body; |
||
22 | |||
23 | private InetAddress MCaddress; |
||
24 | private int MCport; |
||
25 | |||
26 | private MDBListener mdb;
|
||
27 | |||
28 | private Peer peer;
|
||
29 | |||
30 | PutChunkTask(String[] args, InetAddress MCaddress, int MCport, MDBListener mdb, Peer peer, byte[] body) { |
||
31 | this.version = args[1]; |
||
32 | this.senderId = Integer.parseInt(args[2]); |
||
33 | this.fileId = args[3]; |
||
34 | this.chunkNo = Integer.parseInt(args[4]); |
||
35 | this.replicationDegree = Integer.parseInt(args[5]); |
||
36 | this.body = body;
|
||
37 | |||
38 | this.MCaddress = MCaddress;
|
||
39 | this.MCport = MCport;
|
||
40 | this.mdb = mdb;
|
||
41 | |||
42 | this.peer = peer;
|
||
43 | } |
||
44 | |||
45 | @Override
|
||
46 | public void run() { |
||
47 | File dir = new File("Peer-" + this.peer.getSenderId() + File.separator + "Chunks" + File.separator + this.fileId); |
||
48 | File file = new File(dir.getPath() + File.separator + this.chunkNo); |
||
49 | dir.mkdirs(); |
||
50 | try {
|
||
51 | //STORED <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
|
||
52 | String message = "STORED "+this.version+" "+this.senderId+" "+this.fileId+" "+this.chunkNo+" "+CRLF+CRLF; |
||
53 | |||
54 | DatagramSocket socket = new DatagramSocket(); |
||
55 | DatagramPacket packet = new DatagramPacket(message.getBytes(), message.length(), this.MCaddress, this.MCport); |
||
56 | Random random = new Random(); |
||
57 | |||
58 | //Random delay between [0-400] ms to avoid overlapping messages
|
||
59 | Thread.sleep(random.nextInt(401)); |
||
60 | |||
61 | //Protocol enhancement - Only store the chunk if the desired replication degree hasn't been achieved yet to avoid using unnecessary storage space
|
||
62 | if (this.version.equals("2.0")) { |
||
63 | if (!this.peer.repDegMap.containsKey(this.fileId)) |
||
64 | this.peer.repDegMap.put(this.fileId, new ConcurrentHashMap<>()); |
||
65 | |||
66 | if(!this.peer.repDegMap.get(this.fileId).containsKey(this.chunkNo)) |
||
67 | this.peer.repDegMap.get(this.fileId).put(this.chunkNo, 0); |
||
68 | |||
69 | this.mdb.print("RepDegMap("+this.chunkNo+") = "+this.peer.repDegMap.get(this.fileId).get(this.chunkNo)); |
||
70 | if (this.peer.repDegMap.get(this.fileId).get(this.chunkNo) < this.replicationDegree){ |
||
71 | FileOutputStream stream = new FileOutputStream(file); |
||
72 | stream.write(this.body);
|
||
73 | this.peer.addStorage(this.body.length); |
||
74 | stream.close(); |
||
75 | |||
76 | if (this.peer.chunksMap.get(this.fileId) == null) |
||
77 | this.peer.chunksMap.put(this.fileId, new BackupFile(this.fileId)); |
||
78 | |||
79 | this.peer.chunksMap.get(this.fileId).addChunk(this.chunkNo, new Chunk(this.peer.chunksMap.get(this.fileId), this.chunkNo, this.body)); |
||
80 | |||
81 | socket.send(packet); |
||
82 | } |
||
83 | } |
||
84 | //Normal protocol - Always store the chunk, even if the replication degree might have been achieved already
|
||
85 | else {
|
||
86 | FileOutputStream stream = new FileOutputStream(file); |
||
87 | stream.write(this.body);
|
||
88 | this.peer.addStorage(this.body.length); |
||
89 | stream.close(); |
||
90 | |||
91 | if (this.peer.chunksMap.get(this.fileId) == null) |
||
92 | this.peer.chunksMap.put(this.fileId, new BackupFile(this.fileId)); |
||
93 | |||
94 | this.peer.chunksMap.get(this.fileId).getChunksMap().put(this.chunkNo, new Chunk(this.peer.chunksMap.get(this.fileId), this.chunkNo, this.body)); |
||
95 | |||
96 | socket.send(packet); |
||
97 | } |
||
98 | |||
99 | socket.close(); |
||
100 | } catch (IOException | InterruptedException e) { |
||
101 | e.printStackTrace(); |
||
102 | return;
|
||
103 | } |
||
104 | this.mdb.print("PUTCHUNK protocol finished"); |
||
105 | } |
||
106 | } |