Culvert
=======
A channel for easy streaming of work between complex pieces of logic.
It is used in place of streams for CSP-style flow. I use it in js-git for network and file streams.
Usually, you'll want to split sides to create a duplex channel.
```js
var makeChannel = require('culvert');
var serverChannel = makeChannel();
var clientChannel = makeChannel();

function connect(host, port) {
  // This represents the server-side of the duplex pipe
  var socket = {
    put: serverChannel.put,
    drain: serverChannel.drain,
    take: clientChannel.take
  };

  // When we want to send data to the consumer...
  // (someData is a placeholder for whatever you need to send)
  socket.put(someData);

  // When we want to read from the consumer...
  socket.take(function (err, item) {});

  // Return the client's end of the pipe
  return {
    put: clientChannel.put,
    drain: clientChannel.drain,
    take: serverChannel.take
  };
}
```
If you want/need to preserve back-pressure and honor the buffer limit,
make sure to wait for drain when `put` returns false.
```js
// Start a read
socket.take(onData);

function onData(err, item) {
  if (err) throw err;
  if (item === undefined) {
    // End stream when nothing comes out
    console.log("done");
  }
  else if (socket.put(item)) {
    // If put returned true, keep reading
    socket.take(onData);
  }
  else {
    // Otherwise pause and wait for drain
    socket.drain(onDrain);
  }
}

function onDrain(err) {
  if (err) throw err;
  // Resume reading
  socket.take(onData);
}
```
If you're using continuables and generators, the syntax is much nicer.
```js
var item;
while (item = yield socket.take, item !== undefined) {
  if (!socket.put(item)) yield socket.drain;
}
console.log("done");
```
Also, the continuable version won't blow the stack if lots of events come in on the same tick.
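
The generator snippet above needs something to drive it. As a rough sketch of why the stack stays flat, here is one way a runner could resume a generator from a loop instead of by recursion. The names `run` and `countingSource` are hypothetical and written only for this example; this is not the code of any particular runner library.

```js
// Hypothetical runner: drives a generator that yields continuables,
// i.e. functions of the form function (callback) { ... }.
function run(makeGenerator, done) {
  var iterator = makeGenerator();
  var pending = null;   // [err, value] waiting to be fed to the generator
  var looping = false;  // true while loop() is already on the stack

  function resume(err, value) {
    pending = [err, value];
    // If the callback fired synchronously, the running loop picks it up,
    // so the stack does not grow with each item.
    if (!looping) loop();
  }

  function loop() {
    looping = true;
    while (pending) {
      var args = pending;
      pending = null;
      var step;
      try {
        step = args[0] ? iterator.throw(args[0]) : iterator.next(args[1]);
      } catch (e) {
        looping = false;
        return done(e);
      }
      if (step.done) {
        looping = false;
        return done(null, step.value);
      }
      step.value(resume); // the yielded value is a continuable
    }
    looping = false;
  }

  resume();
}

// Hypothetical synchronous source standing in for socket.take:
// hands out 1..limit, then undefined to signal the end.
function countingSource(limit) {
  var n = 0;
  return function take(callback) {
    n += 1;
    callback(null, n <= limit ? n : undefined);
  };
}

var total = 0;
var finished = false;
run(function* () {
  var take = countingSource(100000);
  var item;
  while (item = yield take, item !== undefined) {
    total += item;
  }
}, function (err) {
  if (err) throw err;
  finished = true;
});
```

Every callback here fires on the same tick, yet the loop in `run` keeps the call depth constant.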
## makeChannel(bufferSize, monitor)
Create a new channel.
The optional bufferSize is how many items can be in the queue and still be considered not full.
The optional monitor function will get called with `(type, item)` where `type` is either `"put"` or `"take"` and `item` is the value being put or taken.
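
As an illustration of what a monitor might do with `(type, item)`, here is a small sketch that counts how many items are in flight. `makeInFlightMonitor` is a hypothetical helper, and the calls a channel would make are simulated by hand so the snippet runs without the library.

```js
// Hypothetical monitor factory: counts puts and takes.
function makeInFlightMonitor() {
  var stats = { put: 0, take: 0 };

  function monitor(type, item) {
    // type is "put" or "take", as described above
    stats[type] += 1;
  }

  // Items that have been put but not yet taken
  monitor.inFlight = function () {
    return stats.put - stats.take;
  };

  return monitor;
}

var monitor = makeInFlightMonitor();

// With the library installed, this would be passed as makeChannel(2, monitor).
// Here the channel's calls are simulated by hand.
monitor("put", "a");
monitor("put", "b");
monitor("take", "a");

var inFlight = monitor.inFlight();
```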
## channel.put(item) -> more
This is a synchronous function. You can add as many items to the channel as you want and it will queue them up.
It returns `true` when the queue is smaller than bufferSize and `false` if you should wait for drain.
## channel.drain(callback)
Drain is a reusable continuable. Use this when you want to wait for the buffer to be below the bufferSize mark.
## channel.take(callback)
Take is for reading. The callback is called as `(err, item)` with the next item. It may be called synchronously or at a later time.
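
To make the interplay of `put`, `drain` and `take` concrete, here is a minimal stand-in channel. It is a sketch written for this example, not culvert's source. It assumes `put` returns `true` while the queue holds at most bufferSize items, and the name `makeSketchChannel` is hypothetical.

```js
// Stand-in channel for illustration only; NOT culvert's implementation.
// Assumption: put() returns true while the queue holds <= bufferSize items.
function makeSketchChannel(bufferSize) {
  bufferSize = bufferSize | 0;
  var items = [];     // values waiting for a reader
  var readers = [];   // take callbacks waiting for a value
  var drainers = [];  // drain callbacks waiting for room

  function put(item) {
    if (readers.length) {
      // A reader is already waiting, so hand the item over directly.
      readers.shift()(null, item);
      return true;
    }
    items.push(item);
    return items.length <= bufferSize;
  }

  function take(callback) {
    if (!items.length) {
      readers.push(callback); // nothing queued: call back later
      return;
    }
    callback(null, items.shift()); // something queued: call back now
    if (items.length <= bufferSize) {
      // Back within the limit: release everyone waiting on drain.
      while (drainers.length) drainers.shift()();
    }
  }

  function drain(callback) {
    if (items.length <= bufferSize) return callback();
    drainers.push(callback);
  }

  return { put: put, take: take, drain: drain };
}

var channel = makeSketchChannel(1);
var log = [];

// take before put: the callback is held and runs when the item arrives.
channel.take(function (err, item) { log.push("late:" + item); });
var first = channel.put("a");  // goes straight to the waiting reader

var second = channel.put("b"); // queue length 1, within bufferSize
var third = channel.put("c");  // queue length 2, over bufferSize
channel.drain(function () { log.push("drained"); });

// take after put: the callback runs synchronously and frees room.
channel.take(function (err, item) { log.push("sync:" + item); });
```

In this sketch the third `put` returns `false`, and the `drain` callback fires only after a `take` brings the queue back within the limit.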