catman

package module
v0.0.0-...-8473c24 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2019 License: MIT Imports: 14 Imported by: 0

README

catman

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/samuel/go-zookeeper/zk"

	"github.com/shanexu/go-catman"
)

func main() {
	cm, err := catman.NewCatMan([]string{"127.0.0.1:2181"}, time.Second)
	if err != nil {
		panic(err)
	}
	defer cm.Close()
	var wg sync.WaitGroup
	wg.Add(1)
	cs, err := cm.CMChildren("/children", catman.WatcherFunc(func(event zk.Event) {
		fmt.Println(event)
		wg.Done()
	}))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cs)
	wg.Wait()
}

election

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/shanexu/go-catman"
)

func main() {
	cm, err := catman.NewCatMan([]string{"127.0.0.1:2181"}, time.Second)
	if err != nil {
		panic(err)
	}
	defer cm.Close()
	l := cm.NewLeaderElectionSupport("myhost", "/election")
	l.AddListener(catman.LeaderElectionAwareFunc(func(event catman.ElectionEvent) {
		fmt.Println("ElectionEvent:", event)
	}))
	if err := l.Start(); err != nil {
		panic(err)
	}
	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)
	<-quit
	if err := l.Stop(); err != nil {
		panic(err)
	}
}

queue

package main

import (
	"fmt"
	"time"

	"github.com/shanexu/go-catman"
)

func main() {
	cm, err := catman.NewCatMan([]string{"127.0.0.1:2181"}, time.Second)
	defer cm.Close()
	if err != nil {
		panic(err)
	}
	q := cm.NewDistributedQueue("/queue")

	_, err = q.Offer([]byte("hello"))
	if err != nil {
		panic(err)
	}

	data, err := q.Element()
	if err != nil {
		panic(err)
	}

	fmt.Printf("%s\n", data)

	data, err = q.Take()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", data)
}

lock

package main

import (
	"fmt"
	"time"

	"github.com/shanexu/go-catman"
)

type lockListener int

func (l lockListener) LockAcquired() {
	fmt.Printf("LockAcquired %d\n", l)
}

func (l lockListener) LockReleased() {
	fmt.Printf("LockReleased %d\n", l)
}

func main() {
	cm, err := catman.NewCatMan([]string{"127.0.0.1:2181"}, time.Second)
	if err != nil {
		panic(err)
	}
	go func() {
		l := cm.NewLock("/mylock", catman.OpenAclUnsafe, lockListener(0))
		l.Lock()
		fmt.Println("locked")
		time.Sleep(time.Second * 10)
		l.Unlock()
	}()

	l := cm.NewLock("/mylock", catman.OpenAclUnsafe, lockListener(1))
	l.Lock()
	fmt.Println("locked")
	time.Sleep(time.Second * 10)
	l.Unlock()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	OpenAclUnsafe = zk.WorldACL(zk.PermAll)
	CreatorAllAcl = zk.AuthACL(zk.PermAll)
	ReadAclUnsafe = zk.WorldACL(zk.PermRead)
)
View Source
var (
	ErrBadPath = errors.New("bad path")
)
View Source
var (
	ErrNoSuchElement = errors.New("no such element")
)
View Source
var LeaderOfferComparator = utils.Comparator(func(a interface{}, b interface{}) int {
	aAsserted := a.(*LeaderOffer)
	bAsserted := b.(*LeaderOffer)

	return utils.IntComparator(aAsserted.id, bAsserted.id)
})
View Source
var ZNodeNameComparator = utils.Comparator(func(a interface{}, b interface{}) int {
	aAsserted := a.(*ZNodeName)
	bAsserted := b.(*ZNodeName)

	answer := aAsserted.sequence - bAsserted.sequence
	if answer != 0 {
		return answer
	}
	return utils.StringComparator(aAsserted.prefix, bAsserted.prefix)
})

Functions

This section is empty.

Types

type CatMan

type CatMan struct {
	*zk.Conn
	// contains filtered or unexported fields
}

func NewCatMan

func NewCatMan(servers []string, sessionTimeout time.Duration, opts ...CatManConfigOption) (*CatMan, error)

func (*CatMan) CMChildren

func (cm *CatMan) CMChildren(parent string, watcher Watcher) ([]string, error)

func (*CatMan) CMCreate

func (cm *CatMan) CMCreate(path string, data []byte, opts ...CreateConfigOption) (string, error)

func (*CatMan) CMCreateEphemeralSequential

func (cm *CatMan) CMCreateEphemeralSequential(pathPrefix string, data []byte, opts ...CreateConfigOption) (string, int64, error)

func (*CatMan) CMCreateProtectedEphemeralSequential

func (cm *CatMan) CMCreateProtectedEphemeralSequential(
	path string,
	data []byte,
	opts ...CreateConfigOption,
) (string, int64, error)

func (*CatMan) CMCreateSequential

func (cm *CatMan) CMCreateSequential(pathPrefix string, data []byte, opts ...CreateConfigOption) (string, error)

func (*CatMan) CMDelete

func (cm *CatMan) CMDelete(path string, version int32) error

func (*CatMan) CMExists

func (cm *CatMan) CMExists(path string, watcher Watcher) (*zk.Stat, error)

func (*CatMan) CMGet

func (cm *CatMan) CMGet(path string) ([]byte, error)

func (*CatMan) Close

func (cm *CatMan) Close()

func (*CatMan) NewDistributedQueue

func (cm *CatMan) NewDistributedQueue(dir string) *DistributedQueue

func (*CatMan) NewLeaderElectionSupport

func (cm *CatMan) NewLeaderElectionSupport(hostName, rootNodeName string) *LeaderElectionSupport

func (*CatMan) NewLock

func (cm *CatMan) NewLock(dir string, acl []zk.ACL, callback LockListener) *Lock

type CatManConfig

type CatManConfig struct {
	ACL     []zk.ACL
	Watcher Watcher
	Log     utils.Logger
}

type CatManConfigOption

type CatManConfigOption func(*CatManConfig)

type CreateConfig

type CreateConfig struct {
	Flag int32
	ACL  []zk.ACL
}

type CreateConfigOption

type CreateConfigOption func(*CreateConfig)

type DistributedQueue

type DistributedQueue struct {
	// contains filtered or unexported fields
}

func (*DistributedQueue) Element

func (q *DistributedQueue) Element() ([]byte, error)

Return the head of the queue without modifying the queue.

func (*DistributedQueue) Offer

func (q *DistributedQueue) Offer(data []byte) (bool, error)

Inserts data into queue.

func (*DistributedQueue) Peek

func (q *DistributedQueue) Peek() ([]byte, error)

Returns the data at the first element of the queue, or null if the queue is empty.

func (*DistributedQueue) Poll

func (q *DistributedQueue) Poll() ([]byte, error)

Attempts to remove the head of the queue and return it. Returns null if the queue is empty.

func (*DistributedQueue) Remove

func (q *DistributedQueue) Remove() ([]byte, error)

Attempts to remove the head of the queue and return it.

func (*DistributedQueue) Take

func (q *DistributedQueue) Take() ([]byte, error)

Removes the head of the queue and returns it, blocks until it succeeds.

type ElectionEvent

type ElectionEvent int
const (
	ElectionEventStart ElectionEvent = iota
	ElectionEventOfferStart
	ElectionEventOfferComplete
	ElectionEventDetermineStart
	ElectionEventDetermineComplete
	ElectionEventElectedStart
	ElectionEventElectedComplete
	ElectionEventReadyStart
	ElectionEventReadyComplete
	ElectionEventFailed
	ElectionEventStopStart
	ElectionEventStopComplete
)

func (ElectionEvent) String

func (i ElectionEvent) String() string

type ElectionState

type ElectionState int
const (
	ElectionStateStart ElectionState = iota
	ElectionStateOffer
	ElectionStateDetermine
	ElectionStateElected
	ElectionStateReady
	ElectionStateFailed
	ElectionStateStop
)

func (ElectionState) String

func (i ElectionState) String() string

type ErrUnexpectedEvent

type ErrUnexpectedEvent struct {
	zk.EventType
}

func (*ErrUnexpectedEvent) Error

func (ue *ErrUnexpectedEvent) Error() string

type LeaderElectionAware

type LeaderElectionAware interface {
	OnElectionEvent(event ElectionEvent)
}

Called during each state transition. Current, low level events are provided at the beginning and end of each state. For instance, START may be followed by OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE, and so on

type LeaderElectionAwareFunc

type LeaderElectionAwareFunc func(event ElectionEvent)

func (LeaderElectionAwareFunc) OnElectionEvent

func (l LeaderElectionAwareFunc) OnElectionEvent(event ElectionEvent)

type LeaderElectionSupport

type LeaderElectionSupport struct {
	// contains filtered or unexported fields
}

func (*LeaderElectionSupport) AddListener

func (l *LeaderElectionSupport) AddListener(listener LeaderElectionAware)

func (*LeaderElectionSupport) HostName

func (l *LeaderElectionSupport) HostName() string

func (*LeaderElectionSupport) LeaderHostName

func (l *LeaderElectionSupport) LeaderHostName() (string, error)

func (*LeaderElectionSupport) LeaderOffer

func (l *LeaderElectionSupport) LeaderOffer() *LeaderOffer

func (*LeaderElectionSupport) Process

func (l *LeaderElectionSupport) Process(event zk.Event)

func (*LeaderElectionSupport) RemoveListener

func (l *LeaderElectionSupport) RemoveListener(listener LeaderElectionAware)

func (*LeaderElectionSupport) RootNodeName

func (l *LeaderElectionSupport) RootNodeName() string

func (*LeaderElectionSupport) SetHostName

func (l *LeaderElectionSupport) SetHostName(hostName string)

func (*LeaderElectionSupport) SetRootNodeName

func (l *LeaderElectionSupport) SetRootNodeName(rootNodeName string)

func (*LeaderElectionSupport) Start

func (l *LeaderElectionSupport) Start() error

start the election process. This method will create a leader offer, determine its status, and either become the leader or become ready.

func (*LeaderElectionSupport) Stop

func (l *LeaderElectionSupport) Stop() error

stops all election services, revokes any outstanding leader offers, and disconnects from ZooKeeper.

type LeaderOffer

type LeaderOffer struct {
	// contains filtered or unexported fields
}

func NewLeaderOffer

func NewLeaderOffer(id int, nodePath string, hostName string) *LeaderOffer

func (*LeaderOffer) HostName

func (l *LeaderOffer) HostName() string

func (*LeaderOffer) Id

func (l *LeaderOffer) Id() int

func (*LeaderOffer) NodePath

func (l *LeaderOffer) NodePath() string

func (*LeaderOffer) SetHostName

func (l *LeaderOffer) SetHostName(hostName string)

func (*LeaderOffer) SetId

func (l *LeaderOffer) SetId(id int)

func (*LeaderOffer) SetNodePath

func (l *LeaderOffer) SetNodePath(nodePath string)

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

func (*Lock) Acl

func (l *Lock) Acl() []zk.ACL

return the acl its using.

func (*Lock) CatMan

func (l *Lock) CatMan() *CatMan

return zookeeper client instance.

func (*Lock) Close

func (l *Lock) Close()

Closes this strategy and releases any ZooKeeper resources; but keeps the ZooKeeper instance open.

func (*Lock) Closed

func (l *Lock) Closed() bool

Returns true if this protocol has been closed.

func (*Lock) IsOwner

func (l *Lock) IsOwner() bool

func (*Lock) Lock

func (l *Lock) Lock() (bool, error)

func (*Lock) LockListener

func (l *Lock) LockListener() LockListener

return the current locklistener.

func (*Lock) RetryDelay

func (l *Lock) RetryDelay() time.Duration

get the retry delay

func (*Lock) SetAcl

func (l *Lock) SetAcl(acl []zk.ACL)

set the acl.

func (*Lock) SetLockListener

func (l *Lock) SetLockListener(callback LockListener)

register a different call back listener.

func (*Lock) SetRetryDelay

func (l *Lock) SetRetryDelay(retryDelay time.Duration)

Sets the time waited between retry delays.

func (*Lock) Unlock

func (l *Lock) Unlock() error

type LockListener

type LockListener interface {
	LockAcquired()
	LockReleased()
}

type Watcher

type Watcher interface {
	Process(zk.Event)
}

type WatcherFunc

type WatcherFunc func(zk.Event)

func (WatcherFunc) Process

func (f WatcherFunc) Process(event zk.Event)

type ZNodeName

type ZNodeName struct {
	// contains filtered or unexported fields
}

func NewZNodeName

func NewZNodeName(name string) *ZNodeName

func (*ZNodeName) Name

func (n *ZNodeName) Name() string

type ZooKeeperOperation

type ZooKeeperOperation interface {
	Execute() (bool, error)
}

type ZooKeeperOperationFunc

type ZooKeeperOperationFunc func() (bool, error)

Performs the operation - which may be involved multiple times if the connection to ZooKeeper closes during this operation.

func (ZooKeeperOperationFunc) Execute

func (f ZooKeeperOperationFunc) Execute() (bool, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL