Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
// AfterMessageDelivery is a replaceable callback that receives a message and
// records it at debug level with the text "message delivered".
// CreateModules installs it as the FCM connector's AfterMessageDelivery hook.
var AfterMessageDelivery = func(m *protocol.Message) {
	entry := logger.WithField("message", m)
	entry.Debug("message delivered")
}
var (
	// Config is the active configuration of guble (used when starting-up the server).
	//
	// Every setting is declared as a kingpin command-line flag. Almost all of
	// them also name an environment variable through Envar; the variable name is
	// produced by the helper g (defined elsewhere — presumably it adds a common
	// prefix; verify against its definition). The fields hold the pointers
	// returned by the kingpin value constructors (String, Int, Bool, Enum, ...).
	Config = &GubleConfig{
		// --- General server settings ---
		Log: kingpin.Flag("log", "Log level").
			Default(log.ErrorLevel.String()).
			Envar(g("LOG")).
			Enum(logLevels()...),
		EnvName: kingpin.Flag("env", `Name of the environment on which the application is running`).
			Default(development).
			Envar(g("ENV")).
			Enum(environments...),
		HttpListen: kingpin.Flag("http", `The address to for the HTTP server to listen on (format: "[Host]:Port")`).
			Default(defaultHttpListen).
			Envar(g("HTTP_LISTEN")).
			String(),

		// --- Storage backends (consumed by CreateKVStore / CreateMessageStore) ---
		KVS: kingpin.Flag("kvs", "The storage backend for the key-value store to use : file | memory | postgres ").
			Default(defaultKVSBackend).
			Envar(g("KVS")).
			String(),
		MS: kingpin.Flag("ms", "The message storage backend : file | memory").
			Default(defaultMSBackend).
			HintOptions("file", "memory").
			Envar(g("MS")).
			String(),
		// ExistingDir makes kingpin validate the value as an existing directory.
		StoragePath: kingpin.Flag("storage-path", "The path for storing messages and key-value data if 'file' is selected").
			Default(defaultStoragePath).
			Envar(g("STORAGE_PATH")).
			ExistingDir(),

		// --- HTTP endpoints; per the help texts an empty value disables the endpoint ---
		HealthEndpoint: kingpin.Flag("health-endpoint", `The health endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultHealthEndpoint).
			Envar(g("HEALTH_ENDPOINT")).
			String(),
		MetricsEndpoint: kingpin.Flag("metrics-endpoint", `The metrics endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultMetricsEndpoint).
			Envar(g("METRICS_ENDPOINT")).
			String(),
		PrometheusEndpoint: kingpin.Flag("prometheus-endpoint", `The metrics Prometheus endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultPrometheusEndpoint).
			Envar(g("PROMETHEUS_ENDPOINT")).
			String(),
		TogglesEndpoint: kingpin.Flag("toggles-endpoint", `The Feature-Toggles endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultTogglesEndpoint).
			Envar(g("TOGGLES_ENDPOINT")).
			String(),

		// The empty string is an accepted enum value and is the default (no profiler).
		Profile: kingpin.Flag("profile", `The profiler to be used (default: none): mem | cpu | block`).
			Default("").
			Envar(g("PROFILE")).
			Enum("mem", "cpu", "block", ""),

		// --- PostgreSQL connection parameters (read by CreateKVStore for the "postgres" backend) ---
		Postgres: PostgresConfig{
			Host: kingpin.Flag("pg-host", "The PostgreSQL hostname").
				Default("localhost").
				Envar(g("PG_HOST")).
				String(),
			Port: kingpin.Flag("pg-port", "The PostgreSQL port").
				Default("5432").
				Envar(g("PG_PORT")).
				Int(),
			User: kingpin.Flag("pg-user", "The PostgreSQL user").
				Default("guble").
				Envar(g("PG_USER")).
				String(),
			Password: kingpin.Flag("pg-password", "The PostgreSQL password").
				Default("guble").
				Envar(g("PG_PASSWORD")).
				String(),
			DbName: kingpin.Flag("pg-dbname", "The PostgreSQL database name").
				Default("guble").
				Envar(g("PG_DBNAME")).
				String(),
		},

		// --- Firebase Cloud Messaging connector ---
		FCM: fcm.Config{
			Enabled: kingpin.Flag("fcm", "Enable the Google Firebase Cloud Messaging connector").
				Envar(g("FCM")).
				Bool(),
			APIKey: kingpin.Flag("fcm-api-key", "The Google API Key for Google Firebase Cloud Messaging").
				Envar(g("FCM_API_KEY")).
				String(),
			// The default worker count is the number of CPUs reported at start-up.
			Workers: kingpin.Flag("fcm-workers", "The number of workers handling traffic with Firebase Cloud Messaging (default: number of CPUs)").
				Default(strconv.Itoa(runtime.NumCPU())).
				Envar(g("FCM_WORKERS")).
				Int(),
			Endpoint: kingpin.Flag("fcm-endpoint", "The Google Firebase Cloud Messaging endpoint").
				Default(defaultFCMEndpoint).
				Envar(g("FCM_ENDPOINT")).
				String(),
			Prefix: kingpin.Flag("fcm-prefix", "The FCM prefix / endpoint").
				Envar(g("FCM_PREFIX")).
				Default("/fcm/").
				String(),
			// Not a flag: points at a package-level variable; CreateModules sets
			// the pointed-to value to true when FCM is enabled.
			IntervalMetrics: &defaultFCMMetrics,
		},

		// --- Apple Push Notification Service connector ---
		APNS: apns.Config{
			Enabled: kingpin.Flag("apns", "Enable the APNS connector (by default, in Development mode)").
				Envar(g("APNS")).
				Bool(),
			Production: kingpin.Flag("apns-production", "Enable the APNS connector in Production mode").
				Envar(g("APNS_PRODUCTION")).
				Bool(),
			CertificateFileName: kingpin.Flag("apns-cert-file", "The APNS certificate file name").
				Envar(g("APNS_CERT_FILE")).
				String(),
			CertificateBytes: kingpin.Flag("apns-cert-bytes", "The APNS certificate bytes, as a string of hex-values").
				Envar(g("APNS_CERT_BYTES")).
				HexBytes(),
			CertificatePassword: kingpin.Flag("apns-cert-password", "The APNS certificate password").
				Envar(g("APNS_CERT_PASSWORD")).
				String(),
			AppTopic: kingpin.Flag("apns-app-topic", "The APNS topic (as used by the mobile application)").
				Envar(g("APNS_APP_TOPIC")).
				String(),
			Prefix: kingpin.Flag("apns-prefix", "The APNS prefix / endpoint").
				Envar(g("APNS_PREFIX")).
				Default("/apns/").
				String(),
			Workers: kingpin.Flag("apns-workers", "The number of workers handling traffic with APNS (default: number of CPUs)").
				Default(strconv.Itoa(runtime.NumCPU())).
				Envar(g("APNS_WORKERS")).
				Int(),
			// Not a flag: points at a package-level variable; CreateModules sets
			// the pointed-to value to true when APNS is enabled.
			IntervalMetrics: &defaultAPNSMetrics,
		},

		// --- Cluster mode ---
		Cluster: ClusterConfig{
			NodeID: kingpin.Flag("node-id", "(cluster mode) This guble node's own ID: a strictly positive integer number which must be unique in cluster").
				Envar(g("NODE_ID")).
				Uint8(),
			NodePort: kingpin.Flag("node-port", "(cluster mode) This guble node's own local port: a strictly positive integer number").
				Default(defaultNodePort).
				Envar(g("NODE_PORT")).
				Int(),
			// tcpAddrListParser is defined elsewhere; it wraps the flag so the
			// value is parsed as a list of TCP addresses (per its name — confirm).
			Remotes: tcpAddrListParser(kingpin.Flag("remotes", `(cluster mode) The list of TCP addresses of some other guble nodes (format: "IP:port")`).
				Envar(g("NODE_REMOTES"))),
		},

		// --- SMS gateway (Nexmo) ---
		SMS: sms.Config{
			Enabled: kingpin.Flag("sms", "Enable the SMS gateway").
				Envar(g("SMS")).
				Bool(),
			APIKey: kingpin.Flag("sms-api-key", "The Nexmo API Key for Sending sms").
				Envar(g("SMS_API_KEY")).
				String(),
			APISecret: kingpin.Flag("sms-api-secret", "The Nexmo API Secret for Sending sms").
				Envar(g("SMS_API_SECRET")).
				String(),
			SMSTopic: kingpin.Flag("sms-topic", "The topic for sms route").
				Envar(g("SMS_TOPIC")).
				Default(sms.SMSDefaultTopic).
				String(),
			Toggleable: kingpin.Flag("sms-toggleable", "If sms gateway should be able to be stopped and restarted at runtime").
				Envar(g("SMS_TOGGLEABLE")).
				Bool(),
			Workers: kingpin.Flag("sms-workers", "The number of workers handling traffic with Nexmo sms endpoint").
				Default(strconv.Itoa(runtime.NumCPU())).
				Envar(g("SMS_WORKERS")).
				Int(),
			// Not a flag: points at a package-level variable.
			IntervalMetrics: &defaultSMSMetrics,
		},

		// --- Websocket module ---
		WS: websocket.Config{
			Enabled: kingpin.Flag("ws", "Enable the websocket module").
				Envar(g("WS")).
				Bool(),
			Prefix: kingpin.Flag("ws-prefix", "The Websocket prefix").
				Envar(g("WS_PREFIX")).
				Default("/stream/").
				String(),
		},

		// --- Kafka producer; CreateModules treats an empty broker list as "disabled" ---
		KafkaProducer: kafka.Config{
			Brokers: configstring.NewFromKingpin(
				kingpin.Flag("kafka-brokers", `The list Kafka brokers to which Guble should connect (formatted as host:port, separated by spaces or commas)`).
					Envar(g("KAFKA_BROKERS"))),
		},

		// --- Kafka reporting topics ---
		// NOTE(review): unlike every other flag above, these environment-variable
		// names are written out literally instead of being built with g(...).
		KafkaReportingConfig: KafkaReportingConfig{
			SmsReportingTopic: kingpin.Flag("sms-kafka-topic", "The name of the SMS-Reporting Kafka topic").
				Envar("GUBLE_SMS_KAFKA_TOPIC").
				String(),
			SubscribeUnsubscribeReportingTopic: kingpin.Flag("subscribe-kafka-topic", "The name of the Subscribe/Unsubscribe Reporting Kafka topic").
				Envar("GUBLE_SUBSCRIBE_KAFKA_TOPIC").
				String(),
			ApnsReportingTopic: kingpin.Flag("apns-kafka-topic", "The name of the Apns-Reporting Kafka topic").
				Envar("GUBLE_APNS_KAFKA_TOPIC").
				String(),
			FcmReportingTopic: kingpin.Flag("fcm-kafka-topic", "The name of the fcm-Reporting Kafka topic").
				Envar("GUBLE_FCM_KAFKA_TOPIC").
				String(),
		},
	}
)
var CreateKVStore = func() kvstore.KVStore { switch *Config.KVS { case "memory": return kvstore.NewMemoryKVStore() case "file": db := kvstore.NewSqliteKVStore(path.Join(*Config.StoragePath, "kv-store.db"), true) if err := db.Open(); err != nil { logger.WithError(err).Panic("Could not open sqlite database connection") } return db case "postgres": db := kvstore.NewPostgresKVStore(kvstore.PostgresConfig{ ConnParams: map[string]string{ "host": *Config.Postgres.Host, "port": strconv.Itoa(*Config.Postgres.Port), "user": *Config.Postgres.User, "password": *Config.Postgres.Password, "dbname": *Config.Postgres.DbName, "sslmode": "disable", }, MaxIdleConns: 1, MaxOpenConns: runtime.GOMAXPROCS(0), }) if err := db.Open(); err != nil { logger.WithError(err).Panic("Could not open postgres database connection") } return db default: panic(fmt.Errorf("Unknown key-value backend: %q", *Config.KVS)) } }
CreateKVStore is a func which returns a kvstore.KVStore implementation (currently, based on guble configuration).
var CreateMessageStore = func() store.MessageStore { switch *Config.MS { case "none", "memory", "": return dummystore.New(kvstore.NewMemoryKVStore()) case "file": logger.WithField("storagePath", *Config.StoragePath).Info("Using FileMessageStore in directory") return filestore.New(*Config.StoragePath) default: panic(fmt.Errorf("Unknown message-store backend: %q", *Config.MS)) } }
CreateMessageStore is a func which returns a store.MessageStore implementation (currently, based on guble configuration).
var CreateModules = func(router router.Router) (modules []interface{}) { modules = append(modules, rest.NewRestMessageAPI(router, "/api/")) var kafkaProducer kafka.Producer if (*Config.KafkaProducer.Brokers).IsEmpty() { logger.Info("KafkaProducer: disabled") } else { logger.Info("KafkaProducer: enabled") var errKafka error kafkaProducer, errKafka = kafka.NewProducer(Config.KafkaProducer) if errKafka != nil { logger.WithError(errKafka).Error("Could not create KafkaProducer") } else { modules = append(modules, kafkaProducer) } } if *Config.WS.Enabled { if wsHandler, err := websocket.NewWSHandler(router, *Config.WS.Prefix); err != nil { logger.WithError(err).Error("Error loading WSHandler module") } else { modules = append(modules, wsHandler) } } if *Config.FCM.Enabled { logger.Info("Firebase Cloud Messaging: enabled") if *Config.FCM.APIKey == "" { logger.Panic("The API Key has to be provided when Firebase Cloud Messaging is enabled") } Config.FCM.AfterMessageDelivery = AfterMessageDelivery *Config.FCM.IntervalMetrics = true if Config.FCM.Endpoint != nil { gcm.GcmSendEndpoint = *Config.FCM.Endpoint } sender := fcm.NewSender(*Config.FCM.APIKey) if fcmConn, err := fcm.New(router, sender, Config.FCM, kafkaProducer, *Config.KafkaReportingConfig.SubscribeUnsubscribeReportingTopic, *Config.KafkaReportingConfig.FcmReportingTopic); err != nil { logger.WithError(err).Error("Error creating FCM connector") } else { modules = append(modules, fcmConn) } } else { logger.Info("Firebase Cloud Messaging: disabled") } if *Config.APNS.Enabled { if *Config.APNS.Production { logger.Info("APNS: enabled in production mode") } else { logger.Info("APNS: enabled in development mode") } logger.Info("APNS: enabled") if *Config.APNS.CertificateFileName == "" && Config.APNS.CertificateBytes == nil { logger.Panic("The certificate (as filename or bytes) has to be provided when APNS is enabled") } if *Config.APNS.CertificatePassword == "" { logger.Panic("A non-empty password has to be provided when 
APNS is enabled") } if *Config.APNS.AppTopic == "" { logger.Panic("The Mobile App Topic (usually the bundle-id) has to be provided when APNS is enabled") } apnsSender, err := apns.NewSender(Config.APNS) if err != nil { logger.Panic("APNS Sender could not be created") } *Config.APNS.IntervalMetrics = true if apnsConn, err := apns.New(router, apnsSender, Config.APNS, kafkaProducer, *Config.KafkaReportingConfig.SubscribeUnsubscribeReportingTopic, *Config.KafkaReportingConfig.ApnsReportingTopic); err != nil { logger.WithError(err).Error("Error creating APNS connector") } else { modules = append(modules, apnsConn) } } else { logger.Info("APNS: disabled") } if *Config.SMS.Enabled { logger.Info("SMS: enabled") if *Config.SMS.APIKey == "" || *Config.SMS.APISecret == "" { logger.Panic("The API Key and Secret have to be provided when NEXMO SMS connector is enabled") } nexmoSender, err := sms.NewNexmoSender(*Config.SMS.APIKey, *Config.SMS.APISecret, kafkaProducer, *Config.KafkaReportingConfig.SmsReportingTopic) if err != nil { logger.WithError(err).Error("Error creating Nexmo Sender") } smsGateway, err := sms.New(router, nexmoSender, Config.SMS) if err != nil { logger.WithError(err).Error("Error creating SMS Gateway") } else { modules = append(modules, smsGateway) } } else { logger.Info("SMS: disabled") } return }
CreateModules is a func which returns a slice of modules which should be used by the service (currently, based on guble configuration); see package `service` for terminological details.
var ValidateStoragePath = func() error { if *Config.KVS == fileOption || *Config.MS == fileOption { testfile := path.Join(*Config.StoragePath, "write-test-file") f, err := os.Create(testfile) if err != nil { logger.WithError(err).WithField("storagePath", *Config.StoragePath).Error("Storage path not present/writeable.") return err } f.Close() os.Remove(testfile) } return nil }
ValidateStoragePath validates the guble configuration with regard to the storagePath (which can be used by MessageStore and/or KVStore implementations).
Functions ¶
func StartService ¶
StartService starts a server.Service after first creating the router (and its dependencies) and the webserver.
Types ¶
type ClusterConfig ¶
ClusterConfig is used for configuring the cluster component.
type GubleConfig ¶
type GubleConfig struct { Log *string EnvName *string HttpListen *string KVS *string MS *string StoragePath *string HealthEndpoint *string MetricsEndpoint *string PrometheusEndpoint *string TogglesEndpoint *string Profile *string Postgres PostgresConfig FCM fcm.Config APNS apns.Config SMS sms.Config WS websocket.Config KafkaProducer kafka.Config Cluster ClusterConfig KafkaReportingConfig KafkaReportingConfig }
GubleConfig is used for configuring Guble server (including its modules / connectors).