Project

General

Profile

Statistics
| Revision:

root / Peer.java @ 1

History | View | Annotate | Download (10.4 KB)

1
import java.io.File;
2
import java.io.FileInputStream;
3
import java.io.FileOutputStream;
4
import java.io.IOException;
5
import java.net.InetAddress;
6
import java.rmi.RemoteException;
7
import java.rmi.registry.LocateRegistry;
8
import java.rmi.registry.Registry;
9
import java.rmi.server.UnicastRemoteObject;
10
import java.util.concurrent.ConcurrentHashMap;
11

    
12
/**
13
 * Peer
14
 */
15
public class Peer extends Thread implements TestAppRemote {
16
    private int serverId;
17
    private String serviceAccessPoint;
18
    private int storageCapacity = 100000000; //100MB
19
    private int storageUsed = 0;
20

    
21
    //HashMap for all the chunks being backed up by this peer
22
    public ConcurrentHashMap<String, BackupFile> chunksMap = new ConcurrentHashMap<>();
23
    //HashMap for all the chunks retrieved to restore a file
24
    public ConcurrentHashMap<String, RestoreFile> restoreMap = new ConcurrentHashMap<>();
25
    //HashMap for all files asked to be deleted
26
    public ConcurrentHashMap<String, DeleteFile> deleteMap = new ConcurrentHashMap<>();
27
    //HashMap for all perceived replication degrees in the network
28
    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> repDegMap = new ConcurrentHashMap<>();
29
    //HashMap for all files backed up in the network
30
    public ConcurrentHashMap<String, BackupFile> fileMap = new ConcurrentHashMap<>();
31

    
32

    
33

    
34
    private InetAddress MDRaddress;
35
    private int MDRport;
36
    private InetAddress MDBaddress;
37
    private int MDBport;
38
    private InetAddress MCaddress;
39
    private int MCport;
40

    
41
    public static void main(String[] args) throws NumberFormatException, IOException {
42
        if(!parseInputs(args))
43
            return;
44
        Peer peer = new Peer(args[0], args[1], args[2], args[3]);
45
        Peer obj = peer;
46
        TestAppRemote stub = (TestAppRemote) UnicastRemoteObject.exportObject(obj, 0);
47

    
48
        // Bind the remote object's stub in the registry
49
        Registry registry = LocateRegistry.getRegistry();
50
        registry.rebind(peer.getServiceAccessPoint(), stub);
51
    }
52

    
53
    public Peer(String serverId, String MC, String MDB, String MDR)
54
            throws NumberFormatException, IOException {
55
        this.serverId = Integer.parseInt(serverId);
56
        this.serviceAccessPoint = serverId;
57

    
58
        String[] MCinfo = MC.split("_");
59
        this.MCaddress = InetAddress.getByName(MCinfo[0]);
60
        this.MCport = Integer.parseInt(MCinfo[1]);
61
        MCListener mcListener = new MCListener(InetAddress.getByName(MCinfo[0]), Integer.parseInt(MCinfo[1]),
62
                this.serverId, this);
63
        new Thread(mcListener).start();
64

    
65
        String[] MDBinfo = MDB.split("_");
66
        this.MDBaddress = InetAddress.getByName(MDBinfo[0]);
67
        this.MDBport = Integer.parseInt(MDBinfo[1]);
68
        MDBListener mdbListener = new MDBListener(InetAddress.getByName(MDBinfo[0]), Integer.parseInt(MDBinfo[1]),
69
                this.serverId, InetAddress.getByName(MCinfo[0]), Integer.parseInt(MCinfo[1]), this);
70
        new Thread(mdbListener).start();
71

    
72
        String[] MDRinfo = MDR.split("_");
73
        this.MDRaddress = InetAddress.getByName(MDRinfo[0]);
74
        this.MDRport = Integer.parseInt(MDRinfo[1]);
75
        MDRListener mdrListener = new MDRListener(InetAddress.getByName(MDRinfo[0]), Integer.parseInt(MDRinfo[1]),
76
                this.serverId, InetAddress.getByName(MCinfo[0]), Integer.parseInt(MCinfo[1]), this);
77
        new Thread(mdrListener).start();
78

    
79
        fillChunksMap();
80
        File dir = new File("Peer-" + this.getSenderId() + File.separator + "Backups");
81
        dir.mkdirs();
82
    }
83

    
84
    // Saves the chunk under the correct directory: "Chunks\FILE_ID\CHUNK_ID"
85
    public void saveChunk(Chunk chunk) throws IOException {
86
        // Create the directory if needed
87
        File dir = new File(chunk.getDir());
88
        dir.mkdirs();
89

    
90
        FileOutputStream outputStream = new FileOutputStream(chunk.getPath());
91
        outputStream.write(chunk.getData());
92

    
93
        outputStream.close();
94
    }
95

    
96
    // Get current storage usage in Bytes
97
    public long getCurrentStorageSize(File dir) {
98
        File[] files = dir.listFiles();
99

    
100
        int count = files.length;
101
        long length = 0;
102

    
103
        for (int i = 0; i < count; i++) {
104
            if (files[i].isFile()) {
105
                length += files[i].length();
106
            } else {
107
                length += getCurrentStorageSize(files[i]);
108
            }
109
        }
110
        return length;
111
    }
112

    
113
    private static boolean parseInputs(String[] args) {
114
        try {
115
            Integer.parseInt(args[0]);
116
        } catch (NumberFormatException e) {
117
            System.out.println("Invalid arguments!: serverId must be a valid int");
118
            return false;
119
        }
120

    
121
        String[] MCinfo = args[1].split("_");
122
        if (MCinfo.length != 2) {
123
            System.out.println("Invalid arguments!: MC name must be of the format <ip_address>_<port>");
124
            return false;
125
        }
126
        try {
127
            InetAddress.getByName(MCinfo[0]);
128
        } catch (Exception e) {
129
            System.out.println("Invalid arguments!: MC IP address must be a valid IP");
130
            return false;
131
        }
132
        try {
133
            Integer.parseInt(MCinfo[1]);
134
        } catch (Exception e) {
135
            System.out.println("Invalid arguments!: MC socket must be valid");
136
            return false;
137
        }
138

    
139
        String[] MDBinfo = args[2].split("_");
140
        if (MDBinfo.length != 2) {
141
            System.out.println("Invalid arguments!: MDB name must be of the format <ip_address>_<port>");
142
            return false;
143
        }
144
        try {
145
            InetAddress.getByName(MDBinfo[0]);
146
        } catch (Exception e) {
147
            System.out.println("Invalid arguments!: MDB IP address must be a valid IP");
148
            return false;
149
        }
150
        try {
151
            Integer.parseInt(MDBinfo[1]);
152
        } catch (Exception e) {
153
            System.out.println("Invalid arguments!: MDB socket must be valid");
154
            return false;
155
        }
156

    
157
        String[] MDRinfo = args[3].split("_");
158
        if (MDRinfo.length != 2) {
159
            System.out.println("Invalid arguments!: MDR name must be of the format <ip_address>_<port>");
160
            return false;
161
        }
162
        try {
163
            InetAddress.getByName(MDRinfo[0]);
164
        } catch (Exception e) {
165
            System.out.println("Invalid arguments!: MDR IP address must be a valid IP");
166
            return false;
167
        }
168
        try {
169
            Integer.parseInt(MDRinfo[1]);
170
        } catch (Exception e) {
171
            System.out.println("Invalid arguments!: MDR socket must be valid");
172
            return false;
173
        }
174

    
175
        return true;
176
    }
177

    
178
    @Override
179
    public void backup(String pathname, int replicationDeg, boolean enhanced) throws RemoteException {
180
        print("starting BACKUP protocol");
181
        if (enhanced) {
182
            new Thread(new BackupTask(pathname, replicationDeg, "2.0", this)).start();
183
        } else {
184
            new Thread(new BackupTask(pathname, replicationDeg, "1.0", this)).start();
185
        }
186
    }
187

    
188
    @Override
189
    public void restore(String pathname, boolean enhanced) throws RemoteException {
190
        print("starting RESTORE protocol");
191
        if (enhanced) {
192
            new Thread(new RestoreTask(pathname, "2.0", this)).start();
193
        } else {
194
            new Thread(new RestoreTask(pathname, "1.0", this)).start();
195
        }        
196
    }
197

    
198
    @Override
199
    public void delete(String pathname, boolean enhanced) throws RemoteException {
200
        print("starting DELETE protocol");
201
        if (enhanced) {
202
            new Thread(new DeleteTask(pathname, "2.0", this)).start();
203
        } else {
204
            new Thread(new DeleteTask(pathname, "1.0", this)).start();
205
        }
206
    }
207

    
208
    @Override
209
    public void reclaim(int diskspace) throws RemoteException {
210
        print("starting RECLAIM protocol");
211
        new Thread(new ReclaimTask(diskspace, this)).start();
212
    }
213

    
214
    @Override
215
    public void state() throws RemoteException {
216
        print("starting STATE protocol");
217
        new Thread(new StateTask(this)).start();
218
    }
219

    
220
    @Override
221
    public void chunk(String fileId, byte[] chunk) throws RemoteException {
222
        this.print("Enhanced CHUNK protocol started");
223
        this.restoreMap.get(fileId).getChunks().add(chunk);
224
        this.print("Enhanced CHUNK protocol finished");
225
    }
226

    
227
    public String getServiceAccessPoint() {
228
        return this.serviceAccessPoint;
229
    }
230

    
231
    public void print(String message) {
232
        System.out.println("[PEER-" + this.serverId + "-Service] " + message);
233
    }
234

    
235
    public int getSenderId() {
236
        return this.serverId;
237
    }
238

    
239
    public InetAddress getMDBaddress() {
240
        return this.MDBaddress;
241
    }
242

    
243
    public int getMDBport() {
244
        return this.MDBport;
245
    }
246

    
247
    public InetAddress getMDRaddress() {
248
        return this.MDRaddress;
249
    }
250

    
251
    public int getMDRport() {
252
        return this.MDRport;
253
    }
254

    
255
    public InetAddress getMCaddress() {
256
        return this.MCaddress;
257
    }
258

    
259
    public int getMCport() {
260
        return this.MCport;
261
    }
262

    
263
    public ConcurrentHashMap<String, BackupFile> getFileMap() {
264
                return this.fileMap;
265
    }
266

    
267
    public void addToFileMap(String key, BackupFile file){
268
        this.fileMap.put(key, file);
269
    }
270
    
271
    private void fillChunksMap() throws IOException {
272
        File peerDir = new File("Peer-"+this.serverId+File.separator+"Chunks");
273
        peerDir.mkdirs();
274
        for (File file : peerDir.listFiles()) {
275
            BackupFile backupFile = new BackupFile(file.getName());
276
            
277
            for (File chunk : file.listFiles()) {
278
                FileInputStream stream = new FileInputStream(chunk);
279
                byte[] data = new byte[(int)chunk.length()];
280
                stream.read(data);
281
                backupFile.addChunk(Integer.parseInt(chunk.getName()), new Chunk(backupFile, Integer.parseInt(chunk.getName()), data));
282
                this.addStorage(data.length);
283
                stream.close();
284
            }
285

    
286
            this.chunksMap.put(file.getName(), backupFile);
287
        }
288
    }
289

    
290
    public long addStorage(long size) {
291
        this.storageUsed += size;
292

    
293
        if(this.storageUsed > this.storageCapacity)
294
                print("Storage overload!");
295

    
296
        return this.storageUsed;
297
    }
298

    
299
    public long removeStorage(long size) {
300
        this.storageUsed -= size;
301

    
302
        if(this.storageUsed < 0)
303
                print("Negative storage...");
304

    
305
        return this.storageUsed;
306
    }
307

    
308
    public int getStorageUsed() {
309
        return this.storageUsed;
310
    }
311

    
312
    public int getStorageCapacity() {
313
        return this.storageCapacity;
314
    }
315

    
316
    public void setStorageCapacity(int cap) {
317
        this.storageCapacity = cap;
318
    }
319
}