stream

Version: v0.0.0-...-171658a
Published: Oct 31, 2017 License: MIT Imports: 5 Imported by: 2

README

stream

A simple implementation of the ReactiveX pattern for Go.

Example

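  • bytes stream
      obv := NewObserver(nil)
      stream := NewBytesStream(obv)

      // Target is the producer: it sends the bytes 0 through 10, one per
      // message, and then marks the stream as complete.
      stream.Target = func() {
      	for i := 0; i <= 10; i++ {
      		stream.Send([]byte{byte(i)})
      	}
      	stream.OnComplete()
      }

      // Publish the stream, then log every message the subscriber receives.
      stream.Publish(nil).Subscribe(func(data []byte) {
      	log.Print(data)
      })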

Documentation

Functions

func AfterSignal

func AfterSignal() chan os.Signal

AfterSignal watches every OS signal and system call that indicates the process is terminating.
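The page does not show how AfterSignal is implemented or which signals it registers. As a rough sketch of the idea, a channel of this shape can be built with the standard `os/signal` package. The name `afterSignal` and the choice of `os.Interrupt` plus `syscall.SIGTERM` are assumptions made for illustration, not the package's actual behaviour.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// afterSignal is a hypothetical stand-in for AfterSignal. It returns a
// channel that receives termination-style signals. The signal set below
// is an assumption.
func afterSignal() chan os.Signal {
	// The channel is buffered because signal.Notify never blocks when
	// delivering: with no buffer, a signal arriving while nobody is
	// receiving would be dropped.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	return ch
}

func main() {
	sigs := afterSignal()

	// Wait briefly so the example always terminates on its own.
	select {
	case sig := <-sigs:
		fmt.Println("shutting down on", sig)
	case <-time.After(2 * time.Second):
		fmt.Println("no termination signal within 2s")
	}
}
```

A long-running program would normally drop the timeout and block on the channel until a signal arrives.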

Types

type BytesPipeline

type BytesPipeline func(data [][]byte) <-chan []byte

BytesPipeline handles data with the sub stream.

type BytesStream

type BytesStream struct {
	AtSubscribe func(data []byte)

	*Observer
	// contains filtered or unexported fields
}

BytesStream is a bytes-typed stream backed by a Go channel.

func NewBytesStream

func NewBytesStream(obv *Observer) *BytesStream

NewBytesStream creates a BytesStream.

func (*BytesStream) Chunk

func (bs *BytesStream) Chunk(size int) *BytesStream

Chunk gathers data until the buffer is filled.
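To make "gather data until the buffer is filled" concrete, here is a minimal standalone sketch of chunking over plain channels. The helper `chunk` is hypothetical and is not the package's implementation. In particular, flushing a partly filled buffer when the input closes is an assumption, since the documentation does not say what happens to leftover bytes.

```go
package main

import "fmt"

// chunk is a hypothetical helper: it regroups incoming payloads into
// slices of exactly size bytes. size is expected to be positive.
func chunk(in <-chan []byte, size int) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		buf := make([]byte, 0, size)
		for data := range in {
			for _, b := range data {
				buf = append(buf, b)
				if len(buf) == size {
					out <- buf
					// Start a fresh buffer so the emitted slice is not reused.
					buf = make([]byte, 0, size)
				}
			}
		}
		// Assumption: emit whatever is left once the input is closed.
		if len(buf) > 0 {
			out <- buf
		}
	}()
	return out
}

func main() {
	in := make(chan []byte)
	go func() {
		defer close(in)
		for i := 0; i < 5; i++ {
			in <- []byte{byte(i)}
		}
	}()

	// Five single-byte messages regrouped two at a time.
	for c := range chunk(in, 2) {
		fmt.Println(c) // prints [0 1], then [2 3], then [4]
	}
}
```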

func (*BytesStream) Publish

func (bs *BytesStream) Publish(target func()) *BytesStream

Publish publishes the stream with the observer.

func (*BytesStream) Send

func (bs *BytesStream) Send(data []byte)

Send sends bytes data to the Go channel.

func (*BytesStream) Subscribe

func (bs *BytesStream) Subscribe(call func([]byte))

Subscribe receives data from the Go channel.
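The Send and Subscribe descriptions both refer to an underlying Go channel. The sketch below shows one way such a pair could work. The type `byteStream` and its methods are hypothetical stand-ins. Using an unbuffered channel, and a `subscribe` that blocks until the channel is closed, are assumptions rather than documented behaviour of BytesStream.

```go
package main

import "fmt"

// byteStream is a hypothetical stand-in for BytesStream.
type byteStream struct {
	ch chan []byte
}

func newByteStream() *byteStream {
	return &byteStream{ch: make(chan []byte)}
}

// send pushes one payload into the channel. With an unbuffered channel
// it blocks until a subscriber receives the payload.
func (s *byteStream) send(data []byte) { s.ch <- data }

// complete closes the channel, which ends the subscriber's loop.
func (s *byteStream) complete() { close(s.ch) }

// subscribe calls call for every payload until the channel is closed.
func (s *byteStream) subscribe(call func([]byte)) {
	for data := range s.ch {
		call(data)
	}
}

func main() {
	s := newByteStream()

	// The producer runs in its own goroutine, like Target in the README example.
	go func() {
		for i := 0; i < 3; i++ {
			s.send([]byte{byte(i)})
		}
		s.complete()
	}()

	s.subscribe(func(data []byte) {
		fmt.Println(data) // prints [0], then [1], then [2]
	})
}
```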

type ErrHandler

type ErrHandler func(error)

ErrHandler is the function type used to handle errors.

type ObservHandler

type ObservHandler struct {
	AtComplete StateHandler
	AtCancel   StateHandler
	AtError    ErrHandler
}

ObservHandler holds the handlers that run when the Observer's state changes.

func DefaultObservHandler

func DefaultObservHandler() *ObservHandler

DefaultObservHandler returns the default handler configuration.
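The page does not say what the default handlers do. The sketch below only illustrates the general pattern of a handler struct with fallbacks. The three types are local copies of the declarations above so that the block compiles on its own. The helper `withDefaults` and its no-op fallbacks are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations, so the example is self-contained.
type StateHandler func()
type ErrHandler func(error)

type ObservHandler struct {
	AtComplete StateHandler
	AtCancel   StateHandler
	AtError    ErrHandler
}

// withDefaults is a hypothetical helper: every handler left nil is
// replaced by a no-op, so callers can invoke all three unconditionally.
func withDefaults(h ObservHandler) ObservHandler {
	if h.AtComplete == nil {
		h.AtComplete = func() {}
	}
	if h.AtCancel == nil {
		h.AtCancel = func() {}
	}
	if h.AtError == nil {
		h.AtError = func(error) {}
	}
	return h
}

func main() {
	// Only the error handler is customised; the others fall back to no-ops.
	h := withDefaults(ObservHandler{
		AtError: func(err error) { fmt.Println("stream failed:", err) },
	})

	h.AtError(errors.New("boom")) // prints "stream failed: boom"
	h.AtComplete()                // no-op
	h.AtCancel()                  // no-op
}
```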

type Observer

type Observer struct {
	DoneSubscribe chan struct{}

	Handler ObservHandler
	Target  func()
	WG      sync.WaitGroup
	// contains filtered or unexported fields
}

Observer is the watcher object that monitors Status.

func NewObserver

func NewObserver(handler *ObservHandler) *Observer

NewObserver creates an Observer.

func (*Observer) AfterCancel

func (o *Observer) AfterCancel() <-chan struct{}

AfterCancel is the method used for cancelling the Observable.

func (*Observer) Cancel

func (o *Observer) Cancel()

Cancel cancels the observation. NOTE: Subscribe ends at the same time as the observation. NOTE: Cancel does not guarantee that all data is delivered; it terminates immediately.
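The two notes on Cancel describe a common Go idiom: a "done" channel that is closed on cancellation and that every participant selects on. The sketch below shows that idiom in isolation. The type `canceller` and the function `produce` are hypothetical, and modelling AfterCancel as "a channel closed by Cancel" is an assumption based only on its signature.

```go
package main

import (
	"fmt"
	"sync"
)

// canceller is a hypothetical stand-in for the cancel-related part of Observer.
type canceller struct {
	done chan struct{}
	once sync.Once
}

func newCanceller() *canceller {
	return &canceller{done: make(chan struct{})}
}

// cancel closes done exactly once, so calling it repeatedly is safe.
func (c *canceller) cancel() { c.once.Do(func() { close(c.done) }) }

// afterCancel returns a channel that is closed once cancel has been called.
func (c *canceller) afterCancel() <-chan struct{} { return c.done }

// produce tries to send n payloads and stops as soon as cancellation is
// observed. It returns how many payloads were actually delivered.
func produce(c *canceller, out chan<- []byte, n int) int {
	sent := 0
	for i := 0; i < n; i++ {
		select {
		case <-c.afterCancel():
			return sent // remaining payloads are dropped, not flushed
		case out <- []byte{byte(i)}:
			sent++
		}
	}
	return sent
}

func main() {
	c := newCanceller()
	out := make(chan []byte)
	result := make(chan int)
	go func() { result <- produce(c, out, 10) }()

	// Consume three payloads, then cancel while seven are still pending.
	for i := 0; i < 3; i++ {
		fmt.Println(<-out)
	}
	c.cancel()
	fmt.Println("delivered:", <-result) // prints "delivered: 3"
}
```

Because the producer returns as soon as it sees the closed channel, the seven remaining payloads are never sent, which mirrors the note that Cancel does not guarantee complete data.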

func (*Observer) OnComplete

func (o *Observer) OnComplete()

OnComplete ends the Observer's activity. Call it when the Observable returns or otherwise terminates.

func (*Observer) OnError

func (o *Observer) OnError(err error)

OnError passes the error on.

func (*Observer) Watch

func (o *Observer) Watch(target func())

Watch watches the target handler.

type StateHandler

type StateHandler func()

StateHandler is the function type called when the state changes.
