core

package
Version: v0.0.0-...-2c58ee7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2016 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

The core package provides the public features of oryx, for example the config, the logger and the utilities.

Index

Examples

Constants

View Source
// the scopes for reload, passed as the scope argument of the
// ReloadHandler callbacks to tell which part of the config changed.
// the values are consecutive integers generated by iota, from 0.
const (
	// global specified.
	// these scopes are for ReloadHandler.OnReloadGlobal.
	ReloadWorkers = iota
	ReloadLog
	ReloadListen
	ReloadCpuProfile
	ReloadGcPercent
	// vhost specified.
	// this scope is for ReloadHandler.OnReloadVhost,
	// for example, the Vhost.Play.MwLatency.
	ReloadMwLatency
)

the scope for reload.

View Source
// the rtmp constants.
const (
	// the rtmp port.
	// NOTE(review): presumably the default of Config.Listen, the Loads
	// example prints 1935 after SetDefaults — confirm.
	RtmpListen       = 1935
	// the placeholder name of the default vhost.
	// NOTE(review): presumably used when the client specifies no vhost — confirm.
	RtmpDefaultVhost = "__defaultVhost__"
	// the placeholder name of the default app.
	// NOTE(review): presumably used when the client specifies no app — confirm.
	RtmpDefaultApp   = "__defaultApp__"
)
View Source
// the label of each log level, which is the common logLabel
// followed by the level name in brackets and a space.
// @remark the logLabel is an unexported const defined elsewhere in this package.
const (
	LogInfoLabel  = logLabel + "[info] "
	LogTraceLabel = logLabel + "[trace] "
	LogWarnLabel  = logLabel + "[warn] "
	LogErrorLabel = logLabel + "[error] "
)
View Source
// project info, the author(s) of the project.
const OryxSigAuthors = "winlin"
View Source
// project info, the code of the project.
// NOTE(review): presumably the release code name — confirm.
const OryxSigCode = "MonkeyKing"
View Source
// project info, the copyright notice.
const OryxSigCopyright = "Copyright (c) 2013-2015 Oryx(ossrs)"
View Source
// project info, the contact email.
const OryxSigEmail = "winlin@vip.126.com"
View Source
// project info, the short key of the project,
// which is the prefix of OryxSigName.
const OryxSigKey = "Oryx"

project info.

View Source
// project info, the name of the license.
const OryxSigLicense = "The MIT License (MIT)"
View Source
// project info, the name of the project,
// which is the OryxSigKey followed by "(SRS++)".
const OryxSigName = OryxSigKey + "(SRS++)"
View Source
// project info, the one-line description of the product.
const OryxSigProduct = "The go-oryx is SRS++, focus on real-time live streaming cluster."
View Source
// project info, the role of the server.
const OryxSigRole = "cluster"
View Source
// project info, the stable major version.
const OryxSigStable = 0

stable major version

View Source
// project info, the full url of the project,
// which is the OryxSigUrlShort with the https scheme.
const OryxSigUrl = "https://" + OryxSigUrlShort
View Source
// project info, the url of the project without scheme.
const OryxSigUrlShort = "github.com/ossrs/go-oryx"
View Source
// project info, the website of the project.
const OryxSigWeb = "http://ossrs.net"

Variables

View Source
// the error when a channel overflows.
// @remark it is a sentinel error value with the message "system overflow".
var OverflowError error = errors.New("system overflow")

when the channel overflows. for example, the c0c1 never overflows when the channel buffer size is set to 2.

View Source
// the quit error, used for a goroutine to return.
// @remark it is a sentinel error value with the message "system quit".
var QuitError error = errors.New("system quit")

the quit error, used for goroutine to return.

View Source
// the error when timeout to wait for io.
// @remark it is a sentinel error value with the message "io timeout".
var TimeoutError error = errors.New("io timeout")

when io timeout to wait.

View Source
// the error when the rtmp vhost is not found.
// @remark it is a sentinel error value with the message "vhost not found".
var VhostNotFoundError error = errors.New("vhost not found")

when the rtmp vhost not found.

Functions

func IsNormalQuit

func IsNormalQuit(err interface{}) bool

whether the object from recover, or the returned error, can be ignored, for instance, when the error is a Quit error.

func Marshal

func Marshal(o Marshaler, b *bytes.Buffer) (err error)

marshal the object o to b

Example

marshal multiple objects to buffer.

package main

import (
	"bytes"

	"github.com/ossrs/go-oryx/core"
)

func main() {
	// the objects to marshal, for example,
	// NewAmf0String("oryx") and NewAmf0Number(1.0).
	var (
		x core.Marshaler
		y core.Marshaler
	)

	// the buffer which collects the marshaled bytes.
	var b bytes.Buffer

	// marshal x then y to b, one by one in order.
	for _, o := range []core.Marshaler{x, y} {
		if err := core.Marshal(o, &b); err != nil {
			_ = err // when error.
		}
	}

	// use the bytes contains x and y
	_ = b.Bytes()
}
Output:

func Marshals

func Marshals(o ...Marshaler) (data []byte, err error)

marshal multiple o, which can be nil.

func NewSrsConfCommentReader

func NewSrsConfCommentReader(r io.Reader) io.Reader

the reader supports bash-style comments, for example:

line: # comments

func NewSrsConfDirective

func NewSrsConfDirective() *srsConfDirective

func NewSrsConfParser

func NewSrsConfParser(r io.Reader) *srsConfParser

func OryxSigContributorsUrl

func OryxSigContributorsUrl() string

func OryxSigHandshake

func OryxSigHandshake() string

func OryxSigPrimary

func OryxSigPrimary() string

func OryxSigRelease

func OryxSigRelease() string

func OryxSigServer

func OryxSigServer() string

func OryxSigStableBranch

func OryxSigStableBranch() string

func RandomFill

func RandomFill(b []byte)

randomly fill the bytes.

func Recover

func Recover(ctx Context, name string, f func() error)

invoke f with recover. the name is the name of the goroutine, use empty to ignore.

func RewriteLogger

func RewriteLogger()

rewrite the label and set alias for logger. @remark for normal application, use the ocore directly.

func Unmarshal

func Unmarshal(o UnmarshalSizer, b *bytes.Buffer) (err error)

unmarshal the object from b

Example

unmarshal multiple objects from buffer

package main

import (
	"bytes"

	"github.com/ossrs/go-oryx/core"
)

func main() {
	// the buffer to unmarshal from, read from network.
	var b bytes.Buffer

	// the objects to unmarshal to, for example,
	// the Amf0String and Amf0Number.
	var (
		x core.UnmarshalSizer
		y core.UnmarshalSizer
	)

	// unmarshal x then y from b, one by one in order.
	for _, o := range []core.UnmarshalSizer{x, y} {
		if err := core.Unmarshal(o, &b); err != nil {
			_ = err // when error.
		}
	}

	// use x and y.
	_, _ = x, y
}
Output:

func Unmarshals

func Unmarshals(b *bytes.Buffer, o ...UnmarshalSizer) (err error)

unmarshal multiple o pointers, which can be nil.

func Version

func Version() string

Types

type Agent

// the agent contains a source and a sink, messages flow
// from the upstream sink to the source of this agent,
// then to the sink of this agent, then to the downstream.
// @remark all methods are sync, user should never assume it's async.
type Agent interface {
	// an agent is a resource manager,
	// which must be opened before use and closed after use.
	OpenCloser

	// do agent jobs, to pump messages
	// from source to sink.
	Pump() (err error)
	// write the message m to source, from upstream sink.
	Write(m Message) (err error)

	// source tie to the upstream sink.
	Tie(sink Agent) (err error)
	// destroy the link between source and upstream sink.
	UnTie(sink Agent) (err error)
	// get the tied upstream sink of source.
	TiedSink() (sink Agent)

	// sink flow to the downstream source.
	// @remark internal api, sink.Flow(source) when source.tie(sink).
	Flow(source Agent) (err error)
	// destroy the link between sink and downstream source.
	UnFlow(source Agent) (err error)
}

the agent contains a source which ingests messages from the upstream sink, writes them to a channel, and finally delivers them to the downstream sink.

the arch for agent is:

  +-----upstream----+           +---downstream----+
--+-source => sink--+--(tie->)--+-source => sink--+--
  +-----------------+           +-----------------+

@remark all methods are sync, user should never assume they are async.

type Config

// the config for this application, in json style.
// each exported field maps to the json key in its tag,
// and each nested struct is a section of the config.
type Config struct {
	// the global section.
	Workers int `json:"workers"` // the number of cpus to use

	// the rtmp global section.
	Listen    int  `json:"listen"`     // the system service RTMP listen port
	Daemon    bool `json:"daemon"`     // whether enabled the daemon for unix-like os
	ChunkSize int  `json:"chunk_size"` // the output chunk size. [128, 65535].

	// the go section.
	// NOTE(review): unlike the other sections, this one has no json tag,
	// so it is encoded with the key "Go" — confirm whether `json:"go"` was intended.
	Go struct {
		Writev     bool   `json:"writev"`      // whether use private writev.
		GcTrace    int    `json:"gc_trace"`    // the gc trace interval in seconds.
		GcInterval int    `json:"gc_interval"` // the gc interval in seconds.
		GcPercent  int    `json:"gc_percent"`  // the gc percent.
		CpuProfile string `json:"cpu_profile"` // the cpu profile file.
		MemProfile string `json:"mem_profile"` // the memory profile file.
	}

	// the log config.
	Log struct {
		Tank  string `json:"tank"`  // the log tank, file or console
		Level string `json:"level"` // the log level, info/trace/warn/error
		File  string `json:"file"`  // for log tank file, the log file path.
	} `json:"log"`

	// the heartbeat section.
	Heartbeat struct {
		Enabled  bool    `json:"enabled"`   // whether enable the heartbeat.
		Interval float64 `json:"interval"`  // the heartbeat interval in seconds.
		Url      string  `json:"url"`       // the url to report.
		DeviceId string  `json:"device_id"` // the device id to report.
		Summary  bool    `json:"summaries"` // whether enable the detail summary.
		Listen   int     `json:"listen"`    // the heartbeat http api listen port.
	} `json:"heartbeat"`

	// the stat section.
	// @remark the json key of this section is "stats",
	// and the json key of the Disks is "disk".
	Stat struct {
		Network int      `json:"network"` // the network device index to use as exported ip.
		Disks   []string `json:"disk"`    // the disks to stat.
	} `json:"stats"`

	// the debug section.
	Debug struct {
		// NOTE(review): presumably whether to dump the received rtmp data — confirm.
		RtmpDumpRecv bool `json:"rtmp_dump_recv"`
	} `json:"debug"`

	// the vhosts section.
	Vhosts []*Vhost `json:"vhosts"`
	// contains filtered or unexported fields
}

the config for this application, which can load from file in json style, and convert to json string. @remark user can use the GsConfig object.

// the current global config.
// @remark it is a package-level pointer shared by all users of this package.
var Conf *Config

the current global config.

func NewConfig

func NewConfig(ctx Context) *Config

func (*Config) Conf

func (v *Config) Conf() string

get the config file path.

func (*Config) Loads

func (v *Config) Loads(conf string) error

loads and validate config from config file.

Example
package main

import (
	"fmt"

	"github.com/ossrs/go-oryx/core"
)

func main() {
	// create the config and apply the default values.
	conf := core.NewConfig(core.NewContext())
	conf.SetDefaults()

	// to load and validate the config from file:
	//    if err := conf.Loads("config.json"); err != nil {
	//        panic(err)
	//    }

	fmt.Println("listen at", conf.Listen)
	fmt.Println("workers is", conf.Workers)

	// only report the gc interval when it's not specified.
	if conf.Go.GcInterval != 0 {
		return
	}
	fmt.Println("go gc use default interval.")
}
Output:

listen at 1935
workers is 0
go gc use default interval.

func (*Config) LogTank

func (v *Config) LogTank(level string, dw io.Writer) io.Writer

get the log tank writer for specified level. the param dw is the default writer.

func (*Config) LogToFile

func (v *Config) LogToFile() bool

whether log tank is file

func (*Config) Reload

func (v *Config) Reload(cc *Config) (err error)

func (*Config) ReloadCycle

func (v *Config) ReloadCycle(wc WorkerContainer)

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Subscribe

func (v *Config) Subscribe(h ReloadHandler)

subscribe the reload event, when got reload event, notify all handlers.

func (*Config) Unsubscribe

func (v *Config) Unsubscribe(h ReloadHandler)

func (*Config) Validate

func (v *Config) Validate() error

validate the config whether ok.

func (*Config) Vhost

func (v *Config) Vhost(name string) (*Vhost, error)

func (*Config) VhostGroupMessages

func (v *Config) VhostGroupMessages(vhost string) (n int, err error)

func (*Config) VhostRealtime

func (v *Config) VhostRealtime(vhost string) (r bool, err error)

type Context

// alias the Context interface, which embeds the ocore.Context
// and adds no other methods.
// @remark user can directly use ocore Context.
type Context interface {
	ocore.Context
}

alias the Context interface. @remark user can directly use ocore Context.

func NewContext

func NewContext() Context

type Logger

// alias the Logger interface, which embeds the ocore.Logger
// and adds no other methods.
// @remark user can directly use ocore Logger.
type Logger interface {
	ocore.Logger
}

alias the Logger interface. @remark user can directly use ocore Logger.

// error, the error level, fatal error things, to stderr.
// @remark it is nil at declaration, so it must be assigned before use.
var Error Logger = nil

error, the error level, fatal error things, to stderr.

// info, the verbose info level, very detail log, the lowest level, to discard.
// @remark it is nil at declaration, so it must be assigned before use.
var Info Logger = nil

the application loggers info, the verbose info level, very detail log, the lowest level, to discard.

// trace, the trace level, something important, the default log level, to stdout.
// @remark it is nil at declaration, so it must be assigned before use.
var Trace Logger = nil

trace, the trace level, something important, the default log level, to stdout.

// warn, the warning level, dangerous information, to stderr.
// @remark it is nil at declaration, so it must be assigned before use.
var Warn Logger = nil

warn, the warning level, dangerous information, to stderr.

func NewLoggerPlus

func NewLoggerPlus(l *log.Logger) Logger

alias for log plus.

type Marshaler

// the marshaler, which embeds the encoding.BinaryMarshaler
// and adds no other methods.
type Marshaler interface {
	encoding.BinaryMarshaler
}

marshaler.

type Message

// the message for oryx, which flows from an agent to another agent.
// a message can describe itself as a string and report its muxer.
type Message interface {
	// the string description of message.
	fmt.Stringer

	// the muxer of message.
	Muxer() MessageMuxer
}

the message for oryx, the common structure for RTMP/FLV/HLS/MP4 or any other message. it can be a media message or a control message. the message flows from one agent to another agent.

type MessageMuxer

// the muxer of oryx message type, stored in one byte.
// the valid values are the MuxerXXX constants.
type MessageMuxer uint8

the muxer of oryx message type.

// the muxers of oryx message.
// the values are consecutive MessageMuxer generated by iota, from 0.
const (
	MuxerRtmp MessageMuxer = iota
	MuxerFlv
	MuxerH264
	MuxerRtsp
	MuxerTs
	MuxerAac
	MuxerMp3
)

type OpenCloser

// the open and closer for resource manage,
// which combines the Opener and the io.Closer.
type OpenCloser interface {
	Opener
	io.Closer
}

the open and closer for resource manage.

type Opener

// the opener to open the resource.
type Opener interface {
	// open the resource.
	// @return an error when open failed.
	Open() error
}

the opener to open the resource.

type Play

// the play section of vhost in config.
type Play struct {
	// the mw latency of vhost, the json key is "mw_latency".
	// NOTE(review): presumably the merged-write latency in ms — confirm.
	// @remark the tag must be well-formed with the closing quote,
	// otherwise encoding/json ignores it and never decodes the key.
	MwLatency int `json:"mw_latency"`
}

func NewConfPlay

func NewConfPlay() *Play

type Quiter

// the quiter which used for quit.
// create it by NewQuiter, get the quit channel by QC and notify to quit by Quit.
// TODO: FIXME: server should use it.
type Quiter struct {
	// contains filtered or unexported fields
}

which used for quit. TODO: FIXME: server should use it.

func NewQuiter

func NewQuiter() *Quiter

func (*Quiter) QC

func (v *Quiter) QC() <-chan bool

func (*Quiter) Quit

func (v *Quiter) Quit() (err error)

type ReloadHandler

// the reload handler, the client which cares about the reload event
// must implement this interface, then register itself to the config
// by Config.Subscribe.
type ReloadHandler interface {
	// when reload the global scopes,
	// for example, the workers, listen and log.
	// @param scope defined in const ReloadXXX.
	// @param cc the current loaded config, GsConfig.
	// @param pc the previous old config.
	// @return an error when the handler failed to apply the reload.
	OnReloadGlobal(scope int, cc, pc *Config) (err error)
	// when reload the vhost scopes,
	// for example, the Vhost.Play.MwLatency
	// @param vhost the name of the vhost which is reloaded.
	// @param scope defined in const ReloadXXX.
	// @param cc the current loaded config, GsConfig.
	// @param pc the previous old config.
	// @return an error when the handler failed to apply the reload.
	OnReloadVhost(vhost string, scope int, cc, pc *Config) (err error)
}

the reload handler. the client which cares about the reload event must implement this interface and then register itself to the config.

type UnmarshalSizer

// unmarshaler and sizer, which can unmarshal itself from binary
// and report how many bytes it takes.
type UnmarshalSizer interface {
	// unmarshal the instance from binary data.
	encoding.BinaryUnmarshaler

	// the total size of bytes for this amf0 instance.
	Size() int
}

unmarshaler and sizer.

type Vhost

// the vhost section in config.
type Vhost struct {
	// the name of vhost, the json key is "name".
	Name     string `json:"name"`
	// whether the vhost is realtime, the json key is "min_latency".
	Realtime bool   `json:"min_latency"`
	// the play section of vhost, the json key is "play".
	// @remark the omitempty omits the key when Play is nil,
	// instead of encoding it as "play":null.
	Play     *Play  `json:"play,omitempty"`
}

the vhost section in config.

func NewConfVhost

func NewConfVhost() *Vhost

type WorkerContainer

// the container for all workers,
// which provides the quit and cleanup methods.
type WorkerContainer interface {
	// get the quit channel,
	// worker can fetch the quit signal.
	// please use Quit to notify the container to quit.
	QC() <-chan bool
	// notify the container to quit.
	// for example, when goroutine fatal error,
	// which can't be recovered, notify server to cleanup and quit.
	// @remark when got quit signal, the goroutine must notify the
	//      container to Quit(), for which other goroutines wait.
	// @remark this quit always returns a core.QuitError error, which can be ignored.
	Quit() (err error)
	// fork a new goroutine with work container.
	// the param f can be a global func or object method.
	// the param name is the goroutine name.
	GFork(name string, f func(WorkerContainer))
}

the container for all workers, which provides the quit and cleanup methods.

Example (Fatal)

the goroutine cycle notify container to quit when error.

package main

import (
	"time"

	"github.com/ossrs/go-oryx/core"
)

func main() {
	var container core.WorkerContainer

	// the cycle of service, which notifies the container
	// to quit when got a fatal error.
	cycle := func(wc core.WorkerContainer) {
		for {
			select {
			case <-time.After(3 * time.Second):
				// select other channel, do something cycle to get error.
				var err error
				if err == nil {
					continue
				}

				// when got non-recoverable error, notify container to quit.
				wc.Quit()
				return
			case <-wc.QC():
				// when got a quit signal, break the loop.
				// and must notify the container again for other workers
				// in container to quit.
				wc.Quit()
				return
			}
		}
	}

	container.GFork("myservice", cycle)
}
Output:

Example (Recoverable)

the goroutine cycle ignore any error.

package main

import (
	"time"

	"github.com/ossrs/go-oryx/core"
)

func main() {
	var container core.WorkerContainer

	// the cycle of service, which ignores any error
	// and only quits when the container quits.
	cycle := func(wc core.WorkerContainer) {
		for {
			select {
			case <-time.After(3 * time.Second):
				// select other channel, do something cycle to get error.
				var err error
				if err != nil {
					// recoverable error, log it only and continue or return.
					continue
				}
			case <-wc.QC():
				// when got a quit signal, break the loop.
				// and must notify the container again for other workers
				// in container to quit.
				wc.Quit()
				return
			}
		}
	}

	container.GFork("myservice", cycle)
}
Output:

Example (Safe)

the goroutine cycle absolutely safe, no panic no error to quit.

package main

import (
	"time"

	"github.com/ossrs/go-oryx/core"
)

func main() {
	var container core.WorkerContainer

	// the cycle of service, which is absolutely safe,
	// no panic and no error can make it quit.
	cycle := func(wc core.WorkerContainer) {
		// swallow any panic of the cycle.
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			// log the r and ignore.
			_ = r
		}()

		for {
			select {
			case <-time.After(3 * time.Second):
				// select other channel, do something cycle to get error.
				var err error
				if err != nil {
					// recoverable error, log it only and continue or return.
					continue
				}
			case <-wc.QC():
				// when got a quit signal, break the loop.
				// and must notify the container again for other workers
				// in container to quit.
				wc.Quit()
				return
			}
		}
	}

	container.GFork("myservice", cycle)
}
Output:

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL