Project

General

Profile

Statistics
| Revision:

root / DistributedBackupService / src / handlers / Handler.java @ 1

History | View | Annotate | Download (2.2 KB)

1
package handlers;
2

    
3
import protocols.*;
4
import server.Peer;
5

    
6
import java.util.Random;
7
import java.util.concurrent.*;
8

    
9
import chunk.Chunk;
10
import chunk.FileManager;
11

    
12
public class Handler implements Runnable {
13
        private Peer parentPeer;
14
        private Chunk peerInformation;
15
        private BlockingQueue<Message> msgQueue;
16
        private ScheduledExecutorService executor;
17
        private int delayLimit;
18

    
19
        private Random random;
20

    
21
        public Handler(Peer parentPeer) {
22
                this.parentPeer = parentPeer;
23
                this.peerInformation = parentPeer.getPeerInformation();
24
                msgQueue = new LinkedBlockingQueue<>();
25
                executor = Executors.newScheduledThreadPool(5);
26
                delayLimit = 300;
27

    
28
                this.random = new Random();
29
        }
30

    
31
        @Override
32
        public void run() {
33
                Message msg;
34

    
35
                while (true) {
36
                        try {
37
                                msg = msgQueue.take();
38
                                if (msg == null) {
39
                                        System.out.println("Null Message Received");
40
                                        return;
41
                                }
42

    
43
                                System.out.println("R: " + msg.toString());
44

    
45
                                switch (msg.getType()) {
46
                                case PUTCHUNK:
47
                                        Backup backup = new Backup(parentPeer, msg);
48
                                        executor.execute(backup);
49
                                        break;
50
                                case STORED:
51
                                        peerInformation.addChunkReplication(msg.getFileID(), msg.getChunkNo());
52
                                        break;
53
                                case GETCHUNK:
54

    
55
                                        break;
56
                                case CHUNK:
57

    
58
                                        break;
59
                                case REMOVED:
60
                                        FileManager database = parentPeer.getDatabase();
61
                                        String fileID = msg.getFileID();
62
                                        int chunkNo = msg.getChunkNo();
63
                                        
64
                                        Chunk chunkInfo = database.getChunkInformation(fileID, chunkNo);
65

    
66
                                        int perceivedReplication = database.getRecognizedReplication(fileID, chunkNo);
67
                                        int targetReplication = chunkInfo.getReplication();
68

    
69
                                        if (perceivedReplication < targetReplication) {
70
                                                byte[] chunkData = parentPeer.loadChunk(fileID, chunkNo);
71

    
72
                                                executor.schedule(
73
                                                                new RemovedChunkHandler(parentPeer, chunkInfo, chunkData),
74
                                                                this.random.nextInt(delayLimit + 1),
75
                                                                TimeUnit.MILLISECONDS
76
                                                                );
77
                                        }
78
                                        break;
79
                                case DELETE:
80
                                        Delete delete = new Delete(parentPeer, msg);
81
                                        executor.execute(delete);
82
                                        break;
83
                                default:
84
                                        return;
85
                                }
86
                        } catch (InterruptedException e) {
87
                                e.printStackTrace();
88
                        }
89
                }
90
        }
91

    
92

    
93
        public void pushMessage(byte[] data, int length) {
94
                Message msgParsed = new Message(data, length);
95
                msgQueue.add(msgParsed);
96
        }
97
}