Project

General

Profile

Revision 1

Imported DistributedBackupService

View differences:

DistributedBackupService/delete.sh
1
#!/bin/sh
2
java -classpath bin client.TestApp 1 "DELETE" "files/dog.png"
0 3

  
DistributedBackupService/rmi.sh
1
cd bin/
2
rmiregistry
0 3

  
DistributedBackupService/src/chunk/Chunk.java
1
package chunk;
2

  
3
import java.util.HashSet;
4
import java.util.Set;
5
import java.util.concurrent.ConcurrentHashMap;
6
import java.util.concurrent.ConcurrentMap;
7
import java.util.concurrent.atomic.AtomicIntegerArray;
8

  
9
public class Chunk {
10

  
11
	private ConcurrentMap<String, AtomicIntegerArray> chunkReplication;
12

  
13
	private String fileID;
14
	private int chunkNo;
15
	private int replication;
16
	private byte[] data;
17
	private int size;
18
	private Set< Integer > chunks;
19

  
20
	public Chunk(String fileID, int chunkNo, byte[] data) {
21
		this.fileID = fileID;
22
		this.chunkNo = chunkNo;
23
		this.data = data;
24
	}
25

  
26
	public Chunk(String fileID, int chunkNo, int replication, byte[] data) {
27
		this(fileID, chunkNo, data);
28
		this.replication = replication;
29
	}
30

  
31
	public Chunk(Chunk chunkData, byte[] data) {
32
		this(chunkData.getFileID(), chunkData.getChunkNo(), chunkData.getReplication(), data);
33
	}
34

  
35
	public Chunk(int chunkNo, int replication) {
36
		this.chunkNo = chunkNo;
37
		this.replication = replication;
38
		this.chunks = new HashSet<>();
39
	}
40

  
41
	public Chunk(String fileID, int chunkNo, int replication, int size) {
42
		this(chunkNo, replication);
43
		this.fileID = fileID;
44
		this.size = size;
45
	}
46

  
47
	// Peer information
48
	public Chunk() {
49
		chunkReplication = new ConcurrentHashMap<>();
50
	}
51

  
52
	public void resetChunkReplication(String fileID) {
53
		chunkReplication.remove(fileID);
54
	}
55

  
56
	public void startChunkReplication(String fileID, int numChunks) {
57
		chunkReplication.putIfAbsent(fileID, new AtomicIntegerArray(numChunks));
58
	}
59

  
60
	public Integer addChunkReplication(String fileID, int chunkNo) {
61
		if (!chunkReplication.containsKey(fileID))
62
			return null;
63

  
64
		int replication = chunkReplication.get(fileID).addAndGet(chunkNo, 1);
65
		System.out.println("Incrementing replication of " + fileID + "/" + chunkNo + " to " + replication);
66
		return replication;
67
	}
68

  
69
	public int getChunkReplication(String fileID, int chunkNo) {
70
		return chunkReplication.get(fileID).get(chunkNo);
71
	}
72

  
73
	public AtomicIntegerArray getChunkReplication(String fileID) {
74
		return chunkReplication.get(fileID);
75
	}
76

  
77
	// Getters and setters 
78

  
79
	public String getFileID() { return fileID; }
80

  
81
	public int getChunkNo() { return chunkNo; }
82

  
83
	public int getReplication() { return replication; }
84

  
85
	public byte[] getData() { return data; }
86

  
87
	public boolean removeChunk(Integer peerID) { return chunks.remove(peerID); }
88

  
89
	public boolean addChunk(Integer peerID) { return chunks.add(peerID); }
90

  
91
	public int getNumChunks() { return chunks.size(); }
92

  
93
	public int getSize() { return size; }
94

  
95
	public Set<Integer> getChunks() { return chunks; }
96

  
97
	public void setFileID(String fileID) { this.fileID = fileID; }
98

  
99
	public void setChunkNo(int chunkNo) { this.chunkNo = chunkNo; }
100

  
101
	public void setReplication(int replication) { this.replication = replication; }
102

  
103
	public void setData(byte[] data) { this.data = data; }
104

  
105
	public void setSize(int size) { this.size = size; }
106

  
107
	public void setChunks(Set<Integer> mirrors) { this.chunks = mirrors; }
108

  
109
}
0 110

  
DistributedBackupService/src/chunk/FileManager.java
1
package chunk;
2

  
3
import java.io.File;
4
import java.io.FileInputStream;
5
import java.io.IOException;
6
import java.io.ObjectInputStream;
7
import java.io.Serializable;
8
import java.util.HashMap;
9
import java.util.Map;
10
import java.util.Set;
11
import java.util.concurrent.ConcurrentHashMap;
12
import java.util.concurrent.ConcurrentMap;
13

  
14
@SuppressWarnings("serial")
15
public class FileManager implements Serializable {
16

  
17
	private ConcurrentMap<String, FileManager> fileRepository;
18
	private ConcurrentMap<String, ConcurrentMap<Integer, Chunk>> chunkRepository;
19

  
20
	public FileManager() {
21
		fileRepository = new ConcurrentHashMap<>();
22
		chunkRepository = new ConcurrentHashMap<>();
23
	}
24

  
25
	private String fileID; 
26
	private String pathName;
27
	private String fileName;
28
	private int numChunks;
29
	private int wantedReplication;
30
	private HashMap<String, Chunk> chunkMap;
31

  
32
	public FileManager( File file, String fileID, int replicationDegree, HashMap<String, Chunk> chunksInfo ) {
33
		this.fileID = fileID;
34
		this.fileName = file.getName();
35
		this.pathName = file.getPath();
36
		this.numChunks = chunksInfo.size();
37
		this.wantedReplication = replicationDegree;
38
		this.chunkMap = chunksInfo;
39
	}
40

  
41
	public String getFileID() {
42
		return fileID;
43
	}
44

  
45
	public int getNumChunks() {
46
		return numChunks;
47
	}
48

  
49
	public String getPathname() {
50
		return pathName;
51
	}
52

  
53
	public String getFileName() {
54
		return fileName;
55
	}
56

  
57
	public int getWantedReplication() {
58
		return wantedReplication;
59
	}
60

  
61
	public HashMap<String, Chunk> getChunks() {
62
		return chunkMap;
63
	}
64

  
65
	synchronized public static FileManager loadDatabase(File file) throws ClassNotFoundException {
66
		FileManager db = null;
67

  
68
		try {
69
			final ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream(file));
70
			db = (FileManager) inputStream.readObject();
71
			inputStream.close();
72
		} catch (final IOException pE) {
73
			pE.printStackTrace();
74
		}
75
		return db;
76
	}
77

  
78
	public void addFile( String pathName, FileManager fileInfo ) {
79
		fileRepository.put(pathName, fileInfo);
80
	}
81

  
82
	public void removeFile( String pathName ) {
83
		fileRepository.remove(pathName);
84
	}
85

  
86
	public FileManager getFileInformation( String pathName) 
87
	{
88
		return fileRepository.get( pathName);
89
	}
90

  
91
	public void addChunk( Chunk chunk ) 
92
	{
93
		
94
		String fileID = chunk.getFileID();
95
		int chunkNo = chunk.getChunkNo();
96

  
97
		ConcurrentMap<Integer, Chunk> fileChunks;
98
		fileChunks = chunkRepository.getOrDefault(fileID, new ConcurrentHashMap<>());
99
		fileChunks.putIfAbsent(chunkNo, chunk);
100

  
101
		chunkRepository.putIfAbsent(fileID, fileChunks);
102

  
103
	}
104
 
105
	public Chunk getChunkInformation( String fileID, int chunkNo ) 
106
	{
107
		Map<Integer, Chunk> fileChunks = chunkRepository.get(fileID);
108

  
109
		return fileChunks != null ? fileChunks.get(chunkNo) : null;
110
	}
111

  
112
	public void deleteChunk( String fileID, int chunkNo) 
113
	{
114
		if (!chunkRepository.containsKey(fileID))
115
			return;
116

  
117
		chunkRepository.get(fileID).remove(chunkNo);
118
	}
119

  
120
	public void deleteFileBackedUp( String fileID) 
121
	{
122
		if (!chunkRepository.containsKey( fileID))
123
			return;
124

  
125
		chunkRepository.remove(fileID);
126
	}
127

  
128
	public int getNumChunks(String pathname) {
129
		return fileRepository.get(pathname).getNumChunks();
130
	}
131

  
132
	public Integer getRecognizedReplication(String fileID, int chunkNo) 
133
	{
134
		int ret;
135
		try 
136
		{	
137
			ret = chunkRepository.get(fileID).get(chunkNo).getNumChunks();
138
		} catch (NullPointerException e) {
139
			return null;
140
		}
141
		return ret;
142
	}
143

  
144
	public boolean chunkExists(String fileID) {
145
		return chunkRepository.containsKey(fileID);
146
	}
147

  
148
	public Set<Integer> getFileChunksKey(String fileID) {
149
		return chunkRepository.get(fileID).keySet();
150
	}
151

  
152
}
0 153

  
DistributedBackupService/src/chunk/FolderManager.java
1
package chunk;
2

  
3
import java.io.*;
4
import java.util.ArrayList;
5
import chunk.FileManager;
6
import server.Peer;
7
import utils.Utils;
8

  
9
import static java.util.Arrays.copyOfRange;
10

  
11
public class FolderManager {
12
	private static long maxMemory;
13
	private static long usedMemory;
14
	private Peer peer;
15
	private String path;
16
	private FileManager fileManager;
17
	private static int chunkSizeLimit;
18

  
19
	public FolderManager(Peer peer, long maxMemory) throws ClassNotFoundException {
20
		this.peer = peer;
21
		FolderManager.maxMemory = maxMemory;
22

  
23
		chunkSizeLimit = 64000;
24
		usedMemory = 0;
25
		path = "results/peer_" + peer.getID() + "/";
26

  
27
		File db = new File(path + "db");
28

  
29
		if (db.exists()) {
30
			fileManager = FileManager.loadDatabase(db);
31
		} else {
32
			fileManager = new FileManager();
33
		}
34
		makeDir(path + Utils.CHUNKS);
35
	}
36

  
37
	public static void makeDir(String name) {
38
		File file = new File(name);
39
		file.mkdirs();
40
	}
41

  
42
	synchronized public static boolean backupFile(String name, String path, byte[] information) throws IOException {
43
		if (getFreeMemory() < information.length) {
44
			System.out.println("Without space to backup file");
45
			return false;
46
		}
47
		FileOutputStream out;
48
		out = new FileOutputStream(path + "/" + name);
49
		out.write(information);
50
		out.close();
51

  
52
		return FolderManager.incrUsedMemory(information.length);
53
	}
54

  
55
	synchronized public static byte[] loadFile(File file) throws IOException {
56
		FileInputStream inStream;
57
		byte[] information;
58

  
59
		inStream = new FileInputStream(file);
60
		information = new byte[(int) file.length()];
61

  
62
		inStream.read(information);
63
		inStream.close();
64

  
65
		return information;
66
	}
67

  
68
	public static ArrayList<Chunk> loadChunks(String path, int chunksNumber) throws IOException {
69
		ArrayList<Chunk> chunks = new ArrayList<>();
70

  
71
		for (int i = 0; i <= chunksNumber; i++) {
72
			byte[] information = loadFile(new File(path + "/" + i));
73
			Chunk chunk = new Chunk("", i, 1, information);
74
			chunks.add(chunk);
75
		}
76

  
77
		return chunks;
78
	}
79

  
80
	public static ArrayList<Chunk> fileSplit(byte[] fileInformation, String fileID, int replication) {
81
		ArrayList<Chunk> chunks;
82
		int numChunks;
83

  
84
		numChunks = fileInformation.length / chunkSizeLimit + 1;
85
		chunks = new ArrayList<>();
86

  
87
		for (int i = 0; i < numChunks; i++) 
88
		{
89
			byte[] chunkinformation;
90

  
91
			if (i == numChunks - 1) 
92
			{
93
				int leftOverBytes = fileInformation.length - (i * chunkSizeLimit);
94
				chunkinformation = copyOfRange(fileInformation, i * chunkSizeLimit, i * chunkSizeLimit + leftOverBytes);
95
			} 
96
			else if (i == numChunks - 1 && fileInformation.length % chunkSizeLimit == 0) 
97
			{
98
				chunkinformation = new byte[0];
99
			} 
100
			else 
101
			{
102
				chunkinformation = copyOfRange(fileInformation, i * chunkSizeLimit, i * chunkSizeLimit + chunkSizeLimit);
103
			}
104

  
105
			Chunk chunk = new Chunk(fileID, i, replication, chunkinformation);
106
			chunks.add(chunk);
107
		}
108

  
109
		return chunks;
110
	}
111

  
112

  
113
	public String getChunkPath(String fileID, int chunkNo) {
114
		return getChunksPath() + fileID + "/" + chunkNo;
115
	}
116

  
117
	public byte[] loadChunk(String fileID, int chunkNo) {
118
		byte[] chunkinformation = null;
119
		String chunkPath = getChunksPath() + fileID + "/" + chunkNo;
120

  
121
		try {
122
			chunkinformation = loadFile(new File(chunkPath));
123
		} catch (IOException e) {
124
			e.printStackTrace();
125
		}
126

  
127
		return chunkinformation;
128
	}
129

  
130

  
131
	public String getRootPath() {
132
		return path;
133
	}
134

  
135
	public String getChunksPath() {
136
		return path + Utils.CHUNKS;
137
	}
138

  
139
	public String getRestoresPath() {
140
		return path + Utils.RESTORES;
141
	}
142

  
143
	public FileManager getDatabase() {
144
		return fileManager;
145
	}
146

  
147
	public void deleteChunk(String fileID, int chunkNo) {
148
		String chunkPath = getChunkPath(fileID, chunkNo);
149
		File file = new File(chunkPath);
150

  
151
		long chunkSize = file.length();
152
		file.delete();
153
		decUsedMemory(chunkSize);
154
		fileManager.deleteChunk(fileID, chunkNo);
155
	}
156

  
157
	public static long getMaxMemory() {
158
		return maxMemory;
159
	}
160

  
161
	public static void setMaxMemory(int maxMemory) {
162
		FolderManager.maxMemory = maxMemory;
163
	}
164

  
165
	public static long getUsedMemory() {
166
		return FolderManager.usedMemory;
167
	}
168

  
169
	public static long getFreeMemory() {
170
		return maxMemory - usedMemory;
171
	}
172

  
173
	private static void decUsedMemory(long n) {
174
		usedMemory -= n;
175
		if (usedMemory < 0) {
176
			usedMemory = 0;
177
			System.out.println("Used memory went below 0");
178
		}
179
	}
180

  
181
	private static boolean incrUsedMemory(long n) {
182
		if (usedMemory + n > maxMemory) {
183
			System.out.println("Tried to surpass memory restrictions");
184
			return false;
185
		}
186
		usedMemory += n;
187
		System.out.println("Used memory: " + usedMemory + " / " + maxMemory);
188
		return true;
189
	}
190
}
0 191

  
DistributedBackupService/src/client/TestApp.java
1
package client;
2

  
3
import java.io.File;
4
import java.rmi.RemoteException;
5
import java.rmi.registry.LocateRegistry;
6
import java.rmi.registry.Registry;
7
import java.util.HashMap;
8
import java.util.Map;
9

  
10
import server.InitiatorPeer;
11

  
12
public class TestApp implements Runnable {
13

  
14
	private InitiatorPeer initiatorPeer;
15
	
16
    private String peer_ap;
17
    private String sub_protocol;
18
    private String opnd_1;
19
    private String opnd_2;
20

  
21
    private Map<String, Runnable> protocols;
22
    
23

  
24
    public static void main(String[] args) {
25
        if (args.length < 2 || args.length > 4) {
26
        	System.out.println("Usage: <java TestApp <peer_ap> <sub_protocol> <opnd_1> <opnd_2> >");
27
			System.out.println("<peer_ap> - peer access point");
28
			System.out.println("<sub_protocol> - BACKUP, RESTORE, DELETE, RECLAIM");
29
			System.out.println("<opnd_1> - path name of the file to backup/restore/delete");
30
			System.out.println("<opnd_2> - replication degree");
31
			System.exit(1);
32
        }
33
        
34
        String peer_ap = args[0];
35
        String sub_protocol = args[1];
36
        String opnd_1;
37
        String opnd_2;
38
        
39
        if(args.length > 2) 
40
        	opnd_1 = args[2];
41
        else
42
        	opnd_1 = null;
43
        
44
        if(args.length > 3) 
45
        	opnd_2 = args[3];
46
        else
47
        	opnd_2 = null;
48

  
49
        TestApp app = new TestApp(peer_ap, sub_protocol, opnd_1, opnd_2);
50
        new Thread(app).start();
51
    }
52

  
53
    public TestApp(String peer_ap, String sub_protocol, String opnd_1, String opnd_2) {
54
    	protocols = new HashMap<>();
55
    	
56
        this.peer_ap = peer_ap;
57
        this.sub_protocol = sub_protocol;
58
        this.opnd_1 = opnd_1;
59
        this.opnd_2 = opnd_2;
60

  
61
        protocols.put("BACKUP", this::handleBackup);
62
        protocols.put("DELETE", this::handleDelete);
63

  
64
    }
65

  
66
    @Override
67
    public void run() {
68
    	try {
69
            Registry registry = LocateRegistry.getRegistry(null);
70
            initiatorPeer = (InitiatorPeer) registry.lookup(peer_ap);
71
        } catch (Exception e) {
72
        	System.out.println("Error when opening RMI stub");
73
            e.printStackTrace();
74
        }
75
        protocols.get(sub_protocol).run();
76
    }
77
    
78
    private void handleBackup() {
79
        File file = new File(this.opnd_1);
80
        System.out.println("--> BACKUP :: Saving chunks at " + file.getAbsolutePath() + "\"");
81
        try {
82
            initiatorPeer.backup(file, Integer.parseInt(this.opnd_2));
83
        } catch (RemoteException e) {
84
        	e.printStackTrace();
85
        }
86
    }
87

  
88
    private void handleDelete() {
89
    	System.out.println("--> DELETE :: delete file: " + opnd_1 + "\"");
90
        try {
91
            initiatorPeer.delete(this.opnd_1);
92
        } catch (RemoteException e) {
93
        	e.printStackTrace();
94
        }
95
    }
96
}
0 97

  
DistributedBackupService/src/handlers/BackupChunkHandler.java
1
package handlers;
2

  
3
import server.Peer;
4
import sockets.Socket;
5
import utils.Utils;
6

  
7
import java.io.IOException;
8
import java.util.concurrent.atomic.AtomicIntegerArray;
9

  
10
import chunk.Chunk;
11
import protocols.BackupChunk;
12

  
13
public class BackupChunkHandler implements Runnable {
14

  
15
    private final String protocolVersion;
16
    private Peer parentPeer;
17
    private Chunk chunk;
18
    private AtomicIntegerArray chunkReplication;
19
    private int retries = 5;
20

  
21
    public BackupChunkHandler(BackupChunk backupInitiator, Chunk chunk) {
22
        this.chunk = chunk;
23
        this.parentPeer = backupInitiator.getPeer();
24
        this.protocolVersion = backupInitiator.getVersion();
25
        this.chunkReplication = parentPeer.getPeerInformation().getChunkReplication(chunk.getFileID());
26
    }
27

  
28
    BackupChunkHandler(Peer parentPeer, Chunk chunk) {
29
        this.chunk = chunk;
30
        this.parentPeer = parentPeer;
31
        this.protocolVersion = Utils.VERSION;
32
        this.chunkReplication = null;
33
    }
34

  
35
    @Override
36
    public void run() {
37

  
38
        int waitTime = 1000;
39
        Message msg = generatePutChunkMsg(chunk, protocolVersion);
40

  
41
        for (int i = 0; i < retries; ++i) {
42
            if (isDesiredReplicationDegree()) {
43
            	System.out.println("Achieved desired replication at i=" + i);
44
                break;
45
            }
46

  
47
            try {
48
                parentPeer.sendMsg(Socket.SocketType.MDB, msg);
49
            } catch (IOException e) {
50
            	System.out.println(e.getMessage());
51
            }
52

  
53
            sleep(waitTime);
54
            waitTime *= 2;
55
        }
56
    }
57

  
58
    protected boolean isDesiredReplicationDegree() {
59
        return chunkReplication != null && chunkReplication.get(chunk.getChunkNo()) >= chunk.getReplication();
60
    }
61

  
62
    private void sleep(int waitTime) {
63
        try {
64
            Thread.sleep(waitTime);
65
        } catch (InterruptedException e) {
66
            e.printStackTrace();
67
        }
68
    }
69

  
70
    private Message generatePutChunkMsg(Chunk chunk, String protocolVersion) {
71
        String[] args = { protocolVersion, Integer.toString(parentPeer.getID()), chunk.getFileID(), Integer.toString(chunk.getChunkNo()), Integer.toString(chunk.getReplication())
72
        };
73

  
74
        return new Message(Utils.MessageType.PUTCHUNK, args, chunk.getData());
75
    }
76

  
77
}
0 78

  
DistributedBackupService/src/handlers/Handler.java
1
package handlers;
2

  
3
import protocols.*;
4
import server.Peer;
5

  
6
import java.util.Random;
7
import java.util.concurrent.*;
8

  
9
import chunk.Chunk;
10
import chunk.FileManager;
11

  
12
public class Handler implements Runnable {
13
	private Peer parentPeer;
14
	private Chunk peerInformation;
15
	private BlockingQueue<Message> msgQueue;
16
	private ScheduledExecutorService executor;
17
	private int delayLimit;
18

  
19
	private Random random;
20

  
21
	public Handler(Peer parentPeer) {
22
		this.parentPeer = parentPeer;
23
		this.peerInformation = parentPeer.getPeerInformation();
24
		msgQueue = new LinkedBlockingQueue<>();
25
		executor = Executors.newScheduledThreadPool(5);
26
		delayLimit = 300;
27

  
28
		this.random = new Random();
29
	}
30

  
31
	@Override
32
	public void run() {
33
		Message msg;
34

  
35
		while (true) {
36
			try {
37
				msg = msgQueue.take();
38
				if (msg == null) {
39
					System.out.println("Null Message Received");
40
					return;
41
				}
42

  
43
				System.out.println("R: " + msg.toString());
44

  
45
				switch (msg.getType()) {
46
				case PUTCHUNK:
47
					Backup backup = new Backup(parentPeer, msg);
48
					executor.execute(backup);
49
					break;
50
				case STORED:
51
					peerInformation.addChunkReplication(msg.getFileID(), msg.getChunkNo());
52
					break;
53
				case GETCHUNK:
54

  
55
					break;
56
				case CHUNK:
57

  
58
					break;
59
				case REMOVED:
60
					FileManager database = parentPeer.getDatabase();
61
					String fileID = msg.getFileID();
62
					int chunkNo = msg.getChunkNo();
63
					
64
					Chunk chunkInfo = database.getChunkInformation(fileID, chunkNo);
65

  
66
					int perceivedReplication = database.getRecognizedReplication(fileID, chunkNo);
67
					int targetReplication = chunkInfo.getReplication();
68

  
69
					if (perceivedReplication < targetReplication) {
70
						byte[] chunkData = parentPeer.loadChunk(fileID, chunkNo);
71

  
72
						executor.schedule(
73
								new RemovedChunkHandler(parentPeer, chunkInfo, chunkData),
74
								this.random.nextInt(delayLimit + 1),
75
								TimeUnit.MILLISECONDS
76
								);
77
					}
78
					break;
79
				case DELETE:
80
					Delete delete = new Delete(parentPeer, msg);
81
					executor.execute(delete);
82
					break;
83
				default:
84
					return;
85
				}
86
			} catch (InterruptedException e) {
87
				e.printStackTrace();
88
			}
89
		}
90
	}
91

  
92

  
93
	public void pushMessage(byte[] data, int length) {
94
		Message msgParsed = new Message(data, length);
95
		msgQueue.add(msgParsed);
96
	}
97
}
0 98

  
DistributedBackupService/src/handlers/Message.java
1
package handlers;
2

  
3
import utils.Utils;
4
import utils.Utils.MessageType;
5

  
6
import java.io.*;
7

  
8
public class Message {
9

  
10
	private int argsNo;
11

  
12
	private int chunkNo;
13
	private int senderID;
14
	private String fileID;
15
	private String version;
16
	private int replication;
17
	private MessageType msgType;
18

  
19
	private byte [] body;
20

  
21
	public Message(byte[] information, int length) { 
22
		String header;
23
		String headerWithoutStuff;
24
		String [] splittedHeader;
25

  
26
		header= readHeader(information);
27

  
28
		headerWithoutStuff = header.trim().replaceAll("\\s+", " ");
29
		splittedHeader = headerWithoutStuff.split("\\s+");
30
		translateHeader(splittedHeader);
31

  
32
		if (msgType == MessageType.CHUNK)
33
			this.body = readBody(information, header.length(), length);
34

  
35
		if(msgType == MessageType.PUTCHUNK) 
36
			this.body = readBody(information, header.length(), length);
37
	}
38

  
39
	public Message(MessageType type, String[] args) {
40
		this.msgType = type;
41
		version = args[0];
42
		senderID = Integer.parseInt(args[1]);
43
		fileID = args[2];
44

  
45
		if (type != MessageType.DELETE)
46
			chunkNo = Integer.parseInt(args[3]);
47

  
48
		if (type == MessageType.PUTCHUNK) {
49
			replication = Integer.parseInt(args[4]);
50
		}
51
	}
52

  
53
	public Message(MessageType type, String[] args, byte[] information) {
54
		this(type, args);
55
		body = information;
56
	}
57

  
58
	private String readHeader(byte[] information) {
59
		ByteArrayInputStream stream;
60
		BufferedReader reader;
61
		String header = "";
62

  
63
		stream = new ByteArrayInputStream(information);
64
		reader = new BufferedReader(new InputStreamReader(stream));
65

  
66
		try {
67
			header = reader.readLine();
68
		} catch (IOException e) {
69
			e.printStackTrace();
70
		}
71
		return header;
72
	}
73

  
74
	private byte[] readBody(byte[] information, int headerLength, int informationLength) {
75
		int readBytes;
76
		ByteArrayInputStream message;
77
		byte[] bodyContent;
78

  
79
		readBytes = informationLength - headerLength - 4;
80
		message = new ByteArrayInputStream(information, headerLength + 4, readBytes);
81
		bodyContent = new byte[readBytes];
82

  
83
		message.read(bodyContent, 0, readBytes);
84

  
85
		return bodyContent;
86
	}
87

  
88
	private void translateHeader(String[] headerSplit) {
89

  
90
		switch (headerSplit[0]) {
91
		case "PUTCHUNK": 
92
		{
93
			msgType = MessageType.PUTCHUNK;
94
			argsNo = 6;
95
		} break;
96
		case "STORED":
97
		{
98
			msgType = MessageType.STORED;
99
			argsNo = 5;
100
		}
101
		break;
102
		case "GETCHUNK":
103
		{	
104
			msgType = MessageType.GETCHUNK;
105
			argsNo = 5;
106
		} break;
107
		case "CHUNK":
108
		{	msgType = MessageType.CHUNK;
109
		argsNo = 5;
110
		} break;
111
		case "DELETE":
112
		{	
113
			msgType = MessageType.DELETE;
114
			argsNo = 4;
115
		} break;
116
		case "REMOVED":
117
		{	
118
			msgType = MessageType.REMOVED;
119
			argsNo = 5;
120
		} break;
121
		default:
122
			return;
123
		}
124

  
125
		if (headerSplit.length != argsNo)
126
			return;
127

  
128
		version = headerSplit[1];
129
		senderID = Integer.parseInt(headerSplit[2]);
130
		fileID = headerSplit[3];
131

  
132
		if (argsNo > 4)
133
			chunkNo = Integer.parseInt(headerSplit[4]);
134

  
135
		if (msgType == MessageType.PUTCHUNK)
136
			replication = Integer.parseInt(headerSplit[5]);
137

  
138
	}
139

  
140
	public String headerString() {
141
		String string;
142
		if(msgType == MessageType.PUTCHUNK)
143
			string = msgType + " " + version + " " + senderID + " " + fileID + " " + chunkNo + " " + replication + " " + Utils.CRLF + Utils.CRLF;
144
		else if(msgType == MessageType.DELETE)
145
			string = msgType + " " + version + " " + senderID + " " + fileID + " " + Utils.CRLF + Utils.CRLF;
146
		else	
147
			string = msgType + " " + version + " " + senderID + " " + fileID + " " + chunkNo + " " + Utils.CRLF + Utils.CRLF;
148
		return string;
149
	}
150

  
151
	public byte[] getBytes() throws IOException {
152
		byte header[]; 
153
		header = headerString().getBytes();
154

  
155
		if (body == null)
156
			return header;
157
		else {
158
			ByteArrayOutputStream out = new ByteArrayOutputStream();
159
			out.write(header);
160
			out.write(body);
161
			return out.toByteArray();
162
		}
163
	}
164

  
165
	public String getVersion() {
166
		return version;
167
	}
168

  
169
	public int getSenderID() {
170
		return senderID;
171
	}
172

  
173
	public String getFileID() {
174
		return fileID;
175
	}
176

  
177
	public int getChunkNo() {
178
		return chunkNo;
179
	}
180

  
181
	public int getReplicationDegree() {
182
		return replication;
183
	}
184

  
185
	public byte[] getBody() {
186
		return body;
187
	}
188

  
189
	public MessageType getType() {
190
		return msgType;
191
	}
192

  
193
	@Override
194
	public String toString() {
195
		String str;
196
		if(msgType == MessageType.PUTCHUNK)
197
		{
198
			str = "<" + msgType +">" + " " + "<" + version+">" + " " + "<" + senderID+">" + " " +"<" + fileID+">" + " " + "<" +chunkNo+">";
199
		} 
200
		else if(msgType == MessageType.DELETE) 
201
		{
202
			str = "<" + msgType +">" + " " + "<" + version+">" + " " + "<" + senderID+">" + " " +"<" + fileID+">";
203
		}
204
		else 
205
		{
206
			str = "<" + msgType +">" + " " + "<" + version+">" + " " + "<" + senderID+">" + " " +"<" + fileID+">" + " " + "<" +chunkNo+">";
207
		}
208
		return str;
209
	}
210

  
211
}
0 212

  
DistributedBackupService/src/handlers/RemovedChunkHandler.java
1
package handlers;
2

  
3
import chunk.Chunk;
4
import server.Peer;
5

  
6
public class RemovedChunkHandler extends BackupChunkHandler {
7
    private Chunk chunkInfo;
8

  
9
    public RemovedChunkHandler(Peer parentPeer, Chunk chunkInfo, byte[] chunkData) {
10
        super(parentPeer, new Chunk(chunkInfo, chunkData));
11

  
12
        this.chunkInfo = chunkInfo;
13
    }
14

  
15
    @Override
16
    protected boolean isDesiredReplicationDegree() {
17
        return chunkInfo.getNumChunks() >= chunkInfo.getReplication();
18
    }
19
}
0 20

  
DistributedBackupService/src/protocols/Backup.java
1
package protocols;
2

  
3
import server.Peer;
4
import sockets.Socket;
5
import utils.Utils;
6

  
7
import java.io.IOException;
8
import java.util.Random;
9
import java.util.concurrent.TimeUnit;
10

  
11
import chunk.Chunk;
12
import handlers.Message;
13

  
14
import static chunk.FolderManager.makeDir;
15
import static chunk.FolderManager.backupFile;
16

  
17
public class Backup implements Runnable {
18

  
19
    private Peer peer;
20
    private Message req;
21

  
22
    private byte[] data;
23
    private int replication;
24
    private String fileID;
25
    private int chunkNo;
26
    private String version;
27
    private int senderID;
28
    private static int delayLimit;
29

  
30
    public Backup(Peer peer, Message req) {
31
        this.peer = peer;
32
        this.req = req;
33

  
34
        System.out.println("BACKUP begins!");
35
    }
36

  
37

  
38
    @Override
39
    public void run() {
40

  
41
        version = req.getVersion();
42
        senderID = req.getSenderID();
43
        fileID = req.getFileID();
44
        chunkNo = req.getChunkNo();
45
        replication = req.getReplicationDegree();
46
        delayLimit = 300;
47

  
48
        if (senderID == peer.getID()) { 
49
        	System.out.println("Ignoring backup of own files");
50
            return;
51
        }
52

  
53
        data = req.getBody();
54

  
55
        String chunkPathname = peer.getPath("chunks") + "/" + fileID;
56

  
57
        makeDir(peer.getPath("chunks") + "/" + fileID);
58

  
59
        boolean success = false;
60
        try {
61
            success = backupFile(Integer.toString(chunkNo), chunkPathname, data);
62
            //save to database
63
            peer.addChunkToFileManager(new Chunk(fileID, chunkNo, replication, data.length));
64
        } catch (IOException e) {
65
            e.printStackTrace();
66
        }
67

  
68
        if (! success) {
69
        	System.out.println("Memory overflow");
70
        } else {
71
            sendMsgStored();
72
        }
73

  
74
        System.out.println("Finished backup!");
75
    }
76

  
77
    private void sendMsgStored() {
78
        String[] args = {version, Integer.toString(peer.getID()),fileID, Integer.toString(chunkNo)};
79

  
80
        Message msg = new Message(Utils.MessageType.STORED, args);
81

  
82
        Random random = new Random();
83
        peer.sendLateMsg(Socket.SocketType.MC, msg, random.nextInt(delayLimit + 1), TimeUnit.MILLISECONDS);
84
    }
85
}
0 86

  
DistributedBackupService/src/protocols/BackupChunk.java
1
package protocols;
2

  
3
import server.Peer;
4
import utils.Utils;
5

  
6
import static chunk.FolderManager.fileSplit;
7

  
8
import java.io.File;
9
import java.util.HashMap;
10
import java.io.IOException;
11
import java.nio.file.Files;
12
import java.util.ArrayList;
13
import java.io.FileNotFoundException;
14
import java.nio.file.attribute.BasicFileAttributes;
15
import java.security.NoSuchAlgorithmException;
16

  
17
import chunk.Chunk;
18
import chunk.FileManager;
19
import chunk.FolderManager;
20
import handlers.BackupChunkHandler;
21

  
22
public class BackupChunk implements Runnable {
23

  
24
	private String fileID;
25
	private Peer peer;
26
	private String version;
27

  
28
	private byte[] data;
29
	private int replication;
30
	private File file;
31

  
32

  
33
	public BackupChunk(String version, File file, int replication, Peer peer) {
34
		this.version = version;
35
		this.file = file;
36
		this.replication = replication;
37
		this.peer = peer;
38
	}
39
	
40

  
41
	public Peer getPeer() {
42
		return peer;
43
	}
44

  
45
	public String getVersion() {
46
		return version;
47
	}
48

  
49
	@Override
50
	public void run() {
51
		try {
52
			data = FolderManager.loadFile(file);
53
		} catch (IOException e) {
54
			e.printStackTrace();
55
		}
56

  
57
		try {
58
			fileID = hashFileID(file);
59
		} catch (NoSuchAlgorithmException e) {
60
			e.printStackTrace();
61
		}
62
		
63
		ArrayList<Chunk> chunks;
64
		chunks = fileSplit(data, fileID, replication);
65
		
66
		HashMap<String, Chunk> chunksInfo = new HashMap<>();
67

  
68
		peer.getPeerInformation().startChunkReplication(fileID, chunks.size());
69

  
70
		for (Chunk chunk : chunks) {
71
			new Thread(new BackupChunkHandler(this, chunk)).start();
72
			chunksInfo.put(Integer.toString(chunk.getChunkNo()), new Chunk(chunk.getChunkNo(), chunk.getReplication()));
73
		}
74

  
75
		peer.addFileToFileManager(file.getPath(), new FileManager(file, fileID, replication, chunksInfo));
76
		peer.getPeerInformation().resetChunkReplication(fileID);
77

  
78
		System.out.println("Finished backupInitiator!");
79
	}
80

  
81
	private String hashFileID(File file) throws NoSuchAlgorithmException {
82
		BasicFileAttributes attributes;
83
		try {
84
			attributes = Files.readAttributes(file.toPath(), BasicFileAttributes.class);
85
		} catch (IOException e) {
86
			System.out.println("Couldn't read file's metadata: " + e.getMessage());
87
			return null;
88
		}
89
		
90
		String fileId = file.getName() + attributes.lastModifiedTime() + attributes.size();
91
		return Utils.hash(fileId);
92
	}
93

  
94
}
0 95

  
DistributedBackupService/src/protocols/Delete.java
1
package protocols;
2

  
3
import server.Peer;
4
import java.util.Set;
5

  
6
import java.io.IOException;
7
import java.nio.file.Files;
8
import java.nio.file.Paths;
9

  
10

  
11
import chunk.FileManager;
12
import handlers.Message;
13

  
14
public class Delete implements Runnable {
15

  
16
    private Peer peer;
17
    private Message req;
18
    private FileManager fileManager;
19

  
20
    public Delete(Peer peer, Message req) {
21
        this.peer = peer;
22
        this.req = req;
23
        this.fileManager = peer.getDatabase();
24

  
25
        System.out.println("DELETE begins!");
26
    }
27

  
28

  
29
    @Override
30
    public void run() {
31
        String fileID;
32
        String path;
33
        Set<Integer> chunks;
34
        
35
        fileID = req.getFileID();
36

  
37
        if (!fileManager.chunkExists(fileID)) {
38
        	System.out.println("Can not find chunks!");
39
            System.exit(-1);;
40
        }
41

  
42
        chunks = fileManager.getFileChunksKey(fileID);
43
        path = peer.getPath("chunks");
44

  
45
        for (Integer chunk : chunks) {
46
            try {
47
                Files.delete(Paths.get(path + "/" + fileID + "/" + chunk));
48
            } catch (IOException e) {
49
                e.printStackTrace();
50
            }
51
        }
52

  
53
        try {
54
            Files.delete(Paths.get(path + "/" + fileID));
55
        } catch (IOException e) {
56
            e.printStackTrace();
57
        }
58

  
59
        fileManager.deleteFileBackedUp(fileID);
60
        System.out.println("Delete successful.");
61
    }
62

  
63
}
0 64

  
DistributedBackupService/src/protocols/DeleteChunk.java
1
package protocols;
2

  
3
import server.Peer;
4
import sockets.Socket;
5
import utils.Utils;
6

  
7
import java.io.IOException;
8
import java.nio.file.Files;
9
import java.nio.file.Paths;
10

  
11
import chunk.FileManager;
12
import handlers.Message;
13

  
14
public class DeleteChunk implements Runnable {
15
    private String version;
16
    private String pathName;
17
    private Peer peer;
18

  
19
    public DeleteChunk(String version, String pathName, Peer parentPeer) {
20
        this.version = version;
21
        this.pathName = pathName;
22
        this.peer = parentPeer;
23
    }
24

  
25
    @Override
26
    public void run() {
27
        FileManager fileInformation;
28
        fileInformation = peer.getFile(pathName);
29
        
30
        if (fileInformation == null) {
31
        	System.out.println("Can not find File!");
32
            System.exit(-1);;
33
        }
34
        sendMsgToMC(fileInformation);
35
        try {
36
            Files.delete(Paths.get(pathName));
37
        } catch (IOException e) {
38
            e.printStackTrace();
39
        }
40
        peer.deleteFileFromFileManager(pathName);
41
    }
42

  
43
    private boolean sendMsgToMC(FileManager fileInformation) {
44
    	String idString = Integer.toString(peer.getID());
45
        String[] args = { version, idString,fileInformation.getFileID() };
46

  
47
        Message msg = new Message(Utils.MessageType.DELETE, args);
48

  
49
        try {
50
            peer.sendMsg(Socket.SocketType.MC, msg);
51
        } catch (IOException e) {
52
            e.printStackTrace();
53
            return false;
54
        }
55
        return true;
56
    }
57

  
58
}
0 59

  
DistributedBackupService/src/server/InitiatorPeer.java
1
package server;
2

  
3
import java.io.File;
4
import java.rmi.Remote;
5
import java.rmi.RemoteException;
6

  
7
public interface InitiatorPeer extends Remote {
8
    String backup(File file, int replicationDegree) throws RemoteException;
9
    void restore(String pathname) throws RemoteException; 
10
    void delete(String pathname) throws RemoteException;
11
    void reclaim(int space) throws RemoteException;
12
    void state() throws RemoteException;
13
}
0 14

  
DistributedBackupService/src/server/Peer.java
1
package server;
2

  
3
import protocols.BackupChunk;
4
import protocols.DeleteChunk;
5
import sockets.Socket;
6
import sockets.MultiCastSocket;
7
import sockets.MDBSocket;
8
import sockets.MDRSocket;
9
import sockets.Socket.SocketType;
10
import utils.Utils;
11

  
12
import java.io.File;
13
import java.io.FileNotFoundException;
14
import java.io.IOException;
15
import java.rmi.registry.LocateRegistry;
16
import java.rmi.registry.Registry;
17
import java.rmi.server.UnicastRemoteObject;
18
import java.util.HashMap;
19
import java.util.Map;
20
import java.util.concurrent.*;
21

  
22
import chunk.*;
23
import handlers.Handler;
24
import handlers.Message;
25

  
26

  
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff