root / DistributedBackupService / src / handlers / Handler.java @ 1
History | View | Annotate | Download (2.2 KB)
1 | 1 | up20130859 | package handlers; |
---|---|---|---|
2 | |||
3 | import protocols.*; |
||
4 | import server.Peer; |
||
5 | |||
6 | import java.util.Random; |
||
7 | import java.util.concurrent.*; |
||
8 | |||
9 | import chunk.Chunk; |
||
10 | import chunk.FileManager; |
||
11 | |||
12 | public class Handler implements Runnable { |
||
13 | private Peer parentPeer;
|
||
14 | private Chunk peerInformation;
|
||
15 | private BlockingQueue<Message> msgQueue; |
||
16 | private ScheduledExecutorService executor; |
||
17 | private int delayLimit; |
||
18 | |||
19 | private Random random; |
||
20 | |||
21 | public Handler(Peer parentPeer) { |
||
22 | this.parentPeer = parentPeer;
|
||
23 | this.peerInformation = parentPeer.getPeerInformation();
|
||
24 | msgQueue = new LinkedBlockingQueue<>(); |
||
25 | executor = Executors.newScheduledThreadPool(5); |
||
26 | delayLimit = 300;
|
||
27 | |||
28 | this.random = new Random(); |
||
29 | } |
||
30 | |||
31 | @Override
|
||
32 | public void run() { |
||
33 | Message msg; |
||
34 | |||
35 | while (true) { |
||
36 | try {
|
||
37 | msg = msgQueue.take(); |
||
38 | if (msg == null) { |
||
39 | System.out.println("Null Message Received"); |
||
40 | return;
|
||
41 | } |
||
42 | |||
43 | System.out.println("R: " + msg.toString()); |
||
44 | |||
45 | switch (msg.getType()) {
|
||
46 | case PUTCHUNK:
|
||
47 | Backup backup = new Backup(parentPeer, msg);
|
||
48 | executor.execute(backup); |
||
49 | break;
|
||
50 | case STORED:
|
||
51 | peerInformation.addChunkReplication(msg.getFileID(), msg.getChunkNo()); |
||
52 | break;
|
||
53 | case GETCHUNK:
|
||
54 | |||
55 | break;
|
||
56 | case CHUNK:
|
||
57 | |||
58 | break;
|
||
59 | case REMOVED:
|
||
60 | FileManager database = parentPeer.getDatabase(); |
||
61 | String fileID = msg.getFileID();
|
||
62 | int chunkNo = msg.getChunkNo();
|
||
63 | |||
64 | Chunk chunkInfo = database.getChunkInformation(fileID, chunkNo); |
||
65 | |||
66 | int perceivedReplication = database.getRecognizedReplication(fileID, chunkNo);
|
||
67 | int targetReplication = chunkInfo.getReplication();
|
||
68 | |||
69 | if (perceivedReplication < targetReplication) {
|
||
70 | byte[] chunkData = parentPeer.loadChunk(fileID, chunkNo); |
||
71 | |||
72 | executor.schedule( |
||
73 | new RemovedChunkHandler(parentPeer, chunkInfo, chunkData),
|
||
74 | this.random.nextInt(delayLimit + 1), |
||
75 | TimeUnit.MILLISECONDS
|
||
76 | ); |
||
77 | } |
||
78 | break;
|
||
79 | case DELETE:
|
||
80 | Delete delete = new Delete(parentPeer, msg);
|
||
81 | executor.execute(delete); |
||
82 | break;
|
||
83 | default:
|
||
84 | return;
|
||
85 | } |
||
86 | } catch (InterruptedException e) { |
||
87 | e.printStackTrace(); |
||
88 | } |
||
89 | } |
||
90 | } |
||
91 | |||
92 | |||
93 | public void pushMessage(byte[] data, int length) { |
||
94 | Message msgParsed = new Message(data, length);
|
||
95 | msgQueue.add(msgParsed); |
||
96 | } |
||
97 | } |