Documentation
¶
Index ¶
- Constants
- Variables
- func GetNToken(privateKeyPath, domain, service, keyVersion string, expireTime time.Duration) (ntoken string, err error)
- func GetRoleToken(url, authHeader, ntoken, tenantDomain, providerDomain, service, role string) (roleToken *zts.RoleToken, err error)
- func InitOptions(opts *Options) (err error)
- func NewCommandConnect(c *Config, useCache bool) (connect *pulsar_proto.CommandConnect, err error)
- func NewConn(c *Config) (conn net.Conn, err error)
- func NewTcpConn(c *Config) (conn net.Conn, err error)
- func NewTlsConn(c *Config) (conn net.Conn, err error)
- type AsyncConn
- func (ac *AsyncConn) Close()
- func (ac *AsyncConn) Connect(msg *pulsar_proto.CommandConnect) (err error)
- func (ac *AsyncConn) GetCommandFromBroker() (cmd *pulsar_proto.BaseCommand)
- func (ac *AsyncConn) GetConfig() (c *Config)
- func (ac *AsyncConn) GetConnection() (conn Conn)
- func (ac *AsyncConn) GetID() (id string)
- func (ac *AsyncConn) LookupTopic(msg *pulsar_proto.CommandLookupTopic) (res *pulsar_proto.CommandLookupTopicResponse, err error)
- func (ac *AsyncConn) Receive() (response *Response, err error)
- func (ac *AsyncConn) Request(r *Request) (response *Response, err error)
- func (ac *AsyncConn) Run()
- func (ac *AsyncConn) Send(r *Request) (err error)
- type AsyncConns
- type AthenzConfig
- type AthenzKey
- type AthenzKeys
- type Authentication
- type AuthenticationAthenz
- type AuthenticationDataAthenz
- type AuthenticationDataProvider
- type Certificate
- type Config
- type Conn
- type ConnPool
- type ConnectionState
- type Consumer
- func (c *Consumer) CloseConsumer(consumerId, requestId uint64) (err error)
- func (c *Consumer) Flow(consumerId uint64, messagePermits uint32) (err error)
- func (c *Consumer) ReceiveMessage() (msg *Message, err error)
- func (c *Consumer) SendAck(consumerId uint64, ackType pulsar_proto.CommandAck_AckType, ...) (err error)
- func (c *Consumer) SendRedeliverUnacknowledgedMessages(subType pulsar_proto.CommandSubscribe_SubType, consumerId uint64, ...) (err error)
- func (c *Consumer) Subscribe(topic, subscription string, subType pulsar_proto.CommandSubscribe_SubType, ...) (err error)
- type IniConfig
- type KeyValue
- type KeyValues
- type Message
- type Options
- type PoolStatus
- type PrivateKey
- type Producer
- func (p *Producer) CloseProducer(producerId, requestId uint64) (err error)
- func (p *Producer) CreateProducer(topic string, producerId, requestId uint64) (err error)
- func (p *Producer) ReceiveProducerSuccess() (success *pulsar_proto.CommandProducerSuccess, err error)
- func (p *Producer) ReceiveSendReceipt() (receipt *pulsar_proto.CommandSendReceipt, err error)
- func (p *Producer) SendBatchSend(producerId, sequenceId uint64, producerName string, ...) (err error)
- func (p *Producer) SendSend(producerId, sequenceId uint64, producerName, payload string, ...) (err error)
- type PulsarClient
- func (c *PulsarClient) Close()
- func (c *PulsarClient) ConnectToBroker(response *pulsar_proto.CommandLookupTopicResponse) (ac *AsyncConn, err error)
- func (c *PulsarClient) GetPartitionedTopicMetadata(topic string, requestId uint64) (err error)
- func (c *PulsarClient) KeepAlive() (err error)
- func (c *PulsarClient) LookupTopicWithConnect(conn Conn, topic string, requestId uint64, authoritative bool) (ac *AsyncConn, err error)
- func (c *PulsarClient) ReceiveSuccess() (success *pulsar_proto.CommandSuccess, err error)
- func (c *PulsarClient) SetLookupTopicConnection(topic string, requestId uint64, authoritative bool) (err error)
- type Request
- type Response
Constants ¶
View Source
const ( ClientName = "go-pulsar" DefaultProtocolVersion = 9 )
View Source
const ( OptionsCommandConsume = "consume" OptionsCommandProduce = "produce" )
View Source
const (
DefaultDeadlineTimeout = time.Duration(40) * time.Second
)
View Source
const (
OptionsAuthMethodAthenz = "athenz"
)
View Source
const (
PROTO_TCP = "tcp"
)
Variables ¶
View Source
var ( ErrKeepAlive = errors.New("failed to receive pong command") ErrLookupTopicResponseFailed = errors.New( "got failed as response type from lookup topic", ) )
View Source
var ( ErrAppendTrustCerts = errors.New("failed to append trust certs file") ErrNoConnection = errors.New("need to establish a connection") ErrSentConnect = errors.New("connecting now, wait for a couple of seconds") ErrHasConnection = errors.New("connection has already established") ErrCloseReadChan = errors.New("read channel has closed") ErrCloseProducerByBroker = errors.New("producer has closed by broker") ErrCloseConsumerByBroker = errors.New("consumer has closed by broker") )
Functions ¶
func GetRoleToken ¶
func InitOptions ¶
func NewCommandConnect ¶
func NewCommandConnect( c *Config, useCache bool, ) (connect *pulsar_proto.CommandConnect, err error)
Types ¶
type AsyncConn ¶
type AsyncConn struct {
// contains filtered or unexported fields
}
func (*AsyncConn) Connect ¶
func (ac *AsyncConn) Connect(msg *pulsar_proto.CommandConnect) (err error)
func (*AsyncConn) GetCommandFromBroker ¶
func (ac *AsyncConn) GetCommandFromBroker() (cmd *pulsar_proto.BaseCommand)
func (*AsyncConn) GetConnection ¶
func (*AsyncConn) LookupTopic ¶
func (ac *AsyncConn) LookupTopic( msg *pulsar_proto.CommandLookupTopic, ) (res *pulsar_proto.CommandLookupTopicResponse, err error)
type AsyncConns ¶
type AsyncConns []*AsyncConn
type AthenzConfig ¶
type AthenzConfig struct {
ZmsUrl string `json:"zmsUrl"`
ZtsUrl string `json:"ztsUrl"`
ZtsPublicKeys AthenzKeys `json:"ztsPublicKeys"`
ZmsPublicKeys AthenzKeys `json:"zmsPublicKeys"`
}
func GetAthenzConfig ¶
func GetAthenzConfig(path string) (athenzConfig *AthenzConfig, err error)
type AthenzKeys ¶
type AthenzKeys []AthenzKey
type Authentication ¶
type Authentication interface {
GetAuthMethodName() string
GetAuthData() (AuthenticationDataProvider, error)
Configure(map[string]string)
Start() error
}
func NewAuthentication ¶
func NewAuthentication(name string, config *Config) (auth Authentication, err error)
type AuthenticationAthenz ¶
type AuthenticationAthenz struct {
// contains filtered or unexported fields
}
func NewAuthenticationAthenz ¶
func NewAuthenticationAthenz(config *Config) (athenz *AuthenticationAthenz)
func (*AuthenticationAthenz) Configure ¶
func (a *AuthenticationAthenz) Configure(authParams map[string]string)
func (*AuthenticationAthenz) GetAuthData ¶
func (a *AuthenticationAthenz) GetAuthData() ( provider AuthenticationDataProvider, err error, )
func (*AuthenticationAthenz) GetAuthMethodName ¶
func (a *AuthenticationAthenz) GetAuthMethodName() (name string)
func (*AuthenticationAthenz) Start ¶
func (a *AuthenticationAthenz) Start() (err error)
type AuthenticationDataAthenz ¶
type AuthenticationDataAthenz struct {
AuthenticationDataProvider
// contains filtered or unexported fields
}
func NewAuthenticationDataAthenz ¶
func NewAuthenticationDataAthenz( roleToken, httpHeaderName string, ) (provider *AuthenticationDataAthenz)
func (*AuthenticationDataAthenz) GetCommandData ¶
func (a *AuthenticationDataAthenz) GetCommandData() string
func (*AuthenticationDataAthenz) GetHttpHeaders ¶
func (a *AuthenticationDataAthenz) GetHttpHeaders() (headers http.Header)
func (*AuthenticationDataAthenz) HasDataForTls ¶
func (a *AuthenticationDataAthenz) HasDataForTls() bool
func (*AuthenticationDataAthenz) HasDataFromCommand ¶
func (a *AuthenticationDataAthenz) HasDataFromCommand() bool
type AuthenticationDataProvider ¶
type AuthenticationDataProvider interface {
HasDataForTls() bool
GetTlsCertificates() []Certificate
GetTlsPrivateKey() PrivateKey
HasDataForHttp() bool
GetHttpAuthType() string
GetHttpHeaders() http.Header
HasDataFromCommand() bool
GetCommandData() string
}
type Certificate ¶
type Certificate struct {
}
type Config ¶
type Config struct {
Proto string
LocalAddr *net.TCPAddr
RemoteAddr *net.TCPAddr
Timeout time.Duration
MinConnectionNum int
MaxConnectionNum int
AuthMethod string
AuthParams map[string]string
UseTLS bool
TLSAllowInsecureConnection bool
TLSTrustCertsFilepath string
AuthenticationDataProvider AuthenticationDataProvider
AthenzConfig *AthenzConfig
AthenzAuthHeader string
ServiceURL *url.URL
LogLevel log.Level
}
func NewConfigFromIni ¶
func NewConfigFromOptions ¶
type Conn ¶
type Conn interface {
GetID() string
GetConfig() *Config
GetConnection() Conn
GetCommandFromBroker() *pulsar_proto.BaseCommand
LookupTopic(*pulsar_proto.CommandLookupTopic,
) (*pulsar_proto.CommandLookupTopicResponse, error)
Connect(*pulsar_proto.CommandConnect) error
Send(*Request) error
Receive() (*Response, error)
Request(*Request) (*Response, error)
Close()
}
type ConnPool ¶
type ConnPool struct {
// contains filtered or unexported fields
}
func NewConnPool ¶
func (*ConnPool) GetStatus ¶
func (p *ConnPool) GetStatus() (status *PoolStatus)
type ConnectionState ¶
type ConnectionState int
const ( ConnectionStateNone ConnectionState = iota + 1 ConnectionStateSentConnectFrame ConnectionStateReady )
type Consumer ¶
type Consumer struct {
*PulsarClient
}
func NewConsumer ¶
func NewConsumer(client *PulsarClient) (c *Consumer)
func (*Consumer) CloseConsumer ¶
func (*Consumer) ReceiveMessage ¶
func (*Consumer) SendAck ¶
func (c *Consumer) SendAck( consumerId uint64, ackType pulsar_proto.CommandAck_AckType, msgIdData *pulsar_proto.MessageIdData, validationError *pulsar_proto.CommandAck_ValidationError, ) (err error)
func (*Consumer) SendRedeliverUnacknowledgedMessages ¶
func (c *Consumer) SendRedeliverUnacknowledgedMessages( subType pulsar_proto.CommandSubscribe_SubType, consumerId uint64, idsList []*pulsar_proto.MessageIdData, ) (err error)
func (*Consumer) Subscribe ¶
func (c *Consumer) Subscribe( topic, subscription string, subType pulsar_proto.CommandSubscribe_SubType, consumerId, requestId uint64, ) (err error)
type IniConfig ¶
type IniConfig struct {
LogLevelString string `ini:"log_level"`
ServiceURLString string `ini:"service_url"`
Timeout time.Duration `ini:"timeout"`
MinConnectionNum int `ini:"min_connection_num"`
MaxConnectionNum int `ini:"max_connection_num"`
AuthMethod string `ini:"auth_method"`
AuthParams string `ini:"auth_params"`
UseTLS bool `ini:"use_tls"`
TLSAllowInsecureConnection bool `ini:"tls_allow_insecure_connection"`
TLSTrustCertsFilepath string `ini:"tls_trust_certs_filepath"`
AthenzConf string `ini:"athenz_conf"`
AthenzAuthHeader string `ini:"athenz_auth_header"`
// internal use
ServiceURL *url.URL `ini:"-"`
LogLevel log.Level `ini:"-"`
AthenzConfig *AthenzConfig `ini:"-"`
}
func LoadIniFile ¶
type KeyValues ¶
type KeyValues []KeyValue
func ConvertPropertiesToKeyValues ¶
func ConvertPropertiesToKeyValues( properties []*pulsar_proto.KeyValue, ) (keyValues KeyValues)
func (KeyValues) Convert ¶
func (kvs KeyValues) Convert() (properties []*pulsar_proto.KeyValue)
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func NewMessage ¶
func NewMessage( cmd *pulsar_proto.CommandMessage, meta *pulsar_proto.MessageMetadata, body string, batchMessage command.BatchMessage, ) (msg *Message)
func (Message) GetBatchMessage ¶
func (m Message) GetBatchMessage() (batchMessage command.BatchMessage)
func (Message) GetKeyValues ¶
func (Message) GetMessageId ¶
func (m Message) GetMessageId() (data *pulsar_proto.MessageIdData)
func (Message) HasBatchMessage ¶
type Options ¶
type Options struct {
// common options
ServiceURLString *string `long:"serviceUrl" env:"PULSAR_SERVICE_URL" description:"pulsar service url"`
AuthMethod *string `long:"authMethod" env:"PULSAR_AUTH_METHOD" description:"authentication method"`
AuthParams *string `long:"authParams" env:"PULSAR_AUTH_PARAMS" description:"authentication params"`
UseTLS bool `long:"useTls" env:"USE_TLS" description:"use tls to connect"`
TLSAllowInsecureConnection bool `long:"tlsAllowInsecureConnection" env:"TLS_ALLOW_INSECURE_CONNECTION" description:"allow insecure tls connection"`
AthenzConf *string `long:"athenzConf" env:"PULSAR_ATHENZ_CONF" description:"path to athenz config file"`
AthenzAuthHeader *string `long:"athenzAuthHeader" env:"PULSAR_ATHENZ_AUTH_HEADER" description:"athenz authentication header"`
Conf *string `long:"conf" env:"PULSAR_CONF" description:"path to pulsar config file"`
Verbose bool `long:"verbose" env:"VERBOSE" description:"use verbose mode"`
Timeout *time.Duration `long:"timeout" env:"PULSAR_TIMEOUT" description:"timeout to communicate with pulsar broker"`
Command *string `long:"command" env:"PULSAR_COMMAND" description:"produce or consume"`
Topic string `long:"topic" env:"PULSAR_TOPIC" required:"true" description:"topic name"`
// for producer
Messages []string `long:"messages" env:"PULSAR_MESSAGES" description:"messages to produce"`
Properties []string `long:"properties" env:"PULSAR_PROPERTIES" description:"properties to produce. e.g) key1:value1,key2:value2"`
// for consumer
NumMessages int `long:"numMessages" env:"PULSAR_NUM_MESSAGES" default:"1" description:"number of messages to consume"`
SubscriptionName string `long:"subscriptionName" env:"PULSAR_SUBSCRIPTION_NAME" description:"subscription name"`
SubscriptionType string `` /* 135-byte string literal not displayed */
// internal use
ServiceURL *url.URL
}
type PoolStatus ¶
type PoolStatus struct {
// contains filtered or unexported fields
}
func (*PoolStatus) String ¶
func (p *PoolStatus) String() (s string)
type PrivateKey ¶
type PrivateKey interface {
}
type Producer ¶
type Producer struct {
*PulsarClient
}
func NewProducer ¶
func NewProducer(client *PulsarClient) (p *Producer)
func (*Producer) CloseProducer ¶
func (*Producer) CreateProducer ¶
func (*Producer) ReceiveProducerSuccess ¶
func (p *Producer) ReceiveProducerSuccess() ( success *pulsar_proto.CommandProducerSuccess, err error, )
func (*Producer) ReceiveSendReceipt ¶
func (p *Producer) ReceiveSendReceipt() ( receipt *pulsar_proto.CommandSendReceipt, err error, )
func (*Producer) SendBatchSend ¶
func (p *Producer) SendBatchSend( producerId, sequenceId uint64, producerName string, batchMessage command.BatchMessage, compression *pulsar_proto.CompressionType, ) (err error)
type PulsarClient ¶
type PulsarClient struct {
Conn
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(ac *AsyncConn) (client *PulsarClient)
func (*PulsarClient) Close ¶
func (c *PulsarClient) Close()
func (*PulsarClient) ConnectToBroker ¶
func (c *PulsarClient) ConnectToBroker( response *pulsar_proto.CommandLookupTopicResponse, ) (ac *AsyncConn, err error)
func (*PulsarClient) GetPartitionedTopicMetadata ¶
func (c *PulsarClient) GetPartitionedTopicMetadata( topic string, requestId uint64, ) (err error)
func (*PulsarClient) KeepAlive ¶
func (c *PulsarClient) KeepAlive() (err error)
func (*PulsarClient) LookupTopicWithConnect ¶
func (*PulsarClient) ReceiveSuccess ¶
func (c *PulsarClient) ReceiveSuccess() ( success *pulsar_proto.CommandSuccess, err error, )
func (*PulsarClient) SetLookupTopicConnection ¶
func (c *PulsarClient) SetLookupTopicConnection( topic string, requestId uint64, authoritative bool, ) (err error)
SetLookupTopicConnection sets c.conn to the broker received in the lookup topic response.
type Request ¶
type Request struct {
Message proto.Message
Meta *pulsar_proto.MessageMetadata
Payload string
BatchMessage command.BatchMessage
}
type Response ¶
type Response struct {
BaseCommand *command.Base
Meta *pulsar_proto.MessageMetadata
Payload string
BatchMessage command.BatchMessage
Error error
}
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| cli | |
| pulsar-client | |
| command | |
| internal | |
| proto | |
| pb | Package pulsar_proto is a generated protocol buffer package. |
Click to show internal directories.
Click to hide internal directories.