bufpipe

package module
v0.0.0-...-0ae6559 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2022 License: BSD-3-Clause Imports: 5 Imported by: 0

README

golib-bufpipe : Buffered pipe with FIFO queue

Documentation

Overview

Package bufpipe implements several buffered pipe types of infinite size.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoData = fmt.Errorf("no data") // No data in the Pipe[T]
)
View Source
var (
	ReadFromBufSize = 1 * 1024 * 1024 // default size for BytePipe.ReadFromSize
)

Functions

This section is empty.

Types

type BytePipe

type BytePipe struct {
	Pipe[[]byte] // Pipe of []byte data block

	ReadFromSize int // size of []byte data blocks created by ReadFrom()
	// contains filtered or unexported fields
}

A Pipe of []Byte with io.Reader, io.WriteCloser and io.ReadFrom interface.

func NewBytePipe

func NewBytePipe() *BytePipe

Create a new BytePipe.

func (*BytePipe) Close

func (bp *BytePipe) Close() bool

io.Closer for io.WriteCloser, but not for io.ReadCloser. Closing BytePipe prevents data from writing, but Read()/Fetch()/Receive() are OK until io.EOF reached. Check for returning io.EOF, or EOF() to know the end of the stream.

func (*BytePipe) EOF

func (bp *BytePipe) EOF() bool

Check if the BytePipe is closed and no data left for read.

func (*BytePipe) Read

func (bp *BytePipe) Read(p []byte) (n int, err error)

io.Reader inteface for BytePipe. The data is internally copied from the Pipe to the provided buffer. Use Fetch() or Receive() for zero-copy data receiving.

func (*BytePipe) ReadFrom

func (bp *BytePipe) ReadFrom(r io.Reader) (n int64, err error)

io.ReaderFrom interface for BytePipe. Incoming data are partitioned into multiple []byte of bp.ReadFromSize and stored.

func (*BytePipe) Write

func (bp *BytePipe) Write(p []byte) (n int, err error)

io.Writer interface for BytePipe. The data is copied from the provided buffer to an internal buffer when writing. Use Append() for zero-copy data passing.

type NotifyCh

type NotifyCh[T any] struct {
	Value     T
	Cancelled bool
	// contains filtered or unexported fields
}

A lock-free one-time notification channel that can be cancelled. NotifyCh internally uses a channel which is closed when notified.

func NewNotifyCh

func NewNotifyCh[T any]() *NotifyCh[T]

Create a notification channel.

func (*NotifyCh[T]) Cancel

func (c *NotifyCh[T]) Cancel() bool

Cancel the notification channel. Returns false if the channel is already notified or cancelled.

func (*NotifyCh[T]) CancelWithValue

func (c *NotifyCh[T]) CancelWithValue(value T) bool

Cancel the notification channel setting the value. Returns false if the channel is already notified or cancelled.

func (*NotifyCh[T]) FetchChannel

func (c *NotifyCh[T]) FetchChannel() chan any

Get a one-time channel for detecting notification, usually to mux with select{}. The returning channel will be closed when notificaion is sent or cancelled. The function returns nil if the channel is already fetched or notified. If the user does NOT consumed the returned channel for some reason, then the channel should be reverted to unreferenced state using UnfetchChannel().

func (*NotifyCh[T]) Notify

func (c *NotifyCh[T]) Notify(value T) bool

Send notification to the notification channel. if the notification channel is already notified or cancelled, then returns false.

func (*NotifyCh[T]) UnfetchChannel

func (c *NotifyCh[T]) UnfetchChannel(ch chan any) bool

Revert the channel fetched by FetchChannel() to unused state. Do not return if the channel is notified / closed.

func (*NotifyCh[T]) Wait

func (c *NotifyCh[T]) Wait(ctx context.Context) (value T, err error)

Wait for the notification. This function blocks until a Notify() or Cancel() call, or ctx.Done() is done. Returned value is the value set by Notify() or CancelWithValue(). The returned err is nil if the notification is sent by Notify(). The err is io.ErrClosedPipe if cancelled, and will be io.EOF if already used. If the functions terminated by the provided context, then err is the error of the context, or context.Cancelled.

type Pipe

type Pipe[T any] struct {
	// contains filtered or unexported fields
}

Pipe is a FIFO queue that can be closed with Close(). When closed, the read functions will return io.EOF when no data left. Please note that this object does NOT suits for io.PipeReader and io.PipeWriter interface. Especially, Close() closes the stream on write side, but the data remains OK on the read side.

func NewPipe

func NewPipe[T any]() *Pipe[T]

Make a new pipe of type T.

func (*Pipe[T]) Append

func (q *Pipe[T]) Append(v T) (n int, err error)

Append a data to the pipe. n is current number of entries in the pipe. If the pipe is closed, an io.ErrClosedPipe is returned.

func (*Pipe[T]) Close

func (q *Pipe[T]) Close() bool

Close the pipe on the write side. After the Close(), Append() will fail but Fetch() and Receive() do work until the data runs out.

func (*Pipe[T]) Fetch

func (q *Pipe[T]) Fetch() (v T, err error)

Get a data from the pipe. if there is no data and the pipe is NOT closed, then returns ErrNoData. if there is no data and the pipe is closed, then returns io.EOF.

func (*Pipe[T]) Len

func (q *Pipe[T]) Len() int

Number of data entries in the pipe.

func (*Pipe[T]) Receive

func (q *Pipe[T]) Receive(ctx context.Context) (p T, err error)

Receive a data from the pipe. This function blocks until a new data is received, the pipe is closed, or the ctx.Done() is done. Returns io.EOF if the pipe is closed and no data left.

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

A lock-free, first-in-first-out queue. https://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf

func NewQueue

func NewQueue[T any]() *Queue[T]

Create a queue of type T

func (*Queue[T]) Dequeue

func (q *Queue[T]) Dequeue() (value T, ok bool)

Get a entry from the queue. Fetches the size of entires in the queue.

func (*Queue[T]) Enqueue

func (q *Queue[T]) Enqueue(v T) int

Add a entry to the queue. Returns the size of entries in the queue. This is the implementation exactly on the paper.

func (*Queue[T]) Len

func (q *Queue[T]) Len() int

Number of entries in the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL