Project

General

Profile

Revision 15

View differences:

threads/ManagePutchunk.java
1
package threads;
2

  
3

  
4
import java.io.IOException;
5
import java.util.concurrent.TimeUnit;
6
import server.Server;
7

  
8
public class ManagePutchunk implements Runnable {
9
    
10
	private byte[] message;
11
    private int time;
12
    private String key;
13
    private int replicationDegree;
14
    private int tries;
15

  
16
    public ManagePutchunk(byte[] message, int time, String fileID, int chunkNr, int replicationDegree) {
17
        this.message = message;
18
        this.time = time;
19
        this.key = fileID + '.' + chunkNr;
20
        this.replicationDegree = replicationDegree;
21
        this.tries = 1;
22
    }
23

  
24
    @Override
25
    public void run() {
26

  
27
        int occurrences = Server.getStorage().getStoredTimes().get(this.key);
28

  
29
        if (occurrences < replicationDegree) { 
30
            try {
31
				Server.getmdb().sendMessage(message);
32
			} catch (IOException e) {
33
				e.printStackTrace();
34
			}
35
            System.out.println("Sent PUTCHUNK try: " + tries);
36
            System.out.println(key);
37
            this.time = 2 * this.time;
38
            this.tries++;
39
            
40
            if (this.tries < 5)
41
                Server.getThreadLauncher().schedule(this, this.time, TimeUnit.SECONDS);
42
              
43
            
44
            
45
            
46
        }
47
        
48
 
49

  
50
    }
51
}
threads/ManageRestored.java
1
package threads;
2
import java.io.*;
3
import java.util.*;
4
import server.Server;
5

  
6
public class ManageRestored implements Runnable {
7

  
8
    private String fileName;
9

  
10
    public ManageRestored(String fileName) {
11
        this.fileName = fileName;
12
    }
13

  
14
    @Override
15
    public void run() {
16

  
17
        if (!Server.getStorage().getRemainingChunks().containsValue("false")) {
18
            if (restoreFile())
19
                System.out.println("> File restored!\n");
20
            else System.out.println("ERROR: File not restored.\n");
21
        } else System.out.println("ERROR: File not restored, chunks missing.\n");
22

  
23
        Server.getStorage().getRemainingChunks().clear();
24
    }
25

  
26
    private boolean restoreFile() {
27
        String filePath = "database/" + Server.getServerId() + "/" + this.fileName;
28
        File file = new File(filePath);
29
        byte[] body;
30

  
31
        try {
32
            FileOutputStream fos = new FileOutputStream(file, true);
33

  
34
            if (!file.exists()) {
35
                file.getParentFile().mkdirs();
36
                file.createNewFile();
37
            }
38

  
39
            List<String> sortedChunkKeys = new ArrayList<>(Server.getStorage().getRemainingChunks().keySet());
40

  
41
            sortedChunkKeys.sort((o1, o2) -> {
42
                int chunkNr1 = Integer.valueOf(o1.split(".")[1]);
43
                int chunkNr2 = Integer.valueOf(o2.split(".")[1]);
44
                return Integer.compare(chunkNr1, chunkNr2);
45
            });
46

  
47
            for (String key : sortedChunkKeys) {
48
                String chunkPath = Server.getServerId() + "/" + key;
49

  
50
                File chunkFile = new File(chunkPath);
51

  
52
                if (!chunkFile.exists()) {
53
                    return false;
54
                }
55

  
56
                body = new byte[(int) chunkFile.length()];
57
                FileInputStream in = new FileInputStream(chunkFile);
58

  
59
                in.read(body);
60
                fos.write(body);
61

  
62
                chunkFile.delete();
63
                in.close();
64
            }
65
           
66
            fos.close();
67
            return true;
68
        } catch (IOException e) {
69
            e.printStackTrace();
70
        }
71
        return false;
72
    }
73
}
threads/ReceiveGetChunk.java
1

  
2
package threads;
3

  
4
import java.io.File;
5
import java.io.FileInputStream;
6
import java.io.FileOutputStream;
7
import java.io.IOException;
8
import java.nio.charset.StandardCharsets;
9
import java.util.Random;
10
import java.util.concurrent.TimeUnit;
11

  
12
import chunk.*;
13
import server.*;
14

  
15
public class ReceiveGetChunk implements Runnable {
16

  
17
    private String fileId;
18
    private int chunkNum;
19

  
20

  
21
	public ReceiveGetChunk( String fileId, int chunkNr) {
22
        this.fileId = fileId;
23
        this.chunkNum = chunkNr;
24
    }
25

  
26
	@Override
27
	public void run() {
28
		
29
		for (int i = 0; i < Server.getStorage().getSavedChunks().size(); i++) {
30
            if (isSameChunk(Server.getStorage().getSavedChunks().get(i).getfileId(), Server.getStorage().getSavedChunks().get(i).getNum()) && !isAbortSend()) {
31
                String header = "CHUNK " + "1.0" + " " + Server.getServerId() + " " + this.fileId + " " + this.chunkNum + "\r\n\r\n";
32

  
33
                try {
34
                    byte[] asciiHeader = header.getBytes(StandardCharsets.US_ASCII);
35

  
36
                    String chunkPath = "database/" + Server.getServerId() + "/" + fileId + "." + chunkNum;
37

  
38
                    File file = new File(chunkPath);
39
                    byte[] body = new byte[(int) file.length()];
40
                    FileInputStream in = new FileInputStream(file);
41
                    in.read(body);
42

  
43
                    byte[] message = new byte[asciiHeader.length + body.length];
44
                    System.arraycopy(asciiHeader, 0, message, 0, asciiHeader.length);
45
                    System.arraycopy(body, 0, message, asciiHeader.length, body.length);
46

  
47
                    SendMessage sendThread = new SendMessage(message, "mdr");
48
                    System.out.println("Sent" + "CHUNK " + "1.0" + " " + Server.getServerId() + " " + this.fileId + " " + this.chunkNum);
49
                    Random random = new Random();
50
                    Server.getThreadLauncher().schedule(sendThread, random.nextInt(401), TimeUnit.MILLISECONDS);
51
                } catch (IOException e) {
52
                    e.printStackTrace();
53
                }
54

  
55
            }
56

  
57
    }
58
}
59
    private boolean isSameChunk(String fileId, int chunkNr) {
60
        return fileId.equals(this.fileId) && chunkNr == this.chunkNum;
61
    }
62

  
63
    private boolean isAbortSend() {
64
        for (int i = 0; i < Server.getStorage().getReceivedChunks().size(); i++) {
65
            if (isSameChunk(Server.getStorage().getReceivedChunks().get(i).getfileId(), Server.getStorage().getReceivedChunks().get(i).getNum()))
66
                return true;
67
        }
68
        return false;
69
}
70
}
threads/ReceivePutchunk.java
1
package threads;
2

  
3
import java.io.File;
4
import java.io.FileOutputStream;
5
import java.io.IOException;
6
import chunk.*;
7
import server.*;
8

  
9
public class ReceivePutchunk implements Runnable {
10

  
11
	private Double version;
12
	private String fileId;
13
	private int chunkNr;
14
	private int replicationDegree;
15
	private byte[] content;
16

  
17
	public ReceivePutchunk(double version, String fileId, int chunkNr, int replicationDegree, byte[] content) {
18
		this.version = version;
19
		this.fileId = fileId;
20
		this.chunkNr = chunkNr;
21
		this.replicationDegree = replicationDegree;
22
		this.content = content;
23
	}
24

  
25
	@Override
26
	public void run() {
27
		
28
		String key = fileId + "." + chunkNr;
29

  
30
		for (int i = 0; i < Server.getStorage().getFiles().size(); i++) {
31
			if (Server.getStorage().getFiles().get(i).getFileId().equals(fileId))
32
				return;
33
		}
34

  
35
		if (version == 2.0) {
36
			if (Server.getStorage().getStoredTimes().get(key) >= replicationDegree)
37
				return;
38

  
39
			if (Server.getStorage().getSpaceAvailable() >= content.length) {
40
				Chunk chunk = null;
41
				try {
42
					chunk = new Chunk(chunkNr, fileId, replicationDegree, content.length);
43
				} catch (IOException e1) {
44
					e1.printStackTrace();
45
				}
46

  
47
				if (!Server.getStorage().addSavedChunk(chunk)) {
48
					return;
49
				}
50

  
51
				Server.getStorage().decSpaceAvailable(content.length);
52

  
53
            //creates the file and saves the chunk
54
            try {
55
                String filename = Server.getServerId() + "/" + fileId + "." + chunkNr;
56

  
57
					File file = new File(filename);
58
					if (!file.exists()) {
59
						file.getParentFile().mkdirs();
60
						file.createNewFile();
61
					}
62

  
63
					try (FileOutputStream fos = new FileOutputStream(filename)) {
64
						fos.write(content);
65
					}
66

  
67
				} catch (IOException e) {
68
					e.printStackTrace();
69
				}
70

  
71
				Server.getStorage().incStoredOccurrences(fileId, chunkNr);
72
				String header = "STORED " + "1.0" + " " + Server.getServerId() + " " + fileId + " " + chunkNr
73
						+ "\r\n\r\n";
74
				System.out.println(
75
						"Sent " + "STORED " + "1.0" + " " + Server.getServerId() + " " + fileId + " " + chunkNr);
76
				try {
77
					Server.getmc().sendMessage(header.getBytes());
78
				} catch (IOException e) {
79
					e.printStackTrace();
80
				}
81
			} else {
82
				System.out.println("Not enough space in this peer " + fileId + "_" + chunkNr);
83
			}
84

  
85
		}
86
	}
87
}
threads/SendMessage.java
1
package threads;
2

  
3
import java.io.IOException;
4
import server.Server;
5
import multicastchannels.*;
6

  
7
public class SendMessage implements Runnable {
8
	private byte[] message;
9
	private String channel;
10

  
11
	public SendMessage(byte[] message, String channel) {
12
		this.message = message;
13
		this.channel = channel;
14

  
15
	}
16

  
17
	@Override
18
	public void run() {
19
		switch (channel) {
20

  
21
		case "mc":
22
			try {
23
				Server.getmc().sendMessage(message);
24
			} catch (IOException e) {
25
				e.printStackTrace();
26
			}
27
			break;
28

  
29
		case "mdb":
30
			try {
31
				Server.getmdb().sendMessage(message);
32
			} catch (IOException e) {
33
				e.printStackTrace();
34
			}
35
			break;
36

  
37
		case "mdr":
38
			try {
39
				Server.getmdr().sendMessage(message);
40
			} catch (IOException e) {
41
				e.printStackTrace();
42
			}
43
			break;
44

  
45
		}
46
	}
47
}
threads/messageManagement.java
1
package threads;
2

  
3
import java.util.ArrayList;
4
import server.Server;
5
import java.util.Arrays;
6
import java.util.List;
7
import java.util.Random;
8
import java.util.concurrent.TimeUnit;
9

  
10
public class messageManagement implements Runnable{
11
	private byte[] msg;
12

  
13
	public messageManagement(byte[] msg) {
14
		this.msg = msg;
15
		
16
	}
17

  
18
	@Override
19
	public void run() {
20
		
21
		List<byte[]> HB = getHeaderNBody();
22
		byte[] header = HB.get(0);
23
		byte[] body = HB.get(1);
24
		
25
		String message = new String(this.msg, 0, this.msg.length);
26
		String[] newMessage = message.split(" ");
27
		
28
		String messageType = newMessage[0];
29
		Double version = Double.parseDouble(newMessage[1]);
30
		int senderID = Integer.parseInt(newMessage[2]);
31
		String fileID = newMessage[3];
32
		int chunkNr = Integer.parseInt(newMessage[4]);
33
		int replicationDegree = Integer.parseInt(newMessage[5]);
34
		
35
		
36
		//removeBlanks(newMessage);	
37
		
38
		
39
       
40
		switch (newMessage[0]) {
41
        case "PUTCHUNK":
42
            putchunkOperation(version, fileID, chunkNr, replicationDegree, body);
43
            break;
44
        case "GETCHUNK":
45
            getchunkOperation(version, senderID, fileID, chunkNr);
46
            break;
47
        case "DELETE":
48
            deleteOperation(version, senderID, fileID);
49
            break;
50
        case "REMOVED":
51
            removedOperation(version, senderID, fileID, chunkNr);
52
            break;
53
        case "STORED":
54
            storedOperation(version, senderID, fileID, chunkNr);
55
            break;
56
		
57
	}
58

  
59
}
60

  
61
	private void deleteOperation(Double version, int senderID, String fileID) {
62
		if (Integer.parseInt(fileID) != senderID) { 
63
            Server.getStorage().deleteStoredChunks(fileID);
64
            System.out.println("Received DELETE " + version + " " + senderID + " " + fileID);
65
        }		
66
	}
67

  
68
	private void removedOperation(Double version, int senderID, String fileID, int chunkNr) {
69
	}
70

  
71
	private void getchunkOperation(Double version, int senderID, String fileID, int chunkNr) {
72
		if (Integer.parseInt(fileID) != senderID) {           
73
			Random random = new Random();
74
            System.out.println("Received GETCHUNK " + version + " " + senderID + " " + fileID + " " + chunkNr);
75
            Server.getThreadLauncher().schedule(new ReceiveGetChunk(fileID, chunkNr), random.nextInt(401), TimeUnit.MILLISECONDS);
76
		}
77
		
78
	}
79

  
80
	private void storedOperation(Double version, int senderID, String fileID, int chunkNr) {
81
		if (Integer.parseInt(fileID) != senderID) {           
82
            Server.getStorage().incStoredOccurrences(fileID, chunkNr); 
83
            System.out.println("Received STORED " + version + " " + senderID + " " + fileID + " " + chunkNr);
84
        }
85
		
86
	}
87

  
88
	private void putchunkOperation(Double version, String fileID, int chunkNr, int replicationDegree, byte[] body) {
89
		Random r = new Random();
90
		System.out.println("Received PUTCHUNK " + version + " " + fileID + " " + chunkNr);
91
		Server.getThreadLauncher().schedule(new ReceivePutchunk(version, fileID, chunkNr, replicationDegree, body), r.nextInt(401), TimeUnit.MILLISECONDS);
92

  
93
	}
94

  
95
	private void removeBlanks(String[] newMessage) {
96
		for(int i=0; i<newMessage.length; i++) {
97
			newMessage[i] = newMessage[i].replaceAll("\\s","" );
98
		}
99
		
100
	}
101

  
102
	private List<byte[]> getHeaderNBody(){
103
		
104
		int h = this.msg.length -4;
105
		
106
		byte[] header = Arrays.copyOfRange(this.msg, 0 ,h);
107
		byte[] body = Arrays.copyOfRange(this.msg, h, this.msg.length);
108
		
109
		List<byte[]> headerNBody = new ArrayList<>();
110

  
111
        headerNBody.add(header);
112
        headerNBody.add(body);
113

  
114
        return headerNBody;
115
	}
116

  
117
}

Also available in: Unified diff