Project

General

Profile

Statistics
| Revision:

root / PutChunkTask.java @ 1

History | View | Annotate | Download (4.19 KB)

1
import java.io.File;
2
import java.io.FileOutputStream;
3
import java.io.IOException;
4
import java.net.DatagramPacket;
5
import java.net.DatagramSocket;
6
import java.net.InetAddress;
7
import java.util.Random;
8
import java.util.concurrent.ConcurrentHashMap;
9

    
10
/**
11
 * PutChunkTask
12
 */
13
public class PutChunkTask implements Runnable {
14
    private static final String CRLF = Integer.toHexString(0xD) + Integer.toHexString(0xA);
15

    
16
    private String version;
17
    private int senderId;
18
    private String fileId;
19
    private int chunkNo;
20
    private int replicationDegree;
21
    private byte[] body;
22

    
23
    private InetAddress MCaddress;
24
    private int MCport;
25

    
26
    private MDBListener mdb;
27

    
28
    private Peer peer;
29

    
30
    PutChunkTask(String[] args, InetAddress MCaddress, int MCport, MDBListener mdb, Peer peer, byte[] body) {
31
        this.version = args[1];
32
        this.senderId = Integer.parseInt(args[2]);
33
        this.fileId = args[3];
34
        this.chunkNo = Integer.parseInt(args[4]);
35
        this.replicationDegree = Integer.parseInt(args[5]);
36
        this.body = body;
37

    
38
        this.MCaddress = MCaddress;
39
        this.MCport = MCport;
40
        this.mdb = mdb;
41

    
42
        this.peer = peer;
43
    }
44

    
45
    @Override
46
    public void run() {
47
        File dir =  new File("Peer-" + this.peer.getSenderId() + File.separator + "Chunks" + File.separator + this.fileId);
48
        File file = new File(dir.getPath() + File.separator + this.chunkNo);
49
        dir.mkdirs();
50
        try {
51
            //STORED <Version> <SenderId> <FileId> <ChunkNo> <CRLF><CRLF>
52
            String message = "STORED "+this.version+" "+this.senderId+" "+this.fileId+" "+this.chunkNo+" "+CRLF+CRLF;
53

    
54
            DatagramSocket socket = new DatagramSocket();
55
            DatagramPacket packet = new DatagramPacket(message.getBytes(), message.length(), this.MCaddress, this.MCport);
56
            Random random = new Random();
57

    
58
            //Random delay between [0-400] ms to avoid overlapping messages
59
            Thread.sleep(random.nextInt(401));
60

    
61
            //Protocol enhancement - Only store the chunk if the desired replication degree hasn't been achieved yet to avoid using unnecessary storage space
62
            if (this.version.equals("2.0")) {
63
                if (!this.peer.repDegMap.containsKey(this.fileId))
64
                    this.peer.repDegMap.put(this.fileId, new ConcurrentHashMap<>());
65

    
66
                if(!this.peer.repDegMap.get(this.fileId).containsKey(this.chunkNo))
67
                    this.peer.repDegMap.get(this.fileId).put(this.chunkNo, 0);
68

    
69
                this.mdb.print("RepDegMap("+this.chunkNo+") = "+this.peer.repDegMap.get(this.fileId).get(this.chunkNo));
70
                if (this.peer.repDegMap.get(this.fileId).get(this.chunkNo) < this.replicationDegree){
71
                    FileOutputStream stream = new FileOutputStream(file);
72
                    stream.write(this.body);
73
                    this.peer.addStorage(this.body.length);
74
                    stream.close();
75
                    
76
                    if (this.peer.chunksMap.get(this.fileId) == null) 
77
                        this.peer.chunksMap.put(this.fileId, new BackupFile(this.fileId));
78

    
79
                    this.peer.chunksMap.get(this.fileId).addChunk(this.chunkNo, new Chunk(this.peer.chunksMap.get(this.fileId), this.chunkNo, this.body));
80

    
81
                    socket.send(packet);
82
                }
83
            } 
84
            //Normal protocol - Always store the chunk, even if the replication degree might have been achieved already
85
            else {
86
                FileOutputStream stream = new FileOutputStream(file);
87
                stream.write(this.body);
88
                        this.peer.addStorage(this.body.length);
89
                stream.close();
90
                
91
                if (this.peer.chunksMap.get(this.fileId) == null) 
92
                    this.peer.chunksMap.put(this.fileId, new BackupFile(this.fileId));
93

    
94
                this.peer.chunksMap.get(this.fileId).getChunksMap().put(this.chunkNo, new Chunk(this.peer.chunksMap.get(this.fileId), this.chunkNo, this.body));
95

    
96
                socket.send(packet);
97
            }
98
            
99
            socket.close();
100
        } catch (IOException | InterruptedException e) {
101
            e.printStackTrace();
102
            return;
103
        }
104
        this.mdb.print("PUTCHUNK protocol finished");
105
    }
106
}