root / DistributedBackupService / src / handlers / Handler.java @ 1
History | View | Annotate | Download (2.2 KB)
1 |
package handlers; |
---|---|
2 |
|
3 |
import protocols.*; |
4 |
import server.Peer; |
5 |
|
6 |
import java.util.Random; |
7 |
import java.util.concurrent.*; |
8 |
|
9 |
import chunk.Chunk; |
10 |
import chunk.FileManager; |
11 |
|
12 |
public class Handler implements Runnable { |
13 |
private Peer parentPeer;
|
14 |
private Chunk peerInformation;
|
15 |
private BlockingQueue<Message> msgQueue; |
16 |
private ScheduledExecutorService executor; |
17 |
private int delayLimit; |
18 |
|
19 |
private Random random; |
20 |
|
21 |
public Handler(Peer parentPeer) { |
22 |
this.parentPeer = parentPeer;
|
23 |
this.peerInformation = parentPeer.getPeerInformation();
|
24 |
msgQueue = new LinkedBlockingQueue<>(); |
25 |
executor = Executors.newScheduledThreadPool(5); |
26 |
delayLimit = 300;
|
27 |
|
28 |
this.random = new Random(); |
29 |
} |
30 |
|
31 |
@Override
|
32 |
public void run() { |
33 |
Message msg; |
34 |
|
35 |
while (true) { |
36 |
try {
|
37 |
msg = msgQueue.take(); |
38 |
if (msg == null) { |
39 |
System.out.println("Null Message Received"); |
40 |
return;
|
41 |
} |
42 |
|
43 |
System.out.println("R: " + msg.toString()); |
44 |
|
45 |
switch (msg.getType()) {
|
46 |
case PUTCHUNK:
|
47 |
Backup backup = new Backup(parentPeer, msg);
|
48 |
executor.execute(backup); |
49 |
break;
|
50 |
case STORED:
|
51 |
peerInformation.addChunkReplication(msg.getFileID(), msg.getChunkNo()); |
52 |
break;
|
53 |
case GETCHUNK:
|
54 |
|
55 |
break;
|
56 |
case CHUNK:
|
57 |
|
58 |
break;
|
59 |
case REMOVED:
|
60 |
FileManager database = parentPeer.getDatabase(); |
61 |
String fileID = msg.getFileID();
|
62 |
int chunkNo = msg.getChunkNo();
|
63 |
|
64 |
Chunk chunkInfo = database.getChunkInformation(fileID, chunkNo); |
65 |
|
66 |
int perceivedReplication = database.getRecognizedReplication(fileID, chunkNo);
|
67 |
int targetReplication = chunkInfo.getReplication();
|
68 |
|
69 |
if (perceivedReplication < targetReplication) {
|
70 |
byte[] chunkData = parentPeer.loadChunk(fileID, chunkNo); |
71 |
|
72 |
executor.schedule( |
73 |
new RemovedChunkHandler(parentPeer, chunkInfo, chunkData),
|
74 |
this.random.nextInt(delayLimit + 1), |
75 |
TimeUnit.MILLISECONDS
|
76 |
); |
77 |
} |
78 |
break;
|
79 |
case DELETE:
|
80 |
Delete delete = new Delete(parentPeer, msg);
|
81 |
executor.execute(delete); |
82 |
break;
|
83 |
default:
|
84 |
return;
|
85 |
} |
86 |
} catch (InterruptedException e) { |
87 |
e.printStackTrace(); |
88 |
} |
89 |
} |
90 |
} |
91 |
|
92 |
|
93 |
public void pushMessage(byte[] data, int length) { |
94 |
Message msgParsed = new Message(data, length);
|
95 |
msgQueue.add(msgParsed); |
96 |
} |
97 |
} |