Module stream

Types

Input*[T] = ref object
  bufferSize: int
  queue: Queue[T]
  onRecvReady: Event[void]
  onSendReady: Event[void]
  sendClosed: bool
  recvClosed: bool
  sendCloseException: ref Exception
  recvCloseException: ref Exception
  when not defined(release):
      marker: uint32

  
  Source Edit
Output* {.
borrow: `.`
.}[T] = distinct Input[T]
  Source Edit
Pipe*[T] = ref object {.
inheritable
.} input*: Input[T] output*: Output[T]
  Source Edit
CloseException* = object of Exception
  
Just close the stream/provider, without any error.   Source Edit
LengthInput*[T] = tuple[length: int64, stream: Input[T]]
  Source Edit

Lets

JustClose* = (ref CloseException)(msg: "just close")
  Source Edit

Procs

proc `$`*[T](input: Input[T]): string
  Source Edit
proc `$`*[T](output: Output[T]): string
  Source Edit
proc newPipe*[T](input: Input[T]; output: Output[T]): Pipe[T]
  Source Edit
proc newInputOutputPair*[T](bufferSize = 0): tuple[input: Input[T], output: Output[T]]
Create a new stream/provider pair. Providing values to provider will make them available on stream. If more than bufferSize items are provided without being consumed by stream, the provide operation blocks. If bufferSize == 0, the implementation-specific default is chosen.   Source Edit
proc newPipe*[T](typ: typedesc[T]): tuple[a: Pipe[T], b: Pipe[T]]
  Source Edit
proc `onRecvReady`*[T](self: Input[T]): auto
  Source Edit
proc `onSendReady`*[T](self: Output[T]): auto
  Source Edit
proc getRecvCloseException*(self: Output): auto
  Source Edit
proc getSendCloseException*(self: Input): auto
  Source Edit
proc isRecvClosed*(self: Output): bool
  Source Edit
proc isSendClosed*(self: Input): bool
  Source Edit
proc sendSome*[T](self: Output[T]; data: ConstView[T]): int
Provides some items pointed to by view data. Returns how many items were actually provided.   Source Edit
proc sendAll*[T](self: Output[T]; data: seq[T] | string): Future[void]
Provides items from data. Returns Future that finishes when all items are provided.   Source Edit
proc send*[T](self: Output[T]; item: T): Future[void]
Provides a single item. Returns Future that finishes when the item is pushed into queue.   Source Edit
proc sendClose*(self: Output; exc: ref Exception)
Closes the output stream -- signals that no more items will be provided.   Source Edit
proc waitForRecvClose*[T](self: Output[T]; callback: proc ())
  Source Edit
proc recvClose*[T](self: Input[T]; exc: ref Exception)
Closes the input stream -- signals that no more items will be received.   Source Edit
proc close*[T](self: Pipe[T]; exc: ref Exception)
  Source Edit
proc freeBufferSize*[T](self: Output[T]): int
How many items can be pushed to the queue without blocking?   Source Edit
proc dataAvailable*[T](self: Input[T]): int
How many items can be received from the queue without blocking?   Source Edit
proc peekMany*[T](self: Input[T]): ConstView[T]
Look at several items from the input.   Source Edit
proc discardItems*[T](self: Input[T]; count: int)
Discard count items from the stream. Often used after peekMany.   Source Edit
proc waitForData*[T](self: Input[T]; allowSpurious = false): Future[void]
  Source Edit
proc waitForSpace*[T](self: Output[T]; allowSpurious = false): Future[void]
Waits until space is available in the buffer. For use with sendSome and freeBufferSize.   Source Edit
proc receiveSomeInto*[T](self: Input[T]; target: View[T]): int
Pops all available data into target, but not more than the length of target. Returns the number of bytes copied to target.   Source Edit
proc receiveSome*[T](self: Input[T]; n: int): Future[seq[T]]
Pops at most n items from the stream.   Source Edit
proc receiveAll*[T](self: Input[T]; n: int): Future[seq[T]]
Pops n items from the stream.   Source Edit
proc receive*[T](self: Input[T]): Future[T]
Pop an item from the stream.   Source Edit
proc completeFromStreamClose*(c: Completer[void]; err: ref Exception)
Completes c with error err if err is not JustClose.   Source Edit
proc pipeChunks*[T, R](self: Input[T]; target: Output[R]; function: (
    proc (source: ConstView[T]; target: var seq[R])) = nil): Future[void]

Copy data in chunks from self to target. If function is provided it will be called to copy data from source to destination chunks (so custom processing can be made).

Use this instead of pipe to reduce function call overhead for small elements.

The returned future completes successfully when there is no more data to copy. If any error occurs, the future completes with an error.

  Source Edit
proc pipe*[T, R](self: Input[T]; target: Output[R]; function: (proc (x: T): R)): Future[
    void]

Copy data from Input to Output while processing them with function.

The returned future completes successfully when there is no more data to copy. If any error occurs, the future completes with an error.

  Source Edit
proc pipe*[T](self: Input[T]; target: Output[T]): Future[void]

Copy data from Input to Output.

The returned future completes successfully when there is no more data to copy. If any error occurs, the future completes with an error.

  Source Edit
proc mapChunks*[T, R](self: Input[T];
                    function: (proc (source: ConstView[T]; target: var seq[R]))): Input[
    R]

Map data in chunks from self and return mapped stream. function will be called to copy data from source to destination chunks (so custom processing can be made).

Use this instead of map to reduce function call overhead for small elements.

  Source Edit
proc flatMap*[T, R](self: Input[T]; function: (proc (x: T): seq[R])): Input[R]
Flat-map data from self. Data from self will be passed to function and items returned from it will be placed in order in result.   Source Edit
proc map*[T, R](self: Input[T]; function: (proc (x: T): R)): Input[R]
Map data from self placing modified data in result.   Source Edit
proc map*[T, R](self: Output[T]; function: (proc (x: R): T)): Output[R]
Map data from result placing modified data in self.   Source Edit
proc unwrapInputFuture*[T](f: Future[Input[T]]): Input[T]
Wait until f completes and pipe elements from it to result.   Source Edit
proc unwrapOutputFuture*[T](f: Future[Output[T]]): Output[T]
Wait until f completes and pipe elements from result to it.   Source Edit
proc logClose*(err: ref Exception)
  Source Edit
proc onErrorClose*(f: Future[void]; p: Output)
When future f completes with error, close provider p.   Source Edit
proc onErrorClose*(f: Future[void]; s: Input)
When future f completes with error, close stream s.   Source Edit