platform_connector_lib

package module
v0.0.0-...-b1e6648 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2024 License: Apache-2.0 Imports: 30 Imported by: 14

README

Library for building a platform connector that handles a protocol used to communicate with devices.

Documentation

Index

Constants

View Source
// MutedDeviceErrorsAttribute is the string key "platform/mute-format-error".
// NOTE(review): presumably a device attribute key that mutes format-error
// reporting for a device — confirm against the code that reads it.
const MutedDeviceErrorsAttribute = "platform/mute-format-error"
View Source
// Seperator is the "$" delimiter string.
// NOTE(review): presumably the delimiter between a pure id and its modifier
// part (see SplitModifier and TrimIdModifier) — confirm against their bodies.
// The identifier is a misspelling of "Separator"; it is exported, so renaming
// it would break importers.
const Seperator = "$"

Variables

View Source
// ErrorUnknownLocalServiceId is a package-level error value with the message
// "unknown local service id". Callers can test for it with errors.Is.
var ErrorUnknownLocalServiceId = errors.New("unknown local service id")

Functions

func DecodeModifierParameter

func DecodeModifierParameter(parameter string) (result map[string][]string, err error)

func SplitModifier

func SplitModifier(id string) (pureId string, modifier map[string][]string)

func TrimIdModifier

func TrimIdModifier(id string) (pureId string)

Types

type AsyncCommandHandler

// AsyncCommandHandler is the callback type accepted by
// Connector.SetAsyncCommandHandler. It receives the command as a
// model.ProtocolMsg, the request as a CommandRequestMsg (a map keyed by
// protocol segment name), and a time.Time t, and returns an error.
// NOTE(review): the meaning of t is not visible here — confirm with the caller.
type AsyncCommandHandler func(commandRequest model.ProtocolMsg, requestMsg CommandRequestMsg, t time.Time) (err error)

type CommandRequestMsg

// CommandRequestMsg is an alias for a map from ProtocolSegmentName to string.
// It is the request type passed to AsyncCommandHandler and
// DeviceCommandHandler.
type CommandRequestMsg = map[ProtocolSegmentName]string

type CommandResponseMsg

// CommandResponseMsg is an alias for a map from ProtocolSegmentName to string.
// It is the response type returned by DeviceCommandHandler and accepted by
// Connector.HandleCommandResponse.
type CommandResponseMsg = map[ProtocolSegmentName]string

type Config

// Config holds the settings of a Connector. It is the parameter of New, the
// result of LoadConfig, and is exposed as the Connector.Config field.
//
// NOTE(review): the group comments below are inferred from field names only;
// confirm each against the code that reads the field.
type Config struct {
	// Kafka connection and protocol settings (presumably).
	KafkaUrl           string
	KafkaResponseTopic string
	KafkaGroupName     string
	FatalKafkaError    bool
	Protocol           string

	// URLs of the device manager and device repository services (presumably).
	DeviceManagerUrl string
	DeviceRepoUrl    string

	// Authentication settings.
	AuthClientId             string //keycloak-client
	AuthClientSecret         string //keycloak-secret
	AuthExpirationTimeBuffer float64
	AuthEndpoint             string

	// JWT signing settings (presumably used when generating user tokens).
	JwtPrivateKey string
	JwtExpiration int64
	JwtIssuer     string

	// Cache expirations and cache endpoints; units are not visible here — TODO confirm.
	DeviceExpiration      int32
	DeviceTypeExpiration  int32
	TokenCacheExpiration  int32
	IotCacheUrl           []string
	TokenCacheUrl         []string
	Debug                 bool
	SerializationFallback string

	// Message validation switches (presumably consulted by Connector.ValidateMsg).
	Validate                  bool
	ValidateAllowUnknownField bool
	ValidateAllowMissingField bool

	CharacteristicExpiration int32
	// Kafka topic creation parameters (presumably).
	PartitionsNum     int
	ReplicationFactor int

	// Optional Postgres publishing target (presumably enabled by PublishToPostgres).
	PublishToPostgres bool
	PostgresHost      string
	PostgresPort      int
	PostgresUser      string
	PostgresPw        string
	PostgresDb        string

	HttpCommandConsumerPort string

	PermQueryUrl string

	// Producer tuning; the compression fields are sarama.CompressionCodec values.
	AsyncPgThreadMax    int
	AsyncFlushMessages  int
	AsyncFlushFrequency time.Duration
	AsyncCompression    sarama.CompressionCodec
	SyncCompression     sarama.CompressionCodec

	// Kafka consumer tuning. KafkaConsumerMaxWait is a string, presumably a
	// duration in text form — TODO confirm how it is parsed.
	KafkaConsumerMaxWait  string
	KafkaConsumerMinBytes int
	KafkaConsumerMaxBytes int

	// IoT cache client tuning. IotCacheTimeout is a string, presumably a
	// duration in text form — TODO confirm how it is parsed.
	IotCacheTimeout      string
	IotCacheMaxIdleConns int

	NotificationUrl string

	DeviceTypeTopic string

	// KafkaTopicConfigs maps a string key (presumably a topic name) to a list
	// of kafka.ConfigEntry values.
	KafkaTopicConfigs map[string][]kafka.ConfigEntry

	// The "S" suffix presumably means seconds — TODO confirm.
	NotificationsIgnoreDuplicatesWithinS int
	NotificationUserOverwrite            string

	DeveloperNotificationUrl string //optional

}

func LoadConfig

func LoadConfig(location string) (config Config, err error)

Loads the config from the JSON file at location and also uses environment variables (e.g. KafkaUrl --> ZOOKEEPER_URL).

type Connector

// Connector is the type returned by New. Its methods handle device events,
// command responses and errors, manage producers and the consumer, and give
// access to the Iot and Security helpers.
type Connector struct {
	// Config is a Config value; New takes a Config as its only argument.
	// NOTE(review): presumably the same value passed to New — confirm.
	Config Config

	// IotCache is a pointer to an iot.PreparedCache.
	IotCache *iot.PreparedCache
	// contains filtered or unexported fields
}

func New

func New(config Config) (connector *Connector, err error)

func (*Connector) CleanMsg

func (this *Connector) CleanMsg(msg map[string]interface{}, service model.Service) (map[string]interface{}, error)

func (*Connector) GetProducer

func (this *Connector) GetProducer(qos Qos) (producer kafka.ProducerInterface, err error)

func (*Connector) HandleClientError

func (this *Connector) HandleClientError(userId string, clientId string, errorMessage string)

func (*Connector) HandleCommandError

func (this *Connector) HandleCommandError(userId string, commandRequest model.ProtocolMsg, errorMessage string)

func (*Connector) HandleCommandResponse

func (this *Connector) HandleCommandResponse(commandRequest model.ProtocolMsg, commandResponse CommandResponseMsg, qos Qos) (err error)

func (*Connector) HandleDeviceError

func (this *Connector) HandleDeviceError(userId string, device model.Device, errorMessage string)

func (*Connector) HandleDeviceEvent

func (this *Connector) HandleDeviceEvent(username string, password string, deviceId string, serviceId string, protocolParts map[string]string, qos Qos) (err error)

func (*Connector) HandleDeviceEventWithAuthToken

func (this *Connector) HandleDeviceEventWithAuthToken(token security.JwtToken, deviceId string, serviceId string, eventMsg EventMsg, qos Qos) (err error)

func (*Connector) HandleDeviceIdentEvent

func (this *Connector) HandleDeviceIdentEvent(username string, password string, deviceId string, localDeviceId string, serviceId string, localServiceId string, eventMsg EventMsg, qos Qos) (info HandledDeviceInfo, err error)

func (*Connector) HandleDeviceIdentEventWithAuthToken

func (this *Connector) HandleDeviceIdentEventWithAuthToken(token security.JwtToken, deviceId string, localDeviceId string, serviceId string, localServiceId string, eventMsg EventMsg, qos Qos) (info HandledDeviceInfo, err error)

func (*Connector) HandleDeviceRefEvent

func (this *Connector) HandleDeviceRefEvent(username string, password string, deviceUri string, serviceUri string, eventMsg EventMsg, qos Qos) (info HandledDeviceInfo, err error)

func (*Connector) HandleDeviceRefEventWithAuthToken

func (this *Connector) HandleDeviceRefEventWithAuthToken(token security.JwtToken, deviceUri string, serviceUri string, eventMsg EventMsg, qos Qos) (info HandledDeviceInfo, err error)

func (*Connector) InitProducer

func (this *Connector) InitProducer(ctx context.Context, qosList []Qos) (err error)

func (*Connector) Iot

func (this *Connector) Iot() *iot.Iot

func (*Connector) Security

func (this *Connector) Security() Security

func (*Connector) SendNotification

func (this *Connector) SendNotification(message Notification) error

func (*Connector) SetAsyncCommandHandler

func (this *Connector) SetAsyncCommandHandler(handler AsyncCommandHandler) *Connector

asyncCommandHandler, endpointCommandHandler and deviceCommandHandler are mutually exclusive

func (*Connector) SetDeviceCommandHandler

func (this *Connector) SetDeviceCommandHandler(handler DeviceCommandHandler) *Connector

asyncCommandHandler, endpointCommandHandler and deviceCommandHandler are mutually exclusive

func (*Connector) SetKafkaLogger

func (this *Connector) SetKafkaLogger(logger *log.Logger)

func (*Connector) Start

func (this *Connector) Start(ctx context.Context, qosList ...Qos) (err error)

func (*Connector) StartConsumer

func (this *Connector) StartConsumer(ctx context.Context) (err error)

func (*Connector) ValidateMsg

func (this *Connector) ValidateMsg(msg map[string]interface{}, service model.Service) error

type DeviceCommandHandler

// DeviceCommandHandler is the callback type accepted by
// Connector.SetDeviceCommandHandler. It receives the device id and URI, the
// service id and URI, and the request as a CommandRequestMsg. It returns the
// response as a CommandResponseMsg, a Qos value, and an error.
// NOTE(review): presumably the returned qos selects how the response is
// produced — confirm with the caller.
type DeviceCommandHandler func(deviceId string, deviceUri string, serviceId string, serviceUri string, requestMsg CommandRequestMsg) (responseMsg CommandResponseMsg, qos Qos, err error)

type DeviceTypeCommand

// DeviceTypeCommand is a two-field message that serializes to JSON with the
// keys "command" and "id".
// NOTE(review): presumably the message format of Config.DeviceTypeTopic —
// confirm against the consumer code.
type DeviceTypeCommand struct {
	Command string `json:"command"`
	Id      string `json:"id"`
}

type EventMsg

// EventMsg is an alias for a map from ProtocolSegmentName to string. It is the
// event type accepted by the HandleDeviceEventWithAuthToken,
// HandleDeviceIdentEvent* and HandleDeviceRefEvent* methods of Connector.
type EventMsg = map[ProtocolSegmentName]string

type HandledDeviceInfo

// HandledDeviceInfo is the info value returned by the HandleDeviceIdentEvent*
// and HandleDeviceRefEvent* methods of Connector. It carries a device id, a
// device-type id, and a list of service ids.
// NOTE(review): presumably these describe the device and services the event
// was resolved to — confirm against the method bodies.
type HandledDeviceInfo struct {
	DeviceId     string
	DeviceTypeId string
	ServiceIds   []string
}

type Notification

// Notification is the message type accepted by Connector.SendNotification.
// Each field uses the same key for JSON and BSON: "userId", "title" and
// "message".
type Notification struct {
	UserId  string `json:"userId" bson:"userId"`
	Title   string `json:"title" bson:"title"`
	Message string `json:"message" bson:"message"`
}

type ProtocolSegmentName

// ProtocolSegmentName is an alias for string. It is the key type of
// CommandRequestMsg, CommandResponseMsg and EventMsg.
type ProtocolSegmentName = string

type Qos

// Qos is an integer enumeration passed to Connector.GetProducer,
// Connector.InitProducer, Connector.Start and the Handle* event and response
// methods.
// NOTE(review): presumably it selects the producer's delivery mode — confirm
// against GetProducer.
type Qos int
// Qos values, numbered by iota: Async is 0, Sync is 1, SyncIdempotent is 2.
const (
	Async Qos = iota
	Sync
	SyncIdempotent
)

type Security

// Security is the interface returned by Connector.Security. Its
// token-producing methods return a security.JwtToken and an error. The
// remaining methods reset access state or look up a user id or role list.
// NOTE(review): caching and token-exchange behavior depend on the
// implementation, which is not visible here.
type Security interface {
	// Access returns a token and an error; it takes no arguments.
	Access() (token security.JwtToken, err error)
	// ResetAccess takes no arguments and returns nothing.
	// NOTE(review): presumably discards the token held for Access — confirm.
	ResetAccess()
	// GenerateUserTokenById returns a token for the given user id.
	GenerateUserTokenById(userid string) (token security.JwtToken, err error)
	// GenerateUserToken returns a token for the given username.
	GenerateUserToken(username string) (token security.JwtToken, err error)
	// ExchangeUserToken returns a token for the given user id.
	ExchangeUserToken(userid string) (token security.JwtToken, err error)
	// GetUserToken returns a token for the given username and password.
	GetUserToken(username string, password string) (token security.JwtToken, err error)
	// GetCachedUserToken returns a token for the given username.
	GetCachedUserToken(username string) (token security.JwtToken, err error)
	// GetUserId returns the user id for the given username.
	GetUserId(username string) (userid string, err error)
	// GetUserRoles returns the role names for the given user id.
	GetUserRoles(userid string) (roles []string, err error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL