// Source listing metadata: 128 lines, 4.4 KiB, JavaScript
'use strict';

// connection mixins
// implementation of http://dev.mysql.com/doc/internals/en/compression.html

// zlib provides the inflate/deflate primitives used by the handlers below.
const zlib = require('zlib');
// PacketParser is instantiated twice in enableCompression(): once for the
// compressed stream and once for the inflated MySQL packets.
const PacketParser = require('./packet_parser.js');

/**
 * Handles one packet received while the compressed protocol is active.
 *
 * Must be invoked with `this` bound to the connection (enableCompression()
 * installs it as `connection._handleCompressedPacket`).
 *
 * A 3-byte integer and the body are read from `packet`. When that integer is
 * zero the body is forwarded untouched (the protocol marks an uncompressed
 * payload by setting "length of payload before compression" to 0); otherwise
 * the body is inflated with zlib first. In both cases the work goes through
 * `connection.inflateQueue`, and the payload is then handed to
 * `connection._inflatedPacketsParser`.
 *
 * @param {object} packet - packet exposing `readInt24()`, `readBuffer()` and
 *   `numPackets`.
 * @returns {void}
 */
function handleCompressedPacket(packet) {
  // eslint-disable-next-line consistent-this, no-invalid-this
  const connection = this;
  const deflatedLength = packet.readInt24();
  const body = packet.readBuffer();

  connection.inflateQueue.push(task => {
    // Shared tail of both paths: advance the compressed sequence id, feed the
    // plain payload to the inner parser, then release the queue slot.
    const deliver = payload => {
      connection._bumpCompressedSequenceId(packet.numPackets);
      connection._inflatedPacketsParser.execute(payload);
      task.done();
    };

    if (deflatedLength === 0) {
      // Payload was sent uncompressed; nothing to inflate.
      deliver(body);
      return;
    }

    zlib.inflate(body, (err, data) => {
      if (err) {
        // NOTE(review): task.done() is not called on this path, so the queue
        // task is never marked finished; confirm that this is intended after
        // a network error.
        connection._handleNetworkError(err);
        return;
      }
      deliver(data);
    });
  });
}

/**
 * Writes `buffer` to the connection using the compressed protocol.
 *
 * Must be invoked with `this` bound to the connection (enableCompression()
 * installs it as `connection.write`, keeping the previous writer available as
 * `connection.writeUncompressed`).
 *
 * Buffers longer than MAX_COMPRESSED_LENGTH are split and each slice is
 * written through a recursive call. Every slice gets a 7-byte header:
 *   bytes 0-2  length of the payload that follows (little endian)
 *   byte  3    compressed sequence id captured when the write was requested
 *   bytes 4-6  length before compression, or 0 if the payload is sent raw
 *
 * @param {Buffer} buffer - bytes to send.
 * @returns {void}
 */
function writeCompressed(buffer) {
  // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
  // note: sending a MySQL Packet of the size 2^24−5 to 2^24−1 via compression
  // leads to at least one extra compressed packet.
  // (this is because "length of the packet before compression" need to fit
  // into 3 byte unsigned int. "length of the packet before compression" includes
  // 4 byte packet header, hence 2^24−5)
  const MAX_COMPRESSED_LENGTH = 16777210;

  if (buffer.length > MAX_COMPRESSED_LENGTH) {
    for (
      let start = 0;
      start < buffer.length;
      start += MAX_COMPRESSED_LENGTH
    ) {
      writeCompressed.call(
        // eslint-disable-next-line no-invalid-this
        this,
        buffer.slice(start, start + MAX_COMPRESSED_LENGTH)
      );
    }
    return;
  }

  // eslint-disable-next-line no-invalid-this, consistent-this
  const connection = this;
  const packetLen = buffer.length;
  // Captured now because the sequence id is bumped below, before the
  // asynchronous deflate callback gets to build the header.
  const seqId = connection.compressedSequenceId;

  // seqqueue is used here because zlib async execution is routed via thread pool
  // internally and when we have multiple compressed packets arriving we need
  // to assemble uncompressed result sequentially
  connection.deflateQueue.push(task => {
    zlib.deflate(buffer, (err, compressed) => {
      if (err) {
        // NOTE(review): task.done() is not called on this path; confirm that
        // leaving the queue task unfinished after a fatal error is intended.
        connection._handleFatalError(err);
        return;
      }

      // http://dev.mysql.com/doc/internals/en/uncompressed-payload.html
      // To send an uncompressed payload:
      //   - set length of payload before compression to 0
      //   - the compressed payload contains the uncompressed payload instead.
      const useCompressed = compressed.length < packetLen;
      const payload = useCompressed ? compressed : buffer;
      const lengthBeforeCompression = useCompressed ? packetLen : 0;

      // All 7 bytes are written below, so an uninitialised buffer is safe.
      const compressHeader = Buffer.allocUnsafe(7);
      compressHeader.writeUIntLE(payload.length, 0, 3);
      compressHeader.writeUInt8(seqId, 3);
      compressHeader.writeUIntLE(lengthBeforeCompression, 4, 3);

      connection.writeUncompressed(compressHeader);
      connection.writeUncompressed(payload);
      task.done();
    });
  });
  connection._bumpCompressedSequenceId(1);
}

/**
 * Switches `connection` over to the compressed protocol by replacing its
 * packet parser and write method and creating the inflate/deflate queues.
 *
 * After the call:
 *   - `connection.packetParser` hands every parsed packet to
 *     `handleCompressedPacket`;
 *   - `connection._inflatedPacketsParser` hands every parsed packet to
 *     `connection.handlePacket`;
 *   - `connection.write` is `writeCompressed`, and the previous writer is
 *     kept as `connection.writeUncompressed`.
 *
 * @param {object} connection - connection object to modify in place.
 * @returns {void}
 */
function enableCompression(connection) {
  connection._lastWrittenPacketId = 0;
  connection._lastReceivedPacketId = 0;

  connection._handleCompressedPacket = handleCompressedPacket;

  // NOTE(review): the second PacketParser argument is presumably the header
  // length (4 here, 7 for the outer parser, which matches the 7-byte header
  // built in writeCompressed); confirm against packet_parser.js.
  connection._inflatedPacketsParser = new PacketParser(p => {
    connection.handlePacket(p);
  }, 4);
  connection._inflatedPacketsParser._lastPacket = 0;
  connection.packetParser = new PacketParser(packet => {
    connection._handleCompressedPacket(packet);
  }, 7);

  // Keep the raw writer reachable before overriding `write`; writeCompressed
  // sends its header and payload through `writeUncompressed`.
  connection.writeUncompressed = connection.write;
  connection.write = writeCompressed;

  // Required here rather than at module top, so seq-queue is only loaded
  // once compression is actually enabled.
  const seqqueue = require('seq-queue');
  connection.inflateQueue = seqqueue.createQueue();
  connection.deflateQueue = seqqueue.createQueue();
}

module.exports = {
|
||
enableCompression: enableCompression
|
||
};
|