202 lines
5.8 KiB
JavaScript
202 lines
5.8 KiB
JavaScript
|
"use strict";
|
||
|
|
||
|
var sha1 = require('git-sha1');
|
||
|
var applyDelta = require('../lib/apply-delta.js');
|
||
|
var codec = require('../lib/object-codec.js');
|
||
|
var decodePack = require('../lib/pack-codec.js').decodePack;
|
||
|
var encodePack = require('../lib/pack-codec.js').encodePack;
|
||
|
var makeChannel = require('culvert');
|
||
|
|
||
|
module.exports = function (repo) {
|
||
|
// packChannel is a writable culvert channel {put,drain} containing raw packfile binary data
|
||
|
// opts can contain "onProgress" or "onError" hook functions.
|
||
|
// callback will be called with a list of all unpacked hashes on success.
|
||
|
repo.unpack = unpack; // (packChannel, opts) => hashes
|
||
|
|
||
|
// hashes is an array of hashes to pack
|
||
|
// packChannel will be a readable culvert channel {take} containing raw packfile binary data
|
||
|
repo.pack = pack; // (hashes, opts) => packChannel
|
||
|
};
|
||
|
|
||
|
/**
 * Read a raw packfile stream and store every object it contains in the repo.
 *
 * Must be called with the repo as `this`. When `callback` is omitted a
 * continuable (a function that only takes the callback) is returned.
 *
 * @param {Object} packChannel - channel with a `take(callback)` method that
 *   yields raw packfile data; it is run through `decodePack`.
 * @param {Object} [opts] - optional hooks; `opts.onProgress(message)` receives
 *   human readable progress lines.
 * @param {Function} [callback] - called exactly once with `(err)` or
 *   `(null, hashes)` where `hashes` lists every hash that was saved.
 */
function unpack(packChannel, opts, callback) {
  /*jshint validthis:true*/
  if (!callback) return unpack.bind(this, packChannel, opts);
  // Tolerate a missing opts object so the progress checks below cannot throw.
  if (!opts) opts = {};

  var repo = this;

  var num, numDeltas = 0, count = 0, countDeltas = 0;
  var done = false, startDeltaProgress = false;

  // hashes keyed by offset for ofs-delta resolving
  var hashes = {};
  // key is hash, boolean is cached "has" value of true or false
  var has = {};
  // key is hash we're waiting for, value is array of items that are waiting.
  var pending = {};

  // Parser failures go through onDone so the caller's callback is guarded by
  // the same "only once" flag as every other exit path.
  packChannel = applyParser(packChannel, decodePack, onDone);

  return packChannel.take(onStats);

  // Single exit point: reports the first error or the final list of hashes.
  function onDone(err) {
    if (done) return;
    done = true;
    if (err) return callback(err);
    return callback(null, values(hashes));
  }

  // The first decoded item is the pack header carrying the object count.
  function onStats(err, stats) {
    if (err) return onDone(err);
    num = stats.num;
    packChannel.take(onRead);
  }

  // `more` is the item just read; it is undefined at the end of the stream.
  function objectProgress(more) {
    if (!more) startDeltaProgress = true;
    var percent = Math.round(count / num * 100);
    return opts.onProgress("Receiving objects: " + percent + "% (" + (count++) + "/" + num + ") " + (more ? "\r" : "\n"));
  }

  // Delta progress is only reported once all objects have been received.
  function deltaProgress(more) {
    if (!startDeltaProgress) return;
    var percent = Math.round(countDeltas / numDeltas * 100);
    return opts.onProgress("Applying deltas: " + percent + "% (" + (countDeltas++) + "/" + numDeltas + ") " + (more ? "\r" : "\n"));
  }

  // Dispatch one decoded pack entry by type.
  function onRead(err, item) {
    if (err) return onDone(err);
    if (opts.onProgress) objectProgress(item);
    if (item === undefined) {
      // Ref-deltas still waiting for a base were never written; reporting
      // success here would silently drop them.
      var missing = Object.keys(pending);
      if (missing.length) {
        return onDone(new Error("Unresolved ref-deltas, missing base objects: " + missing.join(", ")));
      }
      return onDone();
    }
    if (item.size !== item.body.length) {
      return onDone(new Error("Body size mismatch"));
    }
    if (item.type === "ofs-delta") {
      numDeltas++;
      // Translate the relative offset into the hash saved at that offset.
      item.ref = hashes[item.offset - item.ref];
      return resolveDelta(item);
    }
    if (item.type === "ref-delta") {
      numDeltas++;
      return checkDelta(item);
    }
    return saveValue(item);
  }

  // Load the delta's base object, apply the delta and save the result.
  function resolveDelta(item) {
    if (opts.onProgress) deltaProgress();
    return repo.loadRaw(item.ref, function (err, buffer) {
      if (err) return onDone(err);
      if (!buffer) return onDone(new Error("Missing base image at " + item.ref));
      var target = codec.deframe(buffer);
      item.type = target.type;
      item.body = applyDelta(item.body, target.body);
      return saveValue(item);
    });
  }

  // Resolve a ref-delta now if its base is known to exist, otherwise queue it.
  function checkDelta(item) {
    var hasTarget = has[item.ref];
    if (hasTarget === true) return resolveDelta(item);
    if (hasTarget === false) return enqueueDelta(item);
    return repo.hasHash(item.ref, function (err, value) {
      if (err) return onDone(err);
      has[item.ref] = value;
      if (value) return resolveDelta(item);
      return enqueueDelta(item);
    });
  }

  // Frame, hash and persist a fully resolved object.
  function saveValue(item) {
    var buffer = codec.frame(item);
    var hash = sha1(buffer);
    hashes[item.offset] = hash;
    has[hash] = true;
    if (hash in pending) {
      // I have yet to come across a pack stream that actually needs this.
      // So I will only implement it when I have concrete data to test against.
      // Report through the callback instead of throwing a string from inside
      // a storage callback.
      return onDone(new Error("TODO: pending value was found, resolve it"));
    }
    return repo.saveRaw(hash, buffer, onSave);
  }

  // After a successful save, move on to the next pack entry.
  function onSave(err) {
    if (err) return onDone(err);
    packChannel.take(onRead);
  }

  // Park a ref-delta whose base is not in the repo yet and keep reading.
  function enqueueDelta(item) {
    var list = pending[item.ref];
    if (!list) pending[item.ref] = [item];
    else list.push(item);
    packChannel.take(onRead);
  }

}
|
||
|
|
||
|
// TODO: Implement delta refs to reduce stream size
|
||
|
/**
 * Create a readable channel that streams the given objects as a packfile.
 *
 * Must be called with the repo as `this`. When `callback` is omitted a
 * continuable (a function that only takes the callback) is returned.
 *
 * @param {Array} hashes - hashes of the objects to pack, in output order.
 * @param {Object} [opts] - optional hooks; `opts.onError(err)` receives
 *   failures that happen while the stream is being produced (for example a
 *   hash that cannot be loaded). Without the hook such errors are thrown.
 * @param {Function} [callback] - called with `(null, packChannel)` where
 *   `packChannel` exposes `take`.
 */
function pack(hashes, opts, callback) {
  /*jshint validthis:true*/
  if (!callback) return pack.bind(this, hashes, opts);
  var repo = this;
  var index = 0, first = true, done = false;
  return callback(null, applyParser({ take: take }, encodePack, onError));

  // The channel has already been handed to the caller, so stream failures
  // cannot go through `callback`; use the documented hook or surface the
  // real error instead of failing on a missing handler.
  function onError(err) {
    if (opts && typeof opts.onError === "function") return opts.onError(err);
    throw err;
  }

  // Source side of the encoder: header first, then one object per call,
  // then end-of-stream (a call with no arguments).
  function take(next) {
    if (done) return next();
    if (first) return readFirst(next);
    var hash = hashes[index++];
    if (hash === undefined) {
      // Remember that the list is exhausted so later calls stay at "end".
      done = true;
      return next();
    }
    repo.loadRaw(hash, function (err, buffer) {
      if (err) return next(err);
      if (!buffer) return next(new Error("Missing hash: " + hash));
      // Reframe with pack format header
      next(null, codec.deframe(buffer));
    });
  }

  // Emit the pack header item carrying the number of objects.
  function readFirst(next) {
    first = false;
    next(null, {num: hashes.length});
  }
}
|
||
|
|
||
|
/**
 * Collect the own enumerable property values of an object.
 *
 * @param {Object} object - object whose values are wanted.
 * @returns {Array} the values, in `Object.keys` order.
 */
function values(object) {
  return Object.keys(object).map(function (key) {
    return object[key];
  });
}
|
||
|
|
||
|
|
||
|
/**
 * Pipe a readable stream through a parser and expose the parsed output.
 *
 * A fresh channel is created with `makeChannel()`; `parser` is given that
 * channel's `put` as its emit function and must return the function that
 * accepts raw input items. Items taken from `stream` are fed to the parser
 * until the stream yields `undefined` (end of stream).
 *
 * @param {Object} stream - source with a `take(callback)` method.
 * @param {Function} parser - `parser(emit)` returning `write(item)`.
 * @param {Function} [onError] - receives stream, parser and drain errors.
 *   When omitted the error is thrown instead.
 * @returns {Object} object exposing `take` for the parsed output.
 */
function applyParser(stream, parser, onError) {
  var extra = makeChannel();
  extra.put = parser(extra.put);
  stream.take(onData);

  return { take: extra.take };

  // Deliver an error to the handler, or throw it when no handler was given
  // so the real failure is not masked by "onError is not a function".
  function fail(err) {
    if (typeof onError === "function") return onError(err);
    throw err;
  }

  function onData(err, item) {
    if (err) return fail(err);
    var more;
    try { more = extra.put(item); }
    catch (parseErr) { return fail(parseErr); }
    // The end marker has been forwarded to the parser; stop pulling from
    // the source so the end is not processed more than once.
    if (item === undefined) return;
    if (more) stream.take(onData);
    else extra.drain(onDrain);
  }

  // Resume reading once the output channel has room again.
  function onDrain(err) {
    if (err) return fail(err);
    stream.take(onData);
  }
}
|