blob: 0bb8d7b0a976b12b72cbd48cf337107a82745990 [file] [log] [blame]
Stuart McCullochbb014372012-06-07 21:57:32 +00001package aQute.libg.cafs;
2
3import static aQute.lib.io.IO.*;
4
5import java.io.*;
6import java.nio.channels.*;
7import java.security.*;
8import java.util.*;
9import java.util.concurrent.atomic.*;
10import java.util.zip.*;
11
12import aQute.lib.index.*;
13import aQute.libg.cryptography.*;
14
15/**
16 * CAFS implements a SHA-1 based file store. The basic idea is that every file
17 * in the universe has a unique SHA-1. Hard to believe but people smarter than
18 * me have come to that conclusion. This class maintains a compressed store of
19 * SHA-1 identified files. So if you have the SHA-1, you can get the contents.
20 *
21 * This makes it easy to store a SHA-1 instead of the whole file or maintain a
22 * naming scheme. An added advantage is that it is always easy to verify you get
23 * the right stuff. The SHA-1 Content Addressable File Store is the core
24 * underlying idea in Git.
25 */
26public class CAFS implements Closeable, Iterable<SHA1> {
27 final static byte[] CAFS = "CAFS".getBytes();
28 final static byte[] CAFE = "CAFE".getBytes();
29 final static String INDEXFILE = "index.idx";
30 final static String STOREFILE = "store.cafs";
31 final static String ALGORITHM = "SHA-1";
32 final static int KEYLENGTH = 20;
33 final static int HEADERLENGTH = 4 // CAFS
34 + 4 // flags
35 + 4 // compressed length
36 + 4 // uncompressed length
37 + KEYLENGTH // key
38 + 2 // header checksum
39 ;
40
41 final File home;
42 Index index;
43 RandomAccessFile store;
44 FileChannel channel;
45
46 /**
47 * Constructor for a Content Addressable File Store
48 *
49 * @param home
50 * @param create
51 * @throws Exception
52 */
53 public CAFS(File home, boolean create) throws Exception {
54 this.home = home;
55 if (!home.isDirectory()) {
56 if (create) {
57 home.mkdirs();
58 } else
59 throw new IllegalArgumentException("CAFS requires a directory with create=false");
60 }
61
62 index = new Index(new File(home, INDEXFILE), KEYLENGTH);
63 store = new RandomAccessFile(new File(home, STOREFILE), "rw");
64 channel = store.getChannel();
65 if (store.length() < 0x100) {
66 if (create) {
67 store.write(CAFS);
68 for (int i = 1; i < 64; i++)
69 store.writeInt(0);
70 channel.force(true);
71 } else
72 throw new IllegalArgumentException("Invalid store file, length is too short "
73 + store);
74 System.err.println(store.length());
75 }
76 store.seek(0);
77 if (!verifySignature(store, CAFS))
78 throw new IllegalArgumentException("Not a valid signature: CAFS at start of file");
79
80 }
81
82 /**
83 * Store an input stream in the CAFS while calculating and returning the
84 * SHA-1 code.
85 *
86 * @param in
87 * The input stream to store.
88 * @return The SHA-1 code.
89 * @throws Exception
90 * if anything goes wrong
91 */
92 public SHA1 write(InputStream in) throws Exception {
93
94 Deflater deflater = new Deflater();
95 MessageDigest md = MessageDigest.getInstance(ALGORITHM);
96 DigestInputStream din = new DigestInputStream(in, md);
97 ByteArrayOutputStream bout = new ByteArrayOutputStream();
98 DeflaterOutputStream dout = new DeflaterOutputStream(bout, deflater);
99 copy(din, dout);
100
101 synchronized (store) {
102 // First check if it already exists
103 SHA1 sha1 = new SHA1(md.digest());
104
105 long search = index.search(sha1.digest());
106 if (search > 0)
107 return sha1;
108
109 byte[] compressed = bout.toByteArray();
110
111 // we need to append this file to our store,
112 // which requires a lock. However, we are in a race
113 // so others can get the lock between us getting
114 // the length and someone else getting the lock.
115 // So we must verify after we get the lock that the
116 // length was unchanged.
117 FileLock lock = null;
118 try {
119 long insertPoint;
120 int recordLength = compressed.length + HEADERLENGTH;
121
122 while (true) {
123 insertPoint = store.length();
124 lock = channel.lock(insertPoint, recordLength, false);
125
126 if (store.length() == insertPoint)
127 break;
128
129 // We got the wrong lock, someone else
130 // got in between reading the length
131 // and locking
132 lock.release();
133 }
134 int totalLength = deflater.getTotalIn();
135 store.seek(insertPoint);
136 update(sha1.digest(), compressed, totalLength);
137 index.insert(sha1.digest(), insertPoint);
138 return sha1;
139 } finally {
140 if (lock != null)
141 lock.release();
142 }
143 }
144 }
145
146 /**
147 * Read the contents of a sha 1 key.
148 *
149 * @param sha1
150 * The key
151 * @return An Input Stream on the content or null of key not found
152 * @throws Exception
153 */
154 public InputStream read(final SHA1 sha1) throws Exception {
155 synchronized (store) {
156 long offset = index.search(sha1.digest());
157 if (offset < 0)
158 return null;
159
160 byte[] readSha1;
161 byte[] buffer;
162 store.seek(offset);
163 if (!verifySignature(store, CAFE))
164 throw new IllegalArgumentException("No signature");
165
166 int flags =store.readInt();
167 int compressedLength = store.readInt();
168 int uncompressedLength = store.readInt();
169 readSha1 = new byte[KEYLENGTH];
170 store.read(readSha1);
171 SHA1 rsha1 = new SHA1(readSha1);
172
173 if (!sha1.equals(rsha1))
174 throw new IOException("SHA1 read and asked mismatch: " + sha1 + " " + rsha1);
175
176 short crc = store.readShort(); // Read CRC
177 if ( crc != checksum(flags,compressedLength, uncompressedLength, readSha1))
178 throw new IllegalArgumentException("Invalid header checksum: " + sha1);
179
180 buffer = new byte[compressedLength];
181 store.readFully(buffer);
182 return getSha1Stream(sha1, buffer, uncompressedLength);
183 }
184 }
185
186 public boolean exists(byte[] sha1) throws Exception {
187 return index.search(sha1) >= 0;
188 }
189
190 public void reindex() throws Exception {
191 long length;
192 synchronized (store) {
193 length = store.length();
194 if (length < 0x100)
195 throw new IllegalArgumentException(
196 "Store file is too small, need to be at least 256 bytes: " + store);
197 }
198
199 RandomAccessFile in = new RandomAccessFile(new File(home, STOREFILE), "r");
200 try {
201 byte[] signature = new byte[4];
202 in.readFully(signature);
203 if (!Arrays.equals(CAFS, signature))
204 throw new IllegalArgumentException("Store file does not start with CAFS: " + in);
205
206 in.seek(0x100);
207 File ixf = new File(home, "index.new");
208 Index index = new Index(ixf, KEYLENGTH);
209
210 while (in.getFilePointer() < length) {
211 long entry = in.getFilePointer();
212 SHA1 sha1 = verifyEntry(in);
213 index.insert(sha1.digest(), entry);
214 }
215
216 synchronized (store) {
217 index.close();
218 File indexFile = new File(home, INDEXFILE);
219 ixf.renameTo(indexFile);
220 this.index = new Index(indexFile, KEYLENGTH);
221 }
222 } finally {
223 in.close();
224 }
225 }
226
227 public void close() throws IOException {
228 synchronized (store) {
229 try {
230 store.close();
231 } finally {
232 index.close();
233 }
234 }
235 }
236
237 private SHA1 verifyEntry(RandomAccessFile in) throws IOException, NoSuchAlgorithmException {
238 byte[] signature = new byte[4];
239 in.readFully(signature);
240 if (!Arrays.equals(CAFE, signature))
241 throw new IllegalArgumentException("File is corrupted: " + in);
242
243 /* int flags = */in.readInt();
244 int compressedSize = in.readInt();
245 int uncompressedSize = in.readInt();
246 byte[] key = new byte[KEYLENGTH];
247 in.readFully(key);
248 SHA1 sha1 = new SHA1(key);
249
250 byte[] buffer = new byte[compressedSize];
251 in.readFully(buffer);
252
253 InputStream xin = getSha1Stream(sha1, buffer, uncompressedSize);
254 xin.skip(uncompressedSize);
255 xin.close();
256 return sha1;
257 }
258
259 private boolean verifySignature(DataInput din, byte[] org) throws IOException {
260 byte[] read = new byte[org.length];
261 din.readFully(read);
262 return Arrays.equals(read, org);
263 }
264
265 private InputStream getSha1Stream(final SHA1 sha1, byte[] buffer, final int total)
266 throws NoSuchAlgorithmException {
267 ByteArrayInputStream in = new ByteArrayInputStream(buffer);
268 InflaterInputStream iin = new InflaterInputStream(in) {
269 int count = 0;
270 final MessageDigest digestx = MessageDigest.getInstance(ALGORITHM);
271 final AtomicBoolean calculated = new AtomicBoolean();
272
273 @Override public int read(byte[] data, int offset, int length) throws IOException {
274 int size = super.read(data, offset, length);
275 if (size <= 0)
276 eof();
277 else {
278 count+=size;
279 this.digestx.update(data, offset, size);
280 }
281 return size;
282 }
283
284 public int read() throws IOException {
285 int c = super.read();
286 if (c < 0)
287 eof();
288 else {
289 count++;
290 this.digestx.update((byte) c);
291 }
292 return c;
293 }
294
295 void eof() throws IOException {
296 if ( calculated.getAndSet(true))
297 return;
298
299 if ( count != total )
300 throw new IOException("Counts do not match. Expected to read: " + total + " Actually read: " + count);
301
302 SHA1 calculatedSha1 = new SHA1(digestx.digest());
303 if (!sha1.equals(calculatedSha1))
304 throw ( new IOException("SHA1 caclulated and asked mismatch, asked: "
305 + sha1 + ", \nfound: " +calculatedSha1));
306 }
307
308 public void close() throws IOException {
309 eof();
310 super.close();
311 }
312 };
313 return iin;
314 }
315
316 /**
317 * Update a record in the store, assuming the store is at the right
318 * position.
319 *
320 * @param sha1
321 * The checksum
322 * @param compressed
323 * The compressed length
324 * @param totalLength
325 * The uncompressed length
326 * @throws IOException
327 * The exception
328 */
329 private void update(byte[] sha1, byte[] compressed, int totalLength) throws IOException {
330 //System.err.println("pos: " + store.getFilePointer());
331 store.write(CAFE); // 00-03 Signature
332 store.writeInt(0); // 04-07 Flags for the future
333 store.writeInt(compressed.length); // 08-11 Length deflated data
334 store.writeInt(totalLength); // 12-15 Length
335 store.write(sha1); // 16-35
336 store.writeShort( checksum(0,compressed.length, totalLength, sha1));
337 store.write(compressed);
338 channel.force(false);
339 }
340
341
342
343 private short checksum(int flags, int compressedLength, int totalLength, byte[] sha1) {
344 CRC32 crc = new CRC32();
345 crc.update(flags);
346 crc.update(flags>>8);
347 crc.update(flags>>16);
348 crc.update(flags>>24);
349 crc.update(compressedLength);
350 crc.update(compressedLength>>8);
351 crc.update(compressedLength>>16);
352 crc.update(compressedLength>>24);
353 crc.update(totalLength);
354 crc.update(totalLength>>8);
355 crc.update(totalLength>>16);
356 crc.update(totalLength>>24);
357 crc.update(sha1);
358 return (short) crc.getValue();
359 }
360
361 public Iterator<SHA1> iterator() {
362
363 return new Iterator<SHA1>() {
364 long position = 0x100;
365
366 public boolean hasNext() {
367 synchronized(store) {
368 try {
369 return position < store.length();
370 } catch (IOException e) {
371 throw new RuntimeException(e);
372 }
373 }
374 }
375
376 public SHA1 next() {
377 synchronized(store) {
378 try {
379 store.seek(position);
380 byte [] signature = new byte[4];
381 store.readFully(signature);
382 if ( !Arrays.equals(CAFE, signature))
383 throw new IllegalArgumentException("No signature");
384
385 int flags = store.readInt();
386 int compressedLength = store.readInt();
387 int totalLength = store.readInt();
388 byte []sha1 = new byte[KEYLENGTH];
389 store.readFully(sha1);
390 short crc = store.readShort();
391 if ( crc != checksum(flags,compressedLength, totalLength, sha1))
392 throw new IllegalArgumentException("Header checksum fails");
393
394 position += HEADERLENGTH + compressedLength;
395 return new SHA1(sha1);
396 } catch (IOException e) {
397 throw new RuntimeException(e);
398 }
399 }
400 }
401
402 public void remove() {
403 throw new UnsupportedOperationException("Remvoe not supported, CAFS is write once");
404 }
405 };
406 }
407
408
409 public boolean isEmpty() throws IOException {
410 synchronized(store) {
411 return store.getFilePointer() <= 256;
412 }
413 }
414}