root / DistributedBackupService / src / handlers / Handler.java
History | View | Annotate | Download (2.91 KB)
1 | 1 | up20130859 | package handlers; |
---|---|---|---|
2 | |||
3 | import protocols.*; |
||
4 | import server.Peer; |
||
5 | |||
6 | import java.util.Random; |
||
7 | import java.util.concurrent.*; |
||
8 | |||
9 | import chunk.Chunk; |
||
10 | import chunk.FileManager; |
||
11 | |||
12 | public class Handler implements Runnable { |
||
13 | 2 | up20130859 | private BlockingQueue<Message> msgQueue; |
14 | private ScheduledExecutorService agent; |
||
15 | private Peer peer;
|
||
16 | 1 | up20130859 | private Chunk peerInformation;
|
17 | 2 | up20130859 | |
18 | 1 | up20130859 | private int delayLimit; |
19 | |||
20 | private Random random; |
||
21 | |||
22 | 2 | up20130859 | public Handler(Peer peer) { |
23 | this.peer = peer;
|
||
24 | this.peerInformation = peer.getPeerInformation();
|
||
25 | 1 | up20130859 | msgQueue = new LinkedBlockingQueue<>(); |
26 | 2 | up20130859 | agent = Executors.newScheduledThreadPool(5); |
27 | 1 | up20130859 | delayLimit = 300;
|
28 | |||
29 | this.random = new Random(); |
||
30 | } |
||
31 | |||
32 | @Override
|
||
33 | public void run() { |
||
34 | Message msg; |
||
35 | |||
36 | while (true) { |
||
37 | try {
|
||
38 | msg = msgQueue.take(); |
||
39 | if (msg == null) { |
||
40 | 2 | up20130859 | System.out.println("No msg received!"); |
41 | 1 | up20130859 | return;
|
42 | } |
||
43 | 2 | up20130859 | System.out.println("R:: " + msg.toString()); |
44 | 1 | up20130859 | switch (msg.getType()) {
|
45 | case PUTCHUNK:
|
||
46 | 2 | up20130859 | Backup backup = new Backup(peer, msg);
|
47 | agent.execute(backup); |
||
48 | 1 | up20130859 | break;
|
49 | case STORED:
|
||
50 | peerInformation.addChunkReplication(msg.getFileID(), msg.getChunkNo()); |
||
51 | break;
|
||
52 | case REMOVED:
|
||
53 | 2 | up20130859 | FileManager database = peer.getDatabase(); |
54 | 1 | up20130859 | String fileID = msg.getFileID();
|
55 | int chunkNo = msg.getChunkNo();
|
||
56 | |||
57 | Chunk chunkInfo = database.getChunkInformation(fileID, chunkNo); |
||
58 | |||
59 | int perceivedReplication = database.getRecognizedReplication(fileID, chunkNo);
|
||
60 | int targetReplication = chunkInfo.getReplication();
|
||
61 | |||
62 | if (perceivedReplication < targetReplication) {
|
||
63 | 2 | up20130859 | byte[] chunkData = peer.loadChunk(fileID, chunkNo); |
64 | 1 | up20130859 | |
65 | 2 | up20130859 | agent.schedule( |
66 | new RemovedChunkHandler(peer, chunkInfo, chunkData),
|
||
67 | 1 | up20130859 | this.random.nextInt(delayLimit + 1), |
68 | TimeUnit.MILLISECONDS
|
||
69 | ); |
||
70 | } |
||
71 | break;
|
||
72 | case DELETE:
|
||
73 | 2 | up20130859 | Delete delete = new Delete(peer, msg);
|
74 | agent.execute(delete); |
||
75 | 1 | up20130859 | break;
|
76 | default:
|
||
77 | return;
|
||
78 | } |
||
79 | } catch (InterruptedException e) { |
||
80 | e.printStackTrace(); |
||
81 | } |
||
82 | } |
||
83 | } |
||
84 | |||
85 | public void pushMessage(byte[] data, int length) { |
||
86 | Message msgParsed = new Message(data, length);
|
||
87 | msgQueue.add(msgParsed); |
||
88 | } |
||
89 | 2 | up20130859 | |
90 | // Getters and setters
|
||
91 | |||
92 | public BlockingQueue<Message> getMsgQueue() { |
||
93 | return msgQueue;
|
||
94 | } |
||
95 | |||
96 | public ScheduledExecutorService getAgent() { |
||
97 | return agent;
|
||
98 | } |
||
99 | |||
100 | public Peer getPeer() {
|
||
101 | return peer;
|
||
102 | } |
||
103 | |||
104 | public Chunk getPeerInformation() {
|
||
105 | return peerInformation;
|
||
106 | } |
||
107 | |||
108 | public int getDelayLimit() { |
||
109 | return delayLimit;
|
||
110 | } |
||
111 | |||
112 | public Random getRandom() { |
||
113 | return random;
|
||
114 | } |
||
115 | |||
116 | public void setMsgQueue(BlockingQueue<Message> msgQueue) { |
||
117 | this.msgQueue = msgQueue;
|
||
118 | } |
||
119 | |||
120 | public void setAgent(ScheduledExecutorService agent) { |
||
121 | this.agent = agent;
|
||
122 | } |
||
123 | |||
124 | public void setPeer(Peer peer) { |
||
125 | this.peer = peer;
|
||
126 | } |
||
127 | |||
128 | public void setPeerInformation(Chunk peerInformation) { |
||
129 | this.peerInformation = peerInformation;
|
||
130 | } |
||
131 | |||
132 | public void setDelayLimit(int delayLimit) { |
||
133 | this.delayLimit = delayLimit;
|
||
134 | } |
||
135 | |||
136 | public void setRandom(Random random) { |
||
137 | this.random = random;
|
||
138 | } |
||
139 | 1 | up20130859 | } |