Project

General

Profile

Revision 1

added files

View differences:

service/.project
1
<?xml version="1.0" encoding="UTF-8"?>
2
<projectDescription>
3
	<name>service</name>
4
	<comment></comment>
5
	<projects>
6
	</projects>
7
	<buildSpec>
8
		<buildCommand>
9
			<name>org.eclipse.jdt.core.javabuilder</name>
10
			<arguments>
11
			</arguments>
12
		</buildCommand>
13
	</buildSpec>
14
	<natures>
15
		<nature>org.eclipse.jdt.core.javanature</nature>
16
	</natures>
17
</projectDescription>
service/Chunk.java
1
package service;
2

  
3
import java.util.ArrayList;
4

  
5
public class Chunk implements java.io.Serializable {
6

  
7
	private int chunkNr;
8
    private String fileID;
9
    private String filePathName = null;
10
    private int size;
11

  
12
    private int actual_replication_degree = 0;
13
    private int desired_replication_degree;
14

  
15
    private byte[] chunkContent;
16

  
17
//    each chunk is identified by the pair <fileID, chunkNo>
18
    public Chunk(int n, byte[] c, String s, int desiredRepDegree) throws Exception
19
    {
20
      this.chunkNr = n;
21
      this.chunkContent = getFiltredData(c);
22

  
23
      if(chunkContent.length > Constants.MAX_CHUNK_SIZE) {
24
    	  throw new Exception("Invalid chunk size -> " + chunkContent.length);
25
      }
26
      
27
      this.size = chunkContent.length;
28
      
29
      desired_replication_degree = desiredRepDegree;
30
      
31
      fileID = s;
32
    }
33
    
34
    private byte[] getFiltredData(byte[] c) {
35
    	ArrayList<Byte> aux = new ArrayList<Byte>();
36
    	boolean toAdd = false;
37
    	
38
		for(byte b : c) {
39
			aux.add(b);
40
		}
41
		
42
		byte[] toReturn = new byte[aux.size()];
43

  
44
		for(int i = aux.size() - 1, k = 0; i >= 0 ; i--) {
45
			if(aux.get(i) == 0) {
46
				if(!toAdd) {
47
					continue;
48
				}
49
				else {
50
					toReturn[k] = aux.get(i);
51
					k++;
52
				}
53
			}
54
			else {
55
				toReturn[k] = aux.get(i);
56
				toAdd = true;
57
				k++;
58
			}	
59
		}
60
		
61
		return toReturn;
62
	}
63

  
64
	public int getReplicationDegree() {
65
    	return desired_replication_degree;
66
    }
67

  
68
    public void setActualReplicationDegree(int rd)
69
    {
70
      this.actual_replication_degree = rd;
71
    }
72

  
73

  
74
    public String getChunkNumber()
75
    {
76
      return Integer.toString(this.chunkNr);
77
    }
78

  
79

  
80
    public String getChunkId()
81
    {
82
      return this.fileID;
83
    }
84

  
85

  
86
    public int getChunkSize()
87
    {
88
      return this.size;
89
    }
90

  
91

  
92
    public int getActualReplicationDegree()
93
    {
94
      return this.actual_replication_degree;
95
    }
96

  
97

  
98
    public byte[] getChunkContent()
99
    {
100
      return this.chunkContent;
101
    }
102
    
103
    public int compareTo(Chunk c)
104
    {
105
         return(this.chunkNr - c.chunkNr);
106
    }
107

  
108
	@Override
109
	public boolean equals(Object obj) {
110
		if (this == obj) {
111
			return true;
112
		}
113
		if (obj == null) {
114
			return false;
115
		}
116
		if (!(obj instanceof Chunk)) {
117
			return false;
118
		}
119
		Chunk other = (Chunk) obj;
120
		if (chunkNr != other.chunkNr) {
121
			return false;
122
		}
123
		if (fileID == null) {
124
			if (other.fileID != null) {
125
				return false;
126
			}
127
		} else if (!fileID.equals(other.fileID)) {
128
			return false;
129
		}
130
		return true;
131
	}
132

  
133
	public String getFilePathName() {
134
		return filePathName;
135
	}
136

  
137
	public void setFilePathName(String filePathName) {
138
		this.filePathName = filePathName;
139
	}
140
}
service/Client.java
1
package service;
2

  
3
import java.rmi.registry.LocateRegistry;
4
import java.rmi.registry.Registry;
5

  
6
public class Client {
7

  
8
    private Client() {}
9

  
10
    public static void main(String[] args) { 
11

  
12
    	System.out.println("CLIENT");
13
    	try 
14
        {      	
15
        	int argc = args.length;
16

  
17
            if(argc < 2 || argc > 4)
18
            {
19
              print_usage();
20
              return;
21
            }
22
        	
23
            
24
            String aux = args[0];
25
            
26
            String[] acess_point = aux.split(":");
27
            
28
            String ip = acess_point[0];
29
            String id = acess_point[1];
30
            
31
            String sub_protocol = args[1];
32
            
33
            Registry registry = LocateRegistry.getRegistry(ip);
34
            RMI rmi = (RMI) registry.lookup(id);
35
            
36
            String file_path;
37

  
38
            String response;
39
            
40
            switch(sub_protocol)
41
            {
42
              case "BACKUP":
43
                if(argc != 4)
44
                {
45
                	print_error_ops(sub_protocol);
46
                }
47
                
48
                
49
                file_path = args[2];
50
                int replication_degree = Integer.parseInt(args[3]);
51
               
52
                
53
                response = rmi.backup(file_path, replication_degree);
54
                System.out.println("response: " + response);
55
                break;
56

  
57
              case "RESTORE":
58
                if(argc != 3)
59
                {
60
                	print_error_ops(sub_protocol);
61
                }
62

  
63
                file_path = args[2];
64
                response = rmi.restore(file_path);
65
                System.out.println("response: " + response);
66
                break;
67

  
68
              case "DELETE":
69
                if(argc != 3)
70
                {
71
                	print_error_ops(sub_protocol);
72
                }
73

  
74
                
75
                file_path = args[2];
76
                response = rmi.delete(file_path);
77
                System.out.println("response: " + response);
78
                break;
79

  
80
              case "RECLAIM":
81
                if(argc != 3)
82
                {
83
                  print_error_ops(sub_protocol);
84
                }
85

  
86
                long disk_space = Long.parseLong(args[2]);
87
                response = rmi.reclaim(disk_space);
88
                System.out.println("response: " + response);
89
                break;
90

  
91
              case "STATE":
92
                if(argc != 2)
93
                {
94
                	print_error_ops(sub_protocol);
95
                }
96

  
97
                response = rmi.state();
98
                System.out.println("response: " + response);
99
                break;
100
            }
101

  
102
        } catch (Exception e) {
103
            System.err.println("Client exception: " + e.toString()); 
104
        }
105
    }
106
    
107
    public static void print_usage()
108
    {
109
      System.out.println("Usage: :\n");
110
      System.out.println("  java Client <peer_ap> <operation> <opnd_1> <opnd_2> \n");
111
      System.out.println("  - <peer_ap> -> peer's acess point\n");
112
      System.out.println("  - <operation> -> Subprotocol ops: BACKUP, RESTORE, DELETE, RECLAIM;  Enhancements: ops with ENH in the end. State: STATE \n");
113
      System.out.println("  - <opnd_1> -> path name of the file or in case of op RECLAIM the max amout of disk space\n");
114
      System.out.println("  - <opnd_2>* -> integer that specifies the desired replication degree\n");
115
    }
116
    
117
    public static void print_error_ops(String error)
118
    {
119
      switch(error)
120
      {
121
        case "BACKUP":
122
          System.out.println("ERROR on BACKUP format. Must be:\n");
123
          System.out.println("> BACKUP <file_path> <replication_degree>");
124
          break;
125

  
126
        case "RESTORE":
127
          System.out.println("ERROR on RESTORE format. Must be:\n");
128
          System.out.println("> RESTORE <file_path>");
129
          break;
130

  
131
        case "DELETE":
132
          System.out.println("ERROR on DELETE format. Must be:\n");
133
          System.out.println("> DELETE <file_path>");
134
          break;
135

  
136
        case "RECLAIM":
137
          System.out.println("ERROR on RECLAIM format. Must be:\n");
138
          System.out.println("> RECLAIM <disk_space>");
139
          break;
140

  
141
        case "STATE":
142
          System.out.println("ERROR on STATE format. Must be:\n");
143
          System.out.println("> RECLAIM <disk_space>");
144
          break;
145

  
146
         default:
147
           System.out.println("ERROR. Unkown op");
148
      }
149
    }
150
}
service/Cloud.java
1
package service;
2

  
3
import channels.BackupChannel;
4
import channels.ControlChannel;
5
import channels.Message;
6
import channels.MessageHeader;
7
import channels.RestoreChannel;
8
import protocols.SendChunkMessageProtocol;
9

  
10
import java.io.*;
11
import java.nio.file.Files;
12
import java.nio.file.Path;
13
import java.nio.file.Paths;
14
import java.util.*;
15
import java.util.concurrent.ConcurrentHashMap;
16
import java.util.concurrent.ConcurrentHashMap.KeySetView;
17
import java.util.concurrent.ConcurrentLinkedQueue;
18

  
19
//https://www.geeksforgeeks.org/serialization-in-java/
20

  
21
public class Cloud implements java.io.Serializable
22
{
23
	private static final long serialVersionUID = 1L;
24

  
25
	/**
26
	 * stores received V and backedUp chunks
27
	 * key = <fileID>
28
	 */
29
	private ConcurrentHashMap<String, Chunk> storedChunks;
30

  
31
	/**
32
	 * stores the number of chunks taht a file that was backed up in peer was splited into
33
	 * key = <fileID>
34
	 */
35
	private ConcurrentHashMap<String, Integer> numberOfFileChunks;
36

  
37
	/**
38
	 * registers the peers that have stored chunks sent by this peer
39
	 * key = <fileID>:<ChunkNo>
40
	 */
41
	private ConcurrentHashMap<String, HashSet<Integer>> colaborativePeers;
42

  
43
	/**
44
	 * stores the number of times a chunk was stored
45
	 * key = <fileID>:<ChunkNo>
46
	 */
47
	private ConcurrentHashMap<String, Integer> numberOfChunksConfirmations;
48

  
49
	/**
50
	 * key = <fileID>:<ChunkNo>
51
	 */
52
	private ConcurrentHashMap<String, Integer> chunksToRestore;
53

  
54
	/**
55
	 * Holds chucks restored by restore protocol
56
	 * key = <fileID>:<ChunkNo>
57
	 */
58
	private ConcurrentHashMap<String, Chunk> recoveredChunks;
59

  
60
	/**
61
	 * stores the number of chunks a file was splited into
62
	 * key = <fileID>
63
	 */
64
	private ConcurrentLinkedQueue<String> numberOfFileChunksToBeRestored;
65

  
66
	private ArrayList<FileChunker> data;
67
	private int serverID;
68
	private String protocol_version;
69
	public ControlChannel controlRoom;
70
	public BackupChannel backupCh;
71
	public RestoreChannel restoreCh;
72

  
73
	long spaceRemaining;
74

  
75

  
76
	public Cloud(String id, String version)
77
	{
78
		numberOfFileChunks = new ConcurrentHashMap<String, Integer>();
79
		storedChunks = new ConcurrentHashMap<String, Chunk>();
80
		colaborativePeers = new ConcurrentHashMap<String, HashSet<Integer>>();
81
		numberOfChunksConfirmations = new ConcurrentHashMap<String, Integer>();
82
		this.data = new ArrayList<>();
83

  
84
		this.numberOfFileChunksToBeRestored = new ConcurrentLinkedQueue<String>();
85
		this.recoveredChunks = new  ConcurrentHashMap<String, Chunk>();
86

  
87
		this.spaceRemaining = 1073741824; //1GB
88

  
89
		serverID = Integer.parseInt(id);
90
		protocol_version = version;
91
	}
92

  
93
	public synchronized boolean sendBackupMessage(Message m) {
94
		return backupCh.sendMessage(m);
95
	}
96

  
97
	public synchronized boolean sendRestoreMessage(Message m) {
98
		return restoreCh.sendMessage(m);
99
	}
100

  
101
	public Cloud getPeerTools(String id, String version) {
102
		return new Cloud(id, version);
103
	}
104

  
105
	public int getChunkCurrentRepDegree(String fileId, String chunkNo) {
106
		String key = fileId + ':' + chunkNo;
107

  
108
		if(numberOfChunksConfirmations.containsKey(key)) {
109
			return numberOfChunksConfirmations.get(key);
110
		}
111

  
112
		return -1;
113
	}
114

  
115
	public byte[] reverseList(byte[] a) {
116
		ArrayList<Byte> aux = new ArrayList<Byte>(a.length);
117

  
118
		for(byte b : a) {
119
			aux.add(0, b);
120
		}
121

  
122
		byte[] toReturn = new byte[a.length];
123

  
124
		int i = 0;
125

  
126
		for(byte b : aux) {
127
			toReturn[i] = b;
128
			i++;
129
		}
130

  
131
		return toReturn;
132
	}
133

  
134
	public boolean buildFile(String file_id, int maxchunks, String fileName) throws IOException
135
	{
136
		String restore_path = Constants.CHUNKS_DIR + this.serverID + "/restored/";
137
		String key;
138
		Chunk aux;
139
		int total = 0;
140

  
141
		checkOrCreateDirectory(restore_path);
142

  
143
		File restoredFile = new File(restore_path + fileName);
144

  
145
		restoredFile.createNewFile();
146

  
147
		FileOutputStream fos =  new FileOutputStream(restoredFile);
148

  
149
		for(int i = 0; i < maxchunks; i++)
150
		{
151
			key = file_id + ':' + Integer.toString(i);
152
			if(recoveredChunks.containsKey(key)) {
153
				aux = recoveredChunks.get(key);
154

  
155
				if(aux != null) {
156
					fos.write(reverseList(aux.getChunkContent()));
157
					total += aux.getChunkContent().length;
158
				}
159
				else {
160
					System.out.println("NULL chunk file might be corrupted");
161
				}	
162
			}
163
		}
164

  
165
		try
166
		{
167
			fos.close();
168
		}
169
		catch(IOException e) {
170
			recoveredChunks.clear();
171
			System.out.println(e.getMessage());
172
		}
173
		
174
		recoveredChunks.clear();
175

  
176
		System.out.println("Restored file " + fileName + " with " + total + " bytes.");
177

  
178
		return false;
179
	}
180

  
181
	public ArrayList<FileChunker> getData()
182
	{
183
		return this.data;
184
	}
185

  
186
	public ConcurrentHashMap<String, Integer> getChunksToRestore()
187
	{
188
		return this.chunksToRestore;
189
	}
190

  
191
	void addFile(FileChunker fc)
192
	{
193
		this.data.add(fc);
194
	}
195

  
196

  
197
	public void addChunk(Chunk c) {
198
		String key = c.getChunkId() + ":" + c.getChunkNumber();
199
		storedChunks.put(key, c);
200
	}
201

  
202

  
203
	public void deleteChunks(FileChunker fc, ArrayList<FileChunker> fcs) 
204
	{
205

  
206
		String file_id = fc.getFileID();
207

  
208
		String path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + file_id + '/';
209

  
210
		File f = new File(path);
211

  
212
		String[] entries = f.list();
213

  
214
		for(String s: entries)
215
		{
216
			File currentFile = new File(f.getPath(), s);
217
			currentFile.delete();
218
		}
219

  
220
		f = new File(path);
221
		f.delete();
222
	}
223

  
224
	public boolean storeRestoreChunk(Message m) {
225
		Chunk toStore = null;
226
		byte[] chunkData;
227

  
228
		try {
229
			chunkData = m.getChunkContent();
230
			toStore = new Chunk(Integer.parseInt(m.getHeader().getChunkNumber()), chunkData, m.getHeader().getFileId(), Integer.parseInt(m.getHeader().getDegree()));
231
		} catch (Exception e) {
232
			System.out.println("Error in chunk: " + e.getMessage());
233
			return false;
234
		}
235

  
236
		addToRecoveredChunks(toStore);
237

  
238
		return true;
239
	}
240

  
241
	public boolean storeChunk(Message m) {
242

  
243
		Chunk toStore = null;
244
		String filename, path;
245
		FileOutputStream out = null;
246
		File dir = null;
247
		byte[] chunkData;
248

  
249
		try {
250
			chunkData = m.getChunkContent();
251
			toStore = new Chunk(Integer.parseInt(m.getHeader().getChunkNumber()), chunkData, m.getHeader().getFileId(), Integer.parseInt(m.getHeader().getDegree()));
252
		} catch (Exception e) {
253
			System.out.println("Error in chunk: " + e.getMessage());
254
			return false;
255
		}
256

  
257
		if(toStore.getChunkContent().length > spaceRemaining) {
258
			System.out.println("There is no available space");
259
			return false;
260
		}
261

  
262
		this.addChunk(toStore);
263
		spaceRemaining -= toStore.getChunkSize();
264

  
265
		path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + toStore.getChunkId() + "/";
266
		filename = getFileName(Integer.parseInt(toStore.getChunkNumber()));
267

  
268
		if(!doesDirExists(path)) {	
269
			dir = new File(path);
270
			if(!dir.mkdirs()) {
271
				return false;
272
			}
273
		}
274

  
275
		try {
276
			dir = new File(path + filename);
277
			dir.createNewFile();
278
			out = new FileOutputStream(dir);
279
			out.write(toStore.getChunkContent());
280
			out.close();
281
		} catch (IOException e) {
282
			System.out.println("Error writing file to disk: " + e.getMessage());
283
			return false;
284
		}
285

  
286
		return true;
287
	}
288

  
289
	public boolean checkIfChunkWasRestored(String fileid, String chunknr)
290
	{
291
		return this.recoveredChunks.containsKey(fileid + ':' + chunknr);	
292
	}
293

  
294
	public void addToRecoveredChunks(Chunk c) {
295
		String key = c.getChunkId() + ":" + c.getChunkNumber();
296
		if(!recoveredChunks.containsKey(key)) {
297
			recoveredChunks.put(key, c);
298
		}
299
	}
300

  
301
	public void registerFileChunkToRestore(String file_id, String chunknr)
302
	{
303

  
304
		if(!this.numberOfFileChunksToBeRestored.contains(file_id + ':' + chunknr))
305
		{
306
			this.numberOfFileChunksToBeRestored.add(file_id + ':' + chunknr);
307
		}
308
	}
309

  
310
	public boolean isFileChunkRegistered(String fileId, String chunknr) {
311
		return this.numberOfFileChunksToBeRestored.contains(fileId + ':' + chunknr);
312
	}
313

  
314
	public void createDirectoryFile(FileChunker fc) throws IOException
315
	{
316
		String file_id = fc.getFileID();
317

  
318
		String path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + file_id + '/';
319

  
320
		if(checkOrCreateDirectory(path) == -1)
321
		{
322
			System.out.println("Couldn't create peer backup chunks dir for that file!");
323
		}
324

  
325
		for(int i = 0; i < fc.getChunks().size(); i++)
326
		{
327
			int nr = i + 1;
328
			String filename = "chk" + Integer.toString(nr);
329
			File dir = new File(path + filename);	
330

  
331

  
332
			dir.createNewFile();
333

  
334
			try(FileOutputStream out = new FileOutputStream(dir))
335
			{
336
				out.write(fc.getChunks().get(i).getChunkContent());
337
			}
338
		}
339
	}
340

  
341
	private static String getFileName(int chunkNumber) {
342
		return "chk" + Integer.toString(chunkNumber);
343
	}
344

  
345
	private static boolean doesDirExists(String path) {
346
		File f = new File(path);
347

  
348
		return f.exists();
349
	}
350

  
351
	public boolean checkIfPeerStoredFile(String filename) {
352
		return numberOfFileChunks.containsKey(filename);
353
	}
354

  
355
	public static int checkOrCreateDirectory(String dirName) 
356
	{
357

  
358
		File f = new File(dirName);
359

  
360
		int result = -1;
361

  
362
		if(!f.exists())
363
		{
364
			//			System.out.println("Creating directory " + f.getName());
365

  
366
			try
367
			{
368
				f.mkdirs();
369
				result = 1;
370
			}
371
			catch(SecurityException se)
372
			{
373
				//				 System.err.println("Dir exception: " + se.toString());
374
				se.printStackTrace(); 
375
				result = -1;
376
			}
377

  
378
			if(result == 1)
379
			{
380
				//				System.out.println("DIR " + f.getName() + " created");
381
			}
382
		}
383
		else if(f.exists())
384
		{
385
			//			System.out.println("Directory already exists, proceeding");
386
			result = 2;
387
		}
388
		else
389
		{
390
			result = -1;
391
		}
392

  
393
		return result;
394
	}
395

  
396
	public void createControlRoom(String ip, String port) throws IOException {
397
		controlRoom = new ControlChannel(ip, port, this);
398
	}
399

  
400
	public void createBackupChannel(String ip, String port) throws IOException {
401
		backupCh = new BackupChannel(ip, port, this);
402
	}
403

  
404
	public void createRestoreChannel(String ip, String port) throws IOException {
405
		restoreCh = new RestoreChannel(ip, port, this);
406
	}
407

  
408
	public void activateChannels() {
409
		controlRoom.start();
410
		backupCh.start();
411
		restoreCh.start();
412
	}
413

  
414

  
415
	public int getID() {
416
		return serverID;
417
	}
418

  
419
	public boolean storeChunk(Chunk c)
420
	{
421
		if(!this.storedChunks.containsKey(c.getChunkId() + ":" + c.getChunkNumber())) {
422
			addChunk(c);
423
			spaceRemaining -= c.getChunkSize();
424
		}
425

  
426
		return true;
427
	}
428

  
429
	public String getProtocolVersion() {
430
		return protocol_version;
431
	}
432

  
433
	public ConcurrentHashMap<String, Chunk> getStoredChunks()
434
	{
435
		return this.storedChunks;
436
	}
437

  
438

  
439
	public synchronized void registerNumberOfFileChunks(String fileID, Integer NFileChunks) {
440
		System.out.println(fileID + " registered.");
441
		numberOfFileChunks.put(fileID, NFileChunks);
442
	}
443

  
444
	public synchronized boolean insertAwaitingStoreFile(MessageHeader h) {
445

  
446
		String key = h.getFileId() + ':' + h.getChunkNumber();
447
		if(numberOfFileChunks.containsKey(h.getFileId())) {
448
			colaborativePeers.put(key, new HashSet<Integer>());
449
			numberOfChunksConfirmations.put(key, 0);
450
			return true;
451
		}
452
		else {
453
			System.out.println("Cloud hasn't got " + h.getFileId() + " registered.");
454
			return false;
455
		}
456
	}
457

  
458
	public boolean wasTheRepDegreeMatched(Chunk c) {	
459
		return c.getReplicationDegree() <= numberOfChunksConfirmations.get(c.getChunkId() + ':' + c.getChunkNumber());
460
	}
461

  
462

  
463

  
464
	public synchronized boolean intrepertStoredMessage(MessageHeader h) {
465
		String key = h.getFileId() + ':' + h.getChunkNumber();
466

  
467
		if(numberOfFileChunks.containsKey(h.getFileId())) {
468
			if(colaborativePeers.get(key).add(Integer.parseInt(h.getSenderId()))) {
469
				numberOfChunksConfirmations.put(key, numberOfChunksConfirmations.get(key) + 1);
470
			}
471
			return true;
472
		}
473
		else {
474
			if(storedChunks.containsKey(key)) {
475
				if(!colaborativePeers.containsKey(key)) {
476
					colaborativePeers.put(key, new HashSet<Integer>());
477
					colaborativePeers.get(key).add(getID());
478
				}			
479
				colaborativePeers.get(key).add(Integer.parseInt(h.getSenderId()));
480
				storedChunks.get(key).setActualReplicationDegree(colaborativePeers.get(key).size());
481

  
482
				return true;
483
			}
484
			return false;
485
		}
486
	}
487

  
488
	public synchronized boolean deleteFile(MessageHeader h) {
489
		String fileId = h.getFileId();
490
		String path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + fileId + "/";
491
		File dir = new File(path);
492
		boolean somethingDeleted = false;
493

  
494
		if(dir.isDirectory()) {
495
			for(File f : dir.listFiles()) {
496
				f.delete();
497
				somethingDeleted = true;
498
			}
499

  
500
			if(!dir.delete()) {
501
				System.out.println("Couldn't erase " + h.getFileId() + " from file system");
502
				return false;
503
			}
504
		}
505

  
506
		for(String key : storedChunks.keySet()) {
507
			Chunk c = storedChunks.get(key);
508
			if(c.getChunkId().equals(fileId)) {
509
				storedChunks.remove(key);
510
				spaceRemaining += c.getChunkSize();
511
				somethingDeleted = true;
512
			}
513
		}
514

  
515
		if(somethingDeleted) {
516
			System.out.println(h.getFileId() + " erased from file system.");
517
		}
518

  
519
		return true;
520
	}
521

  
522
	public KeySetView<String, Integer> getBackedUpFileNames() {
523
		return numberOfFileChunks.keySet();
524
	}
525

  
526
	public String getPathNameFromFileId(String fileId) {
527
		if(storedChunks.containsKey(fileId + ":" + "0")) {
528
			return storedChunks.get(fileId + ":" + "0").getFilePathName();
529
		}
530

  
531
		return null;
532
	}
533

  
534
	public int getBackedUpChunkNecessaryRepDegree(String fileId) {
535
		if(storedChunks.containsKey(fileId + ":" + "0")) {
536
			return storedChunks.get(fileId + ":" + "0").getReplicationDegree();
537
		}
538

  
539
		return -1;
540
	}
541

  
542
	public int getBackepUpChunkPerceivedRepDegree(String fileId, int id) {
543
		if(colaborativePeers.containsKey(fileId + ":" + Integer.toString(id))) {
544
			return colaborativePeers.get(fileId + ":" + Integer.toString(id)).size();
545
		}
546

  
547
		return 0;
548
	}
549

  
550
	public long getAvailableSpace() {
551
		return this.spaceRemaining;
552
	}
553

  
554
	public int getNumberOfStoredChunks() {
555
		return storedChunks.size();
556
	}
557

  
558
	public long getChunkSpace() {
559
		long total = 0;
560

  
561
		for(String key : storedChunks.keySet()) {
562
			Chunk c = storedChunks.get(key);
563
			total += c.getChunkSize();
564
		}
565

  
566
		return total;
567
	}
568

  
569
	public long getTotalSpace() {		
570
		return this.spaceRemaining + getChunkSpace();
571
	}
572

  
573
	public void setAvailableSpace(long s) {
574
		this.spaceRemaining = s-getChunkSpace();
575
	}
576

  
577
	public void sendRemovedMessage(String fileId, String chunkNumber) {
578
		String[] headerElements = new String[Constants.REMOVED_N_ARGS];
579
		MessageHeader h;
580
		headerElements[0] = Constants.REMOVED;
581
		headerElements[1] = getProtocolVersion();
582
		headerElements[2] = Integer.toString(getID());
583
		headerElements[3] = fileId;
584
		headerElements[4] = chunkNumber; 
585

  
586
		h = new MessageHeader(headerElements);
587

  
588
		controlRoom.sendHeader(h);
589
	}
590

  
591
	public void sendHeader(MessageHeader mh)
592
	{
593
		controlRoom.sendHeader(mh);
594
	}
595

  
596
	public boolean freeUpSpace(long desiredSpace) {
597

  
598
		String fileId, chunkNo, path;
599
		File file;
600

  
601
		for(String key : storedChunks.keySet()) {
602
			Chunk c = storedChunks.get(key);
603
			if(getChunkSpace() <= desiredSpace) {
604
				break;
605
			}
606

  
607
			fileId = c.getChunkId();
608
			chunkNo = c.getChunkNumber();
609

  
610
			path = Constants.CHUNKS_DIR + this.serverID + "/backup/" + fileId + "/chk" + chunkNo;
611

  
612
			file = new File(path);
613

  
614
			if(file.delete()) {
615
				sendRemovedMessage(fileId, chunkNo);
616
				storedChunks.remove(key);
617
			}
618
		}
619

  
620
		setAvailableSpace(desiredSpace);
621

  
622
		return true;
623
	}
624

  
625
	public boolean intrepertRemovedMessage(MessageHeader h) {
626
		String fileId, chunkNo, senderId, key;
627

  
628
		fileId = h.getFileId();
629
		chunkNo = h.getChunkNumber();
630
		senderId = h.getSenderId();
631
		key = fileId + ":" + chunkNo;
632

  
633
		if(checkIfPeerStoredFile(fileId)) {
634
			if(numberOfChunksConfirmations.containsKey(key)) {
635
				numberOfChunksConfirmations.put(key, numberOfChunksConfirmations.get(key) - 1);
636
			}
637
			if(this.colaborativePeers.containsKey(key)) {
638
				this.colaborativePeers.get(key).remove(Integer.parseInt(senderId));
639
			}			
640
			if(storedChunks.containsKey(key)) {
641
				return storedChunks.get(key).getReplicationDegree() < numberOfChunksConfirmations.get(key);
642
			}
643
		}
644

  
645
		return true;
646
	}
647

  
648
	public int getNumberOfChunks(String file_id)
649
	{
650
		if(numberOfFileChunks.containsKey(file_id)) {
651
			return numberOfFileChunks.get(file_id);
652
		}
653

  
654
		return -1;
655
	}
656

  
657
	public Chunk getFileChunkRepDegree(String fileId, String chunkNo) {
658

  
659
		if(storedChunks.containsKey(fileId + ":" + chunkNo)) {
660
			return storedChunks.get(fileId + ":" + chunkNo);
661
		}
662

  
663
		return null;
664
	}
665

  
666
	public void intrepertGetChunkMessage(MessageHeader header) {
667

  
668
		String key = header.getFileId() + ":" + header.getChunkNumber();
669
		Chunk toSend;
670
		SendChunkMessageProtocol p;
671

  
672
		if(storedChunks.containsKey(key)) {
673
			System.out.println("Going to send chunk no " + header.getChunkNumber());
674
			toSend = storedChunks.get(key);
675

  
676
			p = new SendChunkMessageProtocol(toSend, this);
677

  
678
			p.run();
679
		}
680
		else {
681
			System.out.println("I dont have that chunk");
682
		}
683
	}
684

  
685

  
686
	@SuppressWarnings({ "unchecked" })
687
	public void loadFromDisk() throws IOException, Exception
688
	{	
689
		String path = Constants.DATABASE_DIR + "peer" + this.serverID + '/';
690

  
691

  
692
		String path1 = path + "storedChunks.ser";
693
		Path p = Paths.get(path1);
694
		if (Files.exists(p)) {
695
			FileInputStream fis = new FileInputStream(path1);
696
			ObjectInputStream ois = new ObjectInputStream(fis);
697
			this.setStoredChunks( (ConcurrentHashMap<String, Chunk>)ois.readObject());
698
			fis.close();
699
			ois.close();
700
		}
701

  
702

  
703

  
704

  
705
		String path2 = path + "numberOfFileChunks.ser";
706
		Path p2 = Paths.get(path2);
707
		if (Files.exists(p2)) {
708
			FileInputStream fis2 = new FileInputStream(path2);
709
			ObjectInputStream ois2 = new ObjectInputStream(fis2);
710
			this.setNumberOfFileChunks((ConcurrentHashMap<String, Integer>)ois2.readObject());
711
			fis2.close();
712
			ois2.close();
713
		}
714

  
715

  
716
		String path3 = path + "colaborativePeers.ser";
717
		Path p3 = Paths.get(path3);
718
		if (Files.exists(p3)) {
719
			FileInputStream fis3 = new FileInputStream(path3);
720
			ObjectInputStream ois3 = new ObjectInputStream(fis3);
721
			this.setColaborativePeers((ConcurrentHashMap<String, HashSet<Integer>>)ois3.readObject());
722
			fis3.close();
723
			ois3.close();
724
		}
725

  
726

  
727
		String path4 = path + "numberOfChunksConfirmations.ser";
728
		Path p4 = Paths.get(path4);
729
		if (Files.exists(p4)) {
730
			FileInputStream fis4 = new FileInputStream(path4);
731
			ObjectInputStream ois4 = new ObjectInputStream(fis4);
732
			this.setNumberOfChunksConfirmations((ConcurrentHashMap<String, Integer>)ois4.readObject());
733
			fis4.close();
734
			ois4.close();
735
		}
736

  
737

  
738
		String path5 = path + "chunksToRestore.ser";
739
		Path p5 = Paths.get(path5);
740
		if (Files.exists(p5)) {
741
			FileInputStream fis5 = new FileInputStream(path5);
742
			ObjectInputStream ois5 = new ObjectInputStream(fis5);
743
			this.setChunksToRestore((ConcurrentHashMap<String, Integer>)ois5.readObject());
744
			fis5.close();
745
			ois5.close();
746
		}
747

  
748
		String path6 = path + "recoveredChunks.ser";
749
		Path p6 = Paths.get(path6);
750
		if (Files.exists(p6)) {
751
			FileInputStream fis6 = new FileInputStream(path6);
752
			ObjectInputStream ois6 = new ObjectInputStream(fis6);
753
			this.setRecoveredChunks((ConcurrentHashMap<String, Chunk>)ois6.readObject());
754
			fis6.close();
755
			ois6.close();
756
		}
757

  
758
		String path7 = path + "numberOfFileChunksToBeRestored.ser";
759
		Path p7 = Paths.get(path7);
760
		if (Files.exists(p7)) {
761
			FileInputStream fis7 = new FileInputStream(path7);
762
			ObjectInputStream ois7 = new ObjectInputStream(fis7);
763
			this.setNumberOfFileChunksToBeRestored((ConcurrentLinkedQueue<String>)ois7.readObject());
764
			fis7.close();
765
			ois7.close();
766
		}
767
	}
768

  
769
	@SuppressWarnings("resource")
770
	public synchronized void saveToDisk() throws IOException
771
	{
772
		String path = Constants.DATABASE_DIR + "peer" + this.serverID + '/';
773

  
774
		checkOrCreateDirectory(path);
775

  
776
		String path1 = path + "storedChunks.ser";
777
		FileOutputStream fos1 = new FileOutputStream(path1);
778
		ObjectOutputStream oos1 = new ObjectOutputStream(fos1);
779
		oos1.writeObject(storedChunks);
780

  
781
		String path2 = path + "numberOfFileChunks.ser";
782
		FileOutputStream fos2 = new FileOutputStream(path2);
783
		ObjectOutputStream oos2 = new ObjectOutputStream(fos2);
784
		oos2.writeObject(numberOfFileChunks);
785

  
786
		String path3 = path + "colaborativePeers.ser";
787
		FileOutputStream fos3 = new FileOutputStream(path3);
788
		ObjectOutputStream oos3 = new ObjectOutputStream(fos3);
789
		oos3.writeObject(colaborativePeers);
790

  
791
		String path4 = path + "numberOfChunksConfirmations.ser";
792
		FileOutputStream fos4 = new FileOutputStream(path4);
793
		ObjectOutputStream oos4 = new ObjectOutputStream(fos4);
794
		oos4.writeObject(numberOfChunksConfirmations);
795

  
796
		String path5 = path + "chunksToRestore.ser";
797
		FileOutputStream fos5 = new FileOutputStream(path5);
798
		ObjectOutputStream oos5 = new ObjectOutputStream(fos5);
799
		oos5.writeObject(chunksToRestore);
800

  
801
		String path6 = path + "recoveredChunks.ser";
802
		FileOutputStream fos6 = new FileOutputStream(path6);
803
		ObjectOutputStream oos6 = new ObjectOutputStream(fos6);
804
		oos6.writeObject(recoveredChunks);
805

  
806
		String path7 = path + "numberOfFileChunksToBeRestored.ser";
807
		FileOutputStream fos7 = new FileOutputStream(path7);
808
		ObjectOutputStream oos7 = new ObjectOutputStream(fos7);
809
		oos7.writeObject(numberOfFileChunksToBeRestored);
810
	}
811

  
812
	public void setStoredChunks(ConcurrentHashMap<String, Chunk> sc)
813
	{
814
		storedChunks = sc;
815
	}
816

  
817
	public void setNumberOfFileChunks(ConcurrentHashMap<String, Integer> si)
818
	{
819
		numberOfFileChunks = si;
820
	}
821

  
822
	public void setColaborativePeers(ConcurrentHashMap<String, HashSet<Integer>> sh)
823
	{
824
		colaborativePeers = sh;
825
	}
826

  
827
	public void setNumberOfChunksConfirmations(ConcurrentHashMap<String, Integer> si)
828
	{
829
		numberOfChunksConfirmations = si;
830
	}
831

  
832
	public void setChunksToRestore(ConcurrentHashMap<String, Integer> si)
833
	{
834
		chunksToRestore = si;
835
	}
836

  
837
	public void setRecoveredChunks(ConcurrentHashMap<String, Chunk> sc)
838
	{
839
		recoveredChunks = sc;
840
	}
841

  
842
	public void setNumberOfFileChunksToBeRestored(ConcurrentLinkedQueue<String> s)
843
	{
844
		numberOfFileChunksToBeRestored = s;
845
	}
846
}
channels/BackupChannel.java
1
package channels;
2

  
3
import java.io.IOException;
4

  
5
import protocols.SendSoredMessageProtocol;
6
import service.Cloud;
7
import service.Constants;
8

  
9
public class BackupChannel extends Channel {
10
	private Message receivedMessage;
11

  
12
	public BackupChannel(String addr, String p, Cloud f) throws IOException {
13
		super(addr, p, f.getID(), f);
14
	}
15
	
16
	public void printLog(String msg) {
17
		System.out.println("MDB " + serverId + msg);
18
	}
19
	
20
//	This message is used to ensure that the chunk is backed up with the desired replication degree as follows. The initiator-peer collects the confirmation messages during a time interval of one second. If the number of confirmation messages it received up to the end of that interval is lower than the desired replication degree, it retransmits the backup message on the MDB channel, and doubles the time interval for receiving confirmation messages. This procedure is repeated up to a maximum number of five times, i.e. the initiator will send at most 5 PUTCHUNK messages per chunk.
21
//
22
//	Hint: Because UDP is not reliable, a peer that has stored a chunk must reply with a STORED message to every PUTCHUNK message it receives. Furthermore, the initiator-peer needs to keep track of which peers have responded.
23
//
24
//	A peer should also count the number of confirmation messages for each of the chunks it has stored and keep that count in non-volatile memory. This information can be useful if the peer runs out of disk space: in that event, the peer may try to free some space by evicting chunks whose actual replication degree is higher than the desired replication degree.
25
	
26
	@Override
27
	public void run () {
28
		printLog(": UP");
29
		int messageSenderId = -1;
30
		String messageType;
31
		
32
		try {
33
			joinChannelGroup();
34
		} catch (IOException e1) {
35
			e1.printStackTrace();
36
		}
37
		
38
		do {
39
			try {
40
				
41
				
42
				receivedMessage = captureData();
43
				
44
				if(receivedMessage == null) {
45
					printLog(" passing message.");
46
					continue;
47
				}
48
				
49
				messageType = receivedMessage.getHeader().getMessageType();
50
				
51
				messageSenderId = Integer.parseInt(receivedMessage.getHeader().getSenderId());
52
			
53
				if(messageType.equals(Constants.PUTCHUNK) && messageSenderId != getServerId()) {
54
					//printLog(" received: " + receivedMessage.getHeader().toString());
55

  
56
					if(father.storeChunk(receivedMessage)) {
57
						SendSoredMessageProtocol mp = new SendSoredMessageProtocol(buildMessage(), father);
58
						
59
						mp.start();
60
					}
61
					else {
62
						printLog(" ERROR: couldn't store chunk");
63
					}
64
				}
65
				else {
66
					//printLog(" ERROR: Invalid message header received -> " + receivedMessage.getHeader().toString());
67
				}
68
				
69
//				leaveGroupChannel();			
70
			}
71
			catch (IOException e) {
72
				printLog(" ERROR: " + "IOException -> " + e.getMessage());
73
			}
74
		} while(true);
75
	}
76
	
77
	public MessageHeader buildMessage() {
78
		MessageHeader response;
79
		
80
		response = buildResponseMessage();
81

  
82
		return response;
83
	}
84
	
85
	public MessageHeader buildResponseMessage() {
86
		MessageHeader response;
87
	    String[] headerElements = new String[Constants.STORED_N_ARGS];
88
	    
89
	    headerElements[0] = Constants.STORED;
90
	    headerElements[1] = father.getProtocolVersion();
91
	    headerElements[2] = Integer.toString(father.getID());
92
	    headerElements[3] = receivedMessage.getHeader().getFileId();
93
	    headerElements[4] = receivedMessage.getHeader().getChunkNumber();
94
	    
95
	    response = new MessageHeader(headerElements);
96
		
97
		return response;
98
	}
99
}
service/Constants.java
1
package service;
2

  
3
public class Constants {
4
	public static final int MAX_CHUNK_SIZE = 64 * 1000;
5
	public static final int MAX_MESSAGE_SIZE = MAX_CHUNK_SIZE + 1024;
6
	public static final String CHUNKS_DIR = "../Project 1 -- Distributed Backup Service/peers/peer";
7
	public static final String DATABASE_DIR = "../Project 1 -- Distributed Backup Service/database/";
8
	public static final int MAX_NUMBER_OF_THREADS = 8;
9
	public static final String NORMAL_VERSION = "1.0";
10
	
11
	// Available header types
12
	public static final String PUTCHUNK = "PUTCHUNK";
13
	public static final String STORED = "STORED";
14
	public static final String GETCHUNK = "GETCHUNK";
15
	public static final String CHUNK = "CHUNK";
16
	public static final String DELETE = "DELETE";
17
	public static final String REMOVED = "REMOVED";
18
	
19
	
20
	// headers necessary arguments
21
	
22
	public static final int PUTCHUNK_N_ARGS = 6;
23
	public static final int STORED_N_ARGS = 5;
24
	public static final int GETCHUNK_N_ARGS = 5;
25
	public static final int CHUNK_N_ARGS = 5;
26
	public static final int DELETE_N_ARGS = 4;
27
	public static final int REMOVED_N_ARGS = 5;
28
	
29
	//'0xD''0xA'
30
	public static final byte CR = 0xD;
31
	public static final byte LF = 0xA;
32
	
33
	public static final int MAX_PUTCHUNK_MESSAGES = 5;
34
}
service/Peer.java
1
package service;
2

  
3
import java.io.IOException;
4
import java.nio.file.Files;
5
import java.nio.file.Path;
6
import java.nio.file.Paths;
7
import java.rmi.registry.Registry;
8
import java.rmi.registry.LocateRegistry;
9
import java.rmi.server.UnicastRemoteObject;
10
import java.util.ArrayList;
11
import java.util.List;
12
import java.util.concurrent.Callable;
13
import java.util.concurrent.ExecutionException;
14
import java.util.concurrent.ExecutorService;
15
import java.util.concurrent.Executors;
16
import java.util.concurrent.Future;
17
import java.util.concurrent.ScheduledThreadPoolExecutor;
18
import java.util.concurrent.TimeUnit;
19

  
20
import channels.Message;
21
import channels.MessageHeader;
22
import protocols.BackupChunkProtocol;
23
import protocols.RestoreChunksProtocol;
24

  
25
public class Peer implements RMI {
26

  
27
	ScheduledThreadPoolExecutor exec;
28
	private static Cloud cloud;
29

  
30
	public Peer() {}
31

  
32
	public static void main(String[] args) {
33

  
34
		try 
35
		{
36
			Peer peer = new Peer();
37
			RMI rmi = (RMI) UnicastRemoteObject.exportObject(peer, 0);
38
			String acess_point = args[2];
39
			Registry registry = LocateRegistry.getRegistry();
40
			registry.bind(acess_point, rmi);
41
			System.err.println("Peer ready");
42
		} 
43
		catch (Exception e) {
44
			System.err.println("Peer exception: " + e.toString());
45
			return;
46
		}
47

  
48
		if(args.length != 9) {
49
			printHelp();
50
			return;
51
		}
52

  
53
		try {
54
			Cloud aux = new Cloud(args[0], args[1]);
55
			cloud = aux.getPeerTools(args[0], args[1]);
56
			cloud.createControlRoom(args[3], args[4]);
57
			cloud.createBackupChannel(args[5], args[6]);
58
			cloud.createRestoreChannel(args[7], args[8]);
59

  
60
			String path = Constants.DATABASE_DIR;
61
			Path p = Paths.get(path);
62
			if (Files.exists(p)) 
63
			{
64
				try {
65
					cloud.loadFromDisk();
66
				} catch (Exception e) {
67
					e.printStackTrace();
68
				}
69

  
70
			}
71

  
72
			//			System.out.println(cloud.getStoredChunks().size());
73

  
74
		} catch (NumberFormatException | IOException e) {
75
			System.out.println(e.getMessage());
76
			return;
77
		}
78

  
79
		if(Cloud.checkOrCreateDirectory(Constants.CHUNKS_DIR + args[0] + "/") == -1) {
80
			System.out.println("Couldn't create peer chunks dir!");
81
		}
82

  
83
		cloud.activateChannels();
84
		System.out.println("Channels activated");
85
	}
86

  
87
	// the name of each multicast channel consists of the ip multicast address and
88
	// port, passed as cmd line arguments, followed by protocol version, server id
89
	// and service access point
90
	private static void printHelp() {
91
		System.out.println("java service.Peer SERVER_id VERSION ACCESS_POINT MC_ip MC_port MDB_ip MDB_port MDR_ip MDR_port");
92
	}
93

  
94
	public void saveState() {
95
		try {
96
			cloud.saveToDisk();
97
		} catch (IOException e) {
98
			e.printStackTrace();
99
		}
100
	}
101

  
102

  
103
	public synchronized String backup(String file_path, int replication_degree) 
104
	{  	
105
		FileChunker fc;
106
		Message m;
107
		MessageHeader h;
108
		int numberOfChunks;
109
		ExecutorService WORKER_THREAD_POOL;
110
		List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
111
		List<Future<Object>> futures = null;
112
		ArrayList<Chunk> chunkHolder = null;
113
		String[] headerElements = new String[Constants.PUTCHUNK_N_ARGS];
114
		Chunk aux;
115
		int numberOfSteps;
116

  
117
		System.out.println("BACKUP request received");
118

  
119
		try {
120
			fc = new FileChunker(file_path, replication_degree);
121
			//		cloud.createDirectoryFile(fc);
122
		} 
123
		catch (IOException e) {
124
			System.out.println("ERROR chunking file -> " + e.getMessage());
125
			return "ERROR chunking file -> " + e.getMessage();
126
		}
127

  
128
		chunkHolder = fc.getChunks();
129
		numberOfChunks = chunkHolder.size();
130

  
131
		cloud.registerNumberOfFileChunks(fc.getFileID(), numberOfChunks);
132

  
133
		System.out.println("File splited into " + numberOfChunks + " chunks.");
134

  
135
		headerElements[0] = Constants.PUTCHUNK;
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff