Project

General

Profile

Statistics
| Revision:

root / DistributedBackupService / src / handlers / Handler.java @ 2

History | View | Annotate | Download (2.91 KB)

1
package handlers;
2

    
3
import protocols.*;
4
import server.Peer;
5

    
6
import java.util.Random;
7
import java.util.concurrent.*;
8

    
9
import chunk.Chunk;
10
import chunk.FileManager;
11

    
12
public class Handler implements Runnable {
13
        private BlockingQueue<Message> msgQueue;
14
        private ScheduledExecutorService agent;
15
        private Peer peer;
16
        private Chunk peerInformation;
17

    
18
        private int delayLimit;
19

    
20
        private Random random;
21

    
22
        public Handler(Peer peer) {
23
                this.peer = peer;
24
                this.peerInformation = peer.getPeerInformation();
25
                msgQueue = new LinkedBlockingQueue<>();
26
                agent = Executors.newScheduledThreadPool(5);
27
                delayLimit = 300;
28

    
29
                this.random = new Random();
30
        }
31

    
32
        @Override
33
        public void run() {
34
                Message msg;
35

    
36
                while (true) {
37
                        try {
38
                                msg = msgQueue.take();
39
                                if (msg == null) {
40
                                        System.out.println("No msg received!");
41
                                        return;
42
                                }
43
                                System.out.println("R:: " + msg.toString());
44
                                switch (msg.getType()) {
45
                                case PUTCHUNK:
46
                                        Backup backup = new Backup(peer, msg);
47
                                        agent.execute(backup);
48
                                        break;
49
                                case STORED:
50
                                        peerInformation.addChunkReplication(msg.getFileID(), msg.getChunkNo());
51
                                        break;
52
                                case REMOVED:
53
                                        FileManager database = peer.getDatabase();
54
                                        String fileID = msg.getFileID();
55
                                        int chunkNo = msg.getChunkNo();
56
                                        
57
                                        Chunk chunkInfo = database.getChunkInformation(fileID, chunkNo);
58

    
59
                                        int perceivedReplication = database.getRecognizedReplication(fileID, chunkNo);
60
                                        int targetReplication = chunkInfo.getReplication();
61

    
62
                                        if (perceivedReplication < targetReplication) {
63
                                                byte[] chunkData = peer.loadChunk(fileID, chunkNo);
64

    
65
                                                agent.schedule(
66
                                                                new RemovedChunkHandler(peer, chunkInfo, chunkData),
67
                                                                this.random.nextInt(delayLimit + 1),
68
                                                                TimeUnit.MILLISECONDS
69
                                                                );
70
                                        }
71
                                        break;
72
                                case DELETE:
73
                                        Delete delete = new Delete(peer, msg);
74
                                        agent.execute(delete);
75
                                        break;
76
                                default:
77
                                        return;
78
                                }
79
                        } catch (InterruptedException e) {
80
                                e.printStackTrace();
81
                        }
82
                }
83
        }
84

    
85
        public void pushMessage(byte[] data, int length) {
86
                Message msgParsed = new Message(data, length);
87
                msgQueue.add(msgParsed);
88
        }
89

    
90
        // Getters and setters
91
        
92
        public BlockingQueue<Message> getMsgQueue() {
93
                return msgQueue;
94
        }
95

    
96
        public ScheduledExecutorService getAgent() {
97
                return agent;
98
        }
99

    
100
        public Peer getPeer() {
101
                return peer;
102
        }
103

    
104
        public Chunk getPeerInformation() {
105
                return peerInformation;
106
        }
107

    
108
        public int getDelayLimit() {
109
                return delayLimit;
110
        }
111

    
112
        public Random getRandom() {
113
                return random;
114
        }
115

    
116
        public void setMsgQueue(BlockingQueue<Message> msgQueue) {
117
                this.msgQueue = msgQueue;
118
        }
119

    
120
        public void setAgent(ScheduledExecutorService agent) {
121
                this.agent = agent;
122
        }
123

    
124
        public void setPeer(Peer peer) {
125
                this.peer = peer;
126
        }
127

    
128
        public void setPeerInformation(Chunk peerInformation) {
129
                this.peerInformation = peerInformation;
130
        }
131

    
132
        public void setDelayLimit(int delayLimit) {
133
                this.delayLimit = delayLimit;
134
        }
135

    
136
        public void setRandom(Random random) {
137
                this.random = random;
138
        }
139
}