server

package
v0.0.0-...-b4df255 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2017 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AfterMessageDelivery = func(m *protocol.Message) {
	logger.WithField("message", m).Debug("message delivered")
}
View Source
var (

	// Config is the active configuration of guble (used when starting-up the server)
	Config = &GubleConfig{
		Log: kingpin.Flag("log", "Log level").
			Default(log.ErrorLevel.String()).
			Envar(g("LOG")).
			Enum(logLevels()...),
		EnvName: kingpin.Flag("env", `Name of the environment on which the application is running`).
			Default(development).
			Envar(g("ENV")).
			Enum(environments...),
		HttpListen: kingpin.Flag("http", `The address to for the HTTP server to listen on (format: "[Host]:Port")`).
			Default(defaultHttpListen).
			Envar(g("HTTP_LISTEN")).
			String(),
		KVS: kingpin.Flag("kvs", "The storage backend for the key-value store to use : file | memory | postgres ").
			Default(defaultKVSBackend).
			Envar(g("KVS")).
			String(),
		MS: kingpin.Flag("ms", "The message storage backend : file | memory").
			Default(defaultMSBackend).
			HintOptions("file", "memory").
			Envar(g("MS")).
			String(),
		StoragePath: kingpin.Flag("storage-path", "The path for storing messages and key-value data if 'file' is selected").
			Default(defaultStoragePath).
			Envar(g("STORAGE_PATH")).
			ExistingDir(),
		HealthEndpoint: kingpin.Flag("health-endpoint", `The health endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultHealthEndpoint).
			Envar(g("HEALTH_ENDPOINT")).
			String(),
		MetricsEndpoint: kingpin.Flag("metrics-endpoint", `The metrics endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultMetricsEndpoint).
			Envar(g("METRICS_ENDPOINT")).
			String(),
		PrometheusEndpoint: kingpin.Flag("prometheus-endpoint", `The metrics Prometheus endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultPrometheusEndpoint).
			Envar(g("PROMETHEUS_ENDPOINT")).
			String(),
		TogglesEndpoint: kingpin.Flag("toggles-endpoint", `The Feature-Toggles endpoint to be used by the HTTP server (value for disabling it: "")`).
			Default(defaultTogglesEndpoint).
			Envar(g("TOGGLES_ENDPOINT")).
			String(),
		Profile: kingpin.Flag("profile", `The profiler to be used (default: none): mem | cpu | block`).
			Default("").
			Envar(g("PROFILE")).
			Enum("mem", "cpu", "block", ""),
		Postgres: PostgresConfig{
			Host: kingpin.Flag("pg-host", "The PostgreSQL hostname").
				Default("localhost").
				Envar(g("PG_HOST")).
				String(),
			Port: kingpin.Flag("pg-port", "The PostgreSQL port").
				Default("5432").
				Envar(g("PG_PORT")).
				Int(),
			User: kingpin.Flag("pg-user", "The PostgreSQL user").
				Default("guble").
				Envar(g("PG_USER")).
				String(),
			Password: kingpin.Flag("pg-password", "The PostgreSQL password").
				Default("guble").
				Envar(g("PG_PASSWORD")).
				String(),
			DbName: kingpin.Flag("pg-dbname", "The PostgreSQL database name").
				Default("guble").
				Envar(g("PG_DBNAME")).
				String(),
		},
		FCM: fcm.Config{
			Enabled: kingpin.Flag("fcm", "Enable the Google Firebase Cloud Messaging connector").
				Envar(g("FCM")).
				Bool(),
			APIKey: kingpin.Flag("fcm-api-key", "The Google API Key for Google Firebase Cloud Messaging").
				Envar(g("FCM_API_KEY")).
				String(),
			Workers: kingpin.Flag("fcm-workers", "The number of workers handling traffic with Firebase Cloud Messaging (default: number of CPUs)").
				Default(strconv.Itoa(runtime.NumCPU())).
				Envar(g("FCM_WORKERS")).
				Int(),
			Endpoint: kingpin.Flag("fcm-endpoint", "The Google Firebase Cloud Messaging endpoint").
				Default(defaultFCMEndpoint).
				Envar(g("FCM_ENDPOINT")).
				String(),
			Prefix: kingpin.Flag("fcm-prefix", "The FCM prefix / endpoint").
				Envar(g("FCM_PREFIX")).
				Default("/fcm/").
				String(),
			IntervalMetrics: &defaultFCMMetrics,
		},
		APNS: apns.Config{
			Enabled: kingpin.Flag("apns", "Enable the APNS connector (by default, in Development mode)").
				Envar(g("APNS")).
				Bool(),
			Production: kingpin.Flag("apns-production", "Enable the APNS connector in Production mode").
				Envar(g("APNS_PRODUCTION")).
				Bool(),
			CertificateFileName: kingpin.Flag("apns-cert-file", "The APNS certificate file name").
				Envar(g("APNS_CERT_FILE")).
				String(),
			CertificateBytes: kingpin.Flag("apns-cert-bytes", "The APNS certificate bytes, as a string of hex-values").
				Envar(g("APNS_CERT_BYTES")).
				HexBytes(),
			CertificatePassword: kingpin.Flag("apns-cert-password", "The APNS certificate password").
				Envar(g("APNS_CERT_PASSWORD")).
				String(),
			AppTopic: kingpin.Flag("apns-app-topic", "The APNS topic (as used by the mobile application)").
				Envar(g("APNS_APP_TOPIC")).
				String(),
			Prefix: kingpin.Flag("apns-prefix", "The APNS prefix / endpoint").
				Envar(g("APNS_PREFIX")).
				Default("/apns/").
				String(),
			Workers: kingpin.Flag("apns-workers", "The number of workers handling traffic with APNS (default: number of CPUs)").
				Default(strconv.Itoa(runtime.NumCPU())).
				Envar(g("APNS_WORKERS")).
				Int(),
			IntervalMetrics: &defaultAPNSMetrics,
		},
		Cluster: ClusterConfig{
			NodeID: kingpin.Flag("node-id", "(cluster mode) This guble node's own ID: a strictly positive integer number which must be unique in cluster").
				Envar(g("NODE_ID")).
				Uint8(),
			NodePort: kingpin.Flag("node-port", "(cluster mode) This guble node's own local port: a strictly positive integer number").
				Default(defaultNodePort).
				Envar(g("NODE_PORT")).
				Int(),
			Remotes: tcpAddrListParser(kingpin.Flag("remotes", `(cluster mode) The list of TCP addresses of some other guble nodes (format: "IP:port")`).
				Envar(g("NODE_REMOTES"))),
		},
		SMS: sms.Config{
			Enabled: kingpin.Flag("sms", "Enable the SMS gateway").
				Envar(g("SMS")).
				Bool(),
			APIKey: kingpin.Flag("sms-api-key", "The Nexmo API Key for Sending sms").
				Envar(g("SMS_API_KEY")).
				String(),
			APISecret: kingpin.Flag("sms-api-secret", "The Nexmo API Secret for Sending sms").
				Envar(g("SMS_API_SECRET")).
				String(),
			SMSTopic: kingpin.Flag("sms-topic", "The topic for sms route").
				Envar(g("SMS_TOPIC")).
				Default(sms.SMSDefaultTopic).
				String(),
			Toggleable: kingpin.Flag("sms-toggleable", "If sms gateway should be able to be stopped and restarted at runtime").
				Envar(g("SMS_TOGGLEABLE")).
				Bool(),
			Workers: kingpin.Flag("sms-workers", "The number of workers handling traffic with Nexmo sms endpoint").
				Default(strconv.Itoa(runtime.NumCPU())).
				Envar(g("SMS_WORKERS")).
				Int(),
			IntervalMetrics: &defaultSMSMetrics,
		},
		WS: websocket.Config{
			Enabled: kingpin.Flag("ws", "Enable the websocket module").
				Envar(g("WS")).
				Bool(),
			Prefix: kingpin.Flag("ws-prefix", "The Websocket prefix").
				Envar(g("WS_PREFIX")).
				Default("/stream/").
				String(),
		},
		KafkaProducer: kafka.Config{
			Brokers: configstring.NewFromKingpin(
				kingpin.Flag("kafka-brokers", `The list Kafka brokers to which Guble should connect (formatted as host:port, separated by spaces or commas)`).
					Envar(g("KAFKA_BROKERS"))),
		},
		KafkaReportingConfig: KafkaReportingConfig{
			SmsReportingTopic: kingpin.Flag("sms-kafka-topic", "The name of the SMS-Reporting Kafka topic").
				Envar("GUBLE_SMS_KAFKA_TOPIC").
				String(),
			SubscribeUnsubscribeReportingTopic: kingpin.Flag("subscribe-kafka-topic", "The name of the  Subscribe/Unsubscribe Reporting Kafka topic").
				Envar("GUBLE_SUBSCRIBE_KAFKA_TOPIC").
				String(),
			ApnsReportingTopic: kingpin.Flag("apns-kafka-topic", "The name of the Apns-Reporting Kafka topic").
				Envar("GUBLE_APNS_KAFKA_TOPIC").
				String(),
			FcmReportingTopic: kingpin.Flag("fcm-kafka-topic", "The name of the fcm-Reporting Kafka topic").
				Envar("GUBLE_FCM_KAFKA_TOPIC").
				String(),
		},
	}
)
View Source
var CreateKVStore = func() kvstore.KVStore {
	switch *Config.KVS {
	case "memory":
		return kvstore.NewMemoryKVStore()
	case "file":
		db := kvstore.NewSqliteKVStore(path.Join(*Config.StoragePath, "kv-store.db"), true)
		if err := db.Open(); err != nil {
			logger.WithError(err).Panic("Could not open sqlite database connection")
		}
		return db
	case "postgres":
		db := kvstore.NewPostgresKVStore(kvstore.PostgresConfig{
			ConnParams: map[string]string{
				"host":     *Config.Postgres.Host,
				"port":     strconv.Itoa(*Config.Postgres.Port),
				"user":     *Config.Postgres.User,
				"password": *Config.Postgres.Password,
				"dbname":   *Config.Postgres.DbName,
				"sslmode":  "disable",
			},
			MaxIdleConns: 1,
			MaxOpenConns: runtime.GOMAXPROCS(0),
		})
		if err := db.Open(); err != nil {
			logger.WithError(err).Panic("Could not open postgres database connection")
		}
		return db
	default:
		panic(fmt.Errorf("Unknown key-value backend: %q", *Config.KVS))
	}
}

CreateKVStore is a func which returns a kvstore.KVStore implementation (currently, based on guble configuration).

View Source
var CreateMessageStore = func() store.MessageStore {
	switch *Config.MS {
	case "none", "memory", "":
		return dummystore.New(kvstore.NewMemoryKVStore())
	case "file":
		logger.WithField("storagePath", *Config.StoragePath).Info("Using FileMessageStore in directory")
		return filestore.New(*Config.StoragePath)
	default:
		panic(fmt.Errorf("Unknown message-store backend: %q", *Config.MS))
	}
}

CreateMessageStore is a func which returns a store.MessageStore implementation (currently, based on guble configuration).

View Source
var CreateModules = func(router router.Router) (modules []interface{}) {

	modules = append(modules, rest.NewRestMessageAPI(router, "/api/"))

	var kafkaProducer kafka.Producer
	if (*Config.KafkaProducer.Brokers).IsEmpty() {
		logger.Info("KafkaProducer: disabled")
	} else {
		logger.Info("KafkaProducer: enabled")
		var errKafka error
		kafkaProducer, errKafka = kafka.NewProducer(Config.KafkaProducer)
		if errKafka != nil {
			logger.WithError(errKafka).Error("Could not create KafkaProducer")
		} else {
			modules = append(modules, kafkaProducer)
		}
	}

	if *Config.WS.Enabled {
		if wsHandler, err := websocket.NewWSHandler(router, *Config.WS.Prefix); err != nil {
			logger.WithError(err).Error("Error loading WSHandler module")
		} else {
			modules = append(modules, wsHandler)
		}
	}

	if *Config.FCM.Enabled {
		logger.Info("Firebase Cloud Messaging: enabled")
		if *Config.FCM.APIKey == "" {
			logger.Panic("The API Key has to be provided when Firebase Cloud Messaging is enabled")
		}
		Config.FCM.AfterMessageDelivery = AfterMessageDelivery
		*Config.FCM.IntervalMetrics = true
		if Config.FCM.Endpoint != nil {
			gcm.GcmSendEndpoint = *Config.FCM.Endpoint
		}
		sender := fcm.NewSender(*Config.FCM.APIKey)
		if fcmConn, err := fcm.New(router, sender, Config.FCM, kafkaProducer,
			*Config.KafkaReportingConfig.SubscribeUnsubscribeReportingTopic,
			*Config.KafkaReportingConfig.FcmReportingTopic); err != nil {
			logger.WithError(err).Error("Error creating FCM connector")
		} else {
			modules = append(modules, fcmConn)
		}
	} else {
		logger.Info("Firebase Cloud Messaging: disabled")
	}

	if *Config.APNS.Enabled {
		if *Config.APNS.Production {
			logger.Info("APNS: enabled in production mode")
		} else {
			logger.Info("APNS: enabled in development mode")
		}
		logger.Info("APNS: enabled")
		if *Config.APNS.CertificateFileName == "" && Config.APNS.CertificateBytes == nil {
			logger.Panic("The certificate (as filename or bytes) has to be provided when APNS is enabled")
		}
		if *Config.APNS.CertificatePassword == "" {
			logger.Panic("A non-empty password has to be provided when APNS is enabled")
		}
		if *Config.APNS.AppTopic == "" {
			logger.Panic("The Mobile App Topic (usually the bundle-id) has to be provided when APNS is enabled")
		}
		apnsSender, err := apns.NewSender(Config.APNS)
		if err != nil {
			logger.Panic("APNS Sender could not be created")
		}
		*Config.APNS.IntervalMetrics = true
		if apnsConn, err := apns.New(router,
			apnsSender, Config.APNS, kafkaProducer,
			*Config.KafkaReportingConfig.SubscribeUnsubscribeReportingTopic,
			*Config.KafkaReportingConfig.ApnsReportingTopic); err != nil {
			logger.WithError(err).Error("Error creating APNS connector")
		} else {
			modules = append(modules, apnsConn)
		}
	} else {
		logger.Info("APNS: disabled")
	}

	if *Config.SMS.Enabled {
		logger.Info("SMS: enabled")
		if *Config.SMS.APIKey == "" || *Config.SMS.APISecret == "" {
			logger.Panic("The API Key and Secret have to be provided when NEXMO SMS connector is enabled")
		}
		nexmoSender, err := sms.NewNexmoSender(*Config.SMS.APIKey, *Config.SMS.APISecret, kafkaProducer, *Config.KafkaReportingConfig.SmsReportingTopic)
		if err != nil {
			logger.WithError(err).Error("Error creating Nexmo Sender")
		}
		smsGateway, err := sms.New(router, nexmoSender, Config.SMS)
		if err != nil {
			logger.WithError(err).Error("Error creating SMS Gateway")
		} else {
			modules = append(modules, smsGateway)
		}
	} else {
		logger.Info("SMS: disabled")
	}

	return
}

CreateModules is a func which returns a slice of modules which should be used by the service (currently, based on guble configuration); see package `service` for terminological details.

View Source
var ValidateStoragePath = func() error {
	if *Config.KVS == fileOption || *Config.MS == fileOption {
		testfile := path.Join(*Config.StoragePath, "write-test-file")
		f, err := os.Create(testfile)
		if err != nil {
			logger.WithError(err).WithField("storagePath", *Config.StoragePath).Error("Storage path not present/writeable.")
			return err
		}
		f.Close()
		os.Remove(testfile)
	}
	return nil
}

ValidateStoragePath validates the guble configuration with regard to the storagePath (which can be used by MessageStore and/or KVStore implementations).

Functions

func Main

func Main()

Main is the entry-point of the guble server.

func StartService

func StartService() *service.Service

StartService starts a server.Service after first creating the router (and its dependencies), the webserver.

Types

type ClusterConfig

type ClusterConfig struct {
	NodeID   *uint8
	NodePort *int
	Remotes  *tcpAddrList
}

ClusterConfig is used for configuring the cluster component.

type GubleConfig

type GubleConfig struct {
	Log                  *string
	EnvName              *string
	HttpListen           *string
	KVS                  *string
	MS                   *string
	StoragePath          *string
	HealthEndpoint       *string
	MetricsEndpoint      *string
	PrometheusEndpoint   *string
	TogglesEndpoint      *string
	Profile              *string
	Postgres             PostgresConfig
	FCM                  fcm.Config
	APNS                 apns.Config
	SMS                  sms.Config
	WS                   websocket.Config
	KafkaProducer        kafka.Config
	Cluster              ClusterConfig
	KafkaReportingConfig KafkaReportingConfig
}

GubleConfig is used for configuring Guble server (including its modules / connectors).

type KafkaReportingConfig

type KafkaReportingConfig struct {
	SmsReportingTopic                  *string
	SubscribeUnsubscribeReportingTopic *string
	FcmReportingTopic                  *string
	ApnsReportingTopic                 *string
}

type PostgresConfig

type PostgresConfig struct {
	Host     *string
	Port     *int
	User     *string
	Password *string
	DbName   *string
}

PostgresConfig is used for configuring the Postgresql connection.

Directories

Path Synopsis
filestore
Package filestore is a filesystem-based implementation of the MessageStore interface.
Package filestore is a filesystem-based implementation of the MessageStore interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL