Module stream


Input*[T] = ref object
  bufferSize: int
  queue: Queue[T]
  onRecvReady: Event[void]
  onSendReady: Event[void]
  sendClosed: bool
  recvClosed: bool
  sendCloseException: ref Exception
  recvCloseException: ref Exception
  when not defined(release):
      marker: uint32

Output* {.
borrow: `.`
.}[T] = distinct Input[T]
Pipe*[T] = ref object {.
.} input*: Input[T] output*: Output[T]
CloseException* = object of Exception
Just close the stream/provider, without any error.
LengthInput*[T] = tuple[length: int64, stream: Input[T]]
  Source Edit


JustClose* = (ref CloseException)(msg: "just close")
proc `$`*[T](input: Input[T]): string
proc `$`*[T](output: Output[T]): string
proc newPipe*[T](input: Input[T]; output: Output[T]): Pipe[T]
proc newInputOutputPair*[T](bufferSize = 0): tuple[input: Input[T], output: Output[T]]
Create a new stream/provider pair. Providing values to provider will make them available on stream. If more than bufferSize items are provided without being consumed by stream, the provide operation blocks. If bufferSize == 0, the implementation-specific default is chosen.
proc newPipe*[T](typ: typedesc[T]): tuple[a: Pipe[T], b: Pipe[T]]
proc `onRecvReady`*[T](self: Input[T]): auto
proc `onSendReady`*[T](self: Output[T]): auto
proc getRecvCloseException*(self: Output): auto
proc getSendCloseException*(self: Input): auto
proc isRecvClosed*(self: Output): bool
proc isSendClosed*(self: Input): bool
proc sendSome*[T](self: Output[T]; data: ConstView[T]): int
Provides some items pointed to by view data. Returns how many items were actually provided.
proc sendAll*[T](self: Output[T]; data: seq[T] | string): Future[void]
Provides items from data. Returns Future that finishes when all items are provided.
proc send*[T](self: Output[T]; item: T): Future[void]
Provides a single item. Returns Future that finishes when the item is pushed into queue.
proc sendClose*(self: Output; exc: ref Exception)
Closes the output stream -- signals that no more items will be provided.
proc waitForRecvClose*[T](self: Output[T]; callback: proc ())
proc recvClose*[T](self: Input[T]; exc: ref Exception)
Closes the input stream -- signals that no more items will be received.
proc close*[T](self: Pipe[T]; exc: ref Exception)
proc freeBufferSize*[T](self: Output[T]): int
How many items can be pushed to the queue without blocking?
proc dataAvailable*[T](self: Input[T]): int
How many items can be received from the queue without blocking?
proc peekMany*[T](self: Input[T]): ConstView[T]
Look at several items from the input.
proc discardItems*[T](self: Input[T]; count: int)
Discard count items from the stream. Often used after peekMany.
proc waitForData*[T](self: Input[T]; allowSpurious = false): Future[void]
proc waitForSpace*[T](self: Output[T]; allowSpurious = false): Future[void]
Waits until space is available in the buffer. For use with sendSome and freeBufferSize.
proc receiveSomeInto*[T](self: Input[T]; target: View[T]): int
Pops all available data into target, but not more than the length of target. Returns the number of items copied to target.
proc receiveSome*[T](self: Input[T]; n: int): Future[seq[T]]
Pops at most n items from the stream.
proc receiveAll*[T](self: Input[T]; n: int): Future[seq[T]]
Pops n items from the stream.
proc receive*[T](self: Input[T]): Future[T]
Pop an item from the stream.
proc completeFromStreamClose*(c: Completer[void]; err: ref Exception)
Completes c with error if err is not JustClose.
proc pipeChunks*[T, R](self: Input[T]; target: Output[R]; function: (
    proc (source: ConstView[T]; target: var seq[R])) = nil): Future[void]

Copy data in chunks from self to target. If function is provided it will be called to copy data from source to destination chunks (so custom processing can be made).

Use this instead of pipe to reduce function call overhead for small elements.

Returned future completes successfully when there is no more data to copy. If any error occurs, the future completes with error.

proc pipe*[T, R](self: Input[T]; target: Output[R]; function: (proc (x: T): R)): Future[void]

Copy data from Input to Output while processing them with function.

Returned future completes successfully when there is no more data to copy. If any error occurs, the future completes with error.

proc pipe*[T](self: Input[T]; target: Output[T]): Future[void]

Copy data from Input to Output.

Returned future completes successfully when there is no more data to copy. If any error occurs, the future completes with error.

proc mapChunks*[T, R](self: Input[T];
                    function: (proc (source: ConstView[T]; target: var seq[R]))): Input[R]

Map data in chunks from self and return mapped stream. function will be called to copy data from source to destination chunks (so custom processing can be made).

Use this instead of map to reduce function call overhead for small elements.

proc flatMap*[T, R](self: Input[T]; function: (proc (x: T): seq[R])): Input[R]
Flat-map data from self. Data from self will be passed to function and items returned from it will be placed in order in result.
proc map*[T, R](self: Input[T]; function: (proc (x: T): R)): Input[R]
Map data from self placing modified data in result.
proc map*[T, R](self: Output[T]; function: (proc (x: R): T)): Output[R]
Map data from result placing modified data in self.
proc unwrapInputFuture*[T](f: Future[Input[T]]): Input[T]
Wait until f completes and pipe elements from it to result.
proc unwrapOutputFuture*[T](f: Future[Output[T]]): Output[T]
Wait until f completes and pipe elements from result to it.
proc logClose*(err: ref Exception)
proc onErrorClose*(f: Future[void]; p: Output)
When future f completes with error, close provider p.
proc onErrorClose*(f: Future[void]; s: Input)
When future f completes with error, close stream s.