ekafka

package module
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2024 License: MIT Imports: 36 Imported by: 0

README

ekafka 组件使用指南

goproxy.cn Release Example Doc

Table of contents

注意事项

如果生产、消费kafka比较慢,并不一定是kafka慢,而是你的配置不正确,请认真查看文档:https://github.com/segmentio/kafka-go/issues/417

  • producer比较慢,可能是用的默认配置batchTimeout,1s发送

以下列举kafka producer客户端比较关键的默认配置

  • BatchTimeout 批量发送消息的周期,默认1s
  • BatchSize 批量发送的消息数量,默认100条
  • BatchBytes 批量发送的消息大小,默认1MB
  • Async 设置成true时会导致WriteMessages非阻塞,会导致调用WriteMessages方法获取不到error
  • RequiredAcks ACK配置
    • RequireNone (0) fire-and-forget,producer不等待来自broker同步完成的确认后,就可以发送下一批消息
    • RequireOne (1) producer在leader已成功收到的数据并得到确认后,才发送下一批消息
    • RequireAll (-1) producer在所有follower副本确认接收到数据后,才发送下一批消息

更改以上默认配置,可以让速度变快。当然默认配置是可以让你的服务性能损耗耕地,或者可靠性更高,所以你自己需要权衡业务需求,更改配置。

基本组件

kafka-go 进行了轻量封装,并提供了以下功能:

  • 规范了标准配置格式,提供了统一的 Load().Build() 方法。
  • 支持自定义拦截器
  • 提供了默认的 Debug 拦截器,开启 Debug 后可输出 Request、Response 至终端。
快速上手

生产者消费者使用样例可参考 example

ConsumerGroup

相对于 Consumer(对 kafka-go 的封装)来说,ConsumerGroup 则提供了更加易用的 API。这是一个简单的例子:

首先添加所需配置:

[kafka]
brokers = ["127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"]

[kafka.client]
timeout = "3s"

[kafka.consumerGroups.cg1]
JoinGroupBackoff = "1s"
groupID = "group-1"
topic = "my-topic"
package main

import "github.com/ego-component/ekafka"

func main() {
	cmp := ekafka.Load("kafka").Build()
	// 获取实例(第一次调用时初始化,再次获取时会复用)
	cg := cmp.ConsumerGroup("cg1")

	for {
		pollCtx, _ := context.WithTimeout(ctx, 1*time.Minute)
		// 拉取事件,可能是消息、Rebalancing 事件或者错误等
		event, err := consumerGroup.Poll(pollCtx)
		if err != nil {
			elog.Panic("poll error")
			return
		}
		switch e := event.(type) {
		case ekafka.Message:
			// 按需处理消息
		case ekafka.AssignedPartitions:
			// 在 Kafka 完成分区分配时触发一次
		case ekafka.RevokedPartitions:
			// 在当前 Generation 结束时触发一次
		case error:
			// 错误处理

			// 结束
			if err := cg.Close(); err != nil {
				elog.Panic("关闭ConsumerGroup失败")
			}
		default:
			// ...
		}
	}
}
配置说明
[kafka.consumerGroups.cg1]
# GroupID is the name of the consumer group.
groupID = "group-1"
# The topic to read messages from.
topic = "my-topic"
# HeartbeatInterval sets the optional frequency at which the reader sends the consumer
# group heartbeat update.
#
# Default: 3s
heartbeatInterval = "3s"
# PartitionWatchInterval indicates how often a reader checks for partition changes.
# If a reader sees a partition change (such as a partition add) it will rebalance the group
# picking up new partitions.
#
# Default: 5s
partitionWatchInterval = "5s"
# WatchForPartitionChanges is used to inform kafka-go that a consumer group should be
# polling the brokers and rebalancing if any partition changes happen to the topic.
watchPartitionChanges = false
# SessionTimeout optionally sets the length of time that may pass without a heartbeat
# before the coordinator considers the consumer dead and initiates a rebalance.
#
# Default: 30s
sessionTimeout = "30s"
# RebalanceTimeout optionally sets the length of time the coordinator will wait
# for members to join as part of a rebalance.  For kafka servers under higher
# load, it may be useful to set this value higher.
#
# Default: 30s
rebalanceTimeout = "30s"
# JoinGroupBackoff optionally sets the length of time to wait before re-joining
# the consumer group after an error.
#
# Default: 5s
joinGroupBackoff = "5s"
# StartOffset determines from whence the consumer group should begin
# consuming when it finds a partition without a committed offset.  If
# non-zero, it must be set to one of FirstOffset or LastOffset.
#
# Default: `-2` (FirstOffset)
startOffset = "-2"
# RetentionTime optionally sets the length of time the consumer group will
# be saved by the broker.  -1 will disable the setting and leave the
# retention up to the broker's offsets.retention.minutes property.  By
# default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
# 2.0.
#
# Default: -1
retentionTime = "-1"
# Timeout is the network timeout used when communicating with the consumer
# group coordinator.  This value should not be too small since errors
# communicating with the broker will generally cause a consumer group
# rebalance, and it's undesirable that a transient network error intoduce
# that overhead.  Similarly, it should not be too large or the consumer
# group may be slow to respond to the coordinator failing over to another
# broker.
#
# Default: 5s
timeout = "5s"
# MinBytes indicates to the broker the minimum batch size that the consumer
# will accept. Setting a high minimum when consuming from a low-volume topic
# may result in delayed delivery when the broker does not have enough data to
# satisfy the defined minimum.
#
# Default: 1
minBytes = 1
# MaxBytes indicates to the broker the maximum batch size that the consumer
# will accept. The broker will truncate a message to satisfy this maximum, so
# choose a value that is high enough for your largest message size.
#
# Default: 1MB
maxBytes = 1048576
# Maximum amount of time to wait for new data to come when fetching batches
# of messages from kafka.
#
# Default: 10s
maxWait = "10s"
# ReadLagInterval sets the frequency at which the reader lag is updated.
# Setting this field to a negative value disables lag reporting.
#
# Default: 60s
readLagInterval = "60s"
# CommitInterval indicates the interval at which offsets are committed to
# the broker.  If 0, commits will be handled synchronously.
#
# Default: 0
commitInterval = "0"
# BackoffDelayMin optionally sets the smallest amount of time the reader will wait before
# polling for new messages
#
# Default: 100ms
readBackoffMin = "100ms"
# BackoffDelayMax optionally sets the maximum amount of time the reader will wait before
# polling for new messages
#
# Default: 1s
readBackoffMax = "1s"

Consumer Server 组件

必须配合 ConsumerGroup 使用。

基本组件通常是搭配其他服务模块(如 HTTP 服务)一起使用的,如果只想使用 Ego 做单纯的 Kafka 消费应用,可以使用 Consumer Server 组件

Consumer Server 组件依赖于基本组件提供 Kafka 消费者实例,同时实现了 ego.Server 接口以达到常驻运行的目的,并且和 Ego 框架共享生命周期。

Consumer Server 支持两种消费模式:

  • 逐条消费:会在处理消息的回调执行完成之后立即 commit。但实际 commit 时机依赖于 kafka-go 的配置,kafka-go 支持设置定时提交来提高性能
  • 手动消费:Consumer Server 提供一个生命周期的 context 和 consumer 对象,开发者自行决定如何消费
配置说明
[kafka]
debug=true
brokers=["localhost:9094"]

[kafka.client]
timeout="3s"

[kafka.consumers.c1]
topic="sre-infra-test"
groupID="group-1"
# 指定 Consumer Group 未曾提交过 offset 时从何处开始消费
startOffset = -1
# 默认为同步提交,可以配置自动批量提交间隔来提高性能
commitInterval = "1s"

[kafka.consumerGroups.cg1]
topic="sre-infra-test"
groupID="group-1"
# 指定 Consumer Group 未曾提交过 offset 时从何处开始消费
startOffset = -1
# 默认为同步提交,可以配置自动批量提交间隔来提高性能
commitInterval = "1s"

[kafkaConsumerServers.s1]
debug=true
# 使用 ekafka 中注册的哪一个 Consumer,对应 `kafka.consumers.[name]` 配置项
consumerName="c1"
# 也可以配合 ConsumerGroup 使用
consumerGroupName="cg1"

StartOffset

Ref

配置值 说明
-1 LastOffset 从最新位置
-2 FirstOffset 从最旧位置 (default)
示例

逐条消费:

package main

import (
	"github.com/gotomicro/ego"
	"github.com/ego-component/ekafka"
	"github.com/ego-component/ekafka/consumerserver"
	"github.com/gotomicro/ego/core/elog"
	"github.com/gotomicro/ego/server/egovernor"
	"github.com/segmentio/kafka-go"
)

func main() {
	app := ego.New().Serve(
		// 可以搭配其他服务模块一起使用
		egovernor.Load("server.governor").Build(),

		// 初始化 Consumer Server
		func() *consumerserver.Component {
			// 依赖 `ekafka` 管理 Kafka consumer
			ec := ekafka.Load("kafka").Build()
			cs := consumerserver.Load("kafkaConsumerServers.s1").Build(
				consumerserver.WithEkafka(ec),
			)

			// 用来接收、处理 `kafka-go` 和处理消息的回调产生的错误
			consumptionErrors := make(chan error)

			// 注册处理消息的回调函数
			cs.OnEachMessage(consumptionErrors, func(ctx context.Context, message kafka.Message) error {
				elog.Infof("got a message: %s\n", string(message.Value))
				// 如果返回错误则会被转发给 `consumptionErrors`,默认出现任何错误都会导致消费终止、
				// ConsumerGroup 退出;但可以将错误标记为 Retryable 以实现重试,ConsumerGroup 最多重试 3 次
				// 如:
				// return fmt.Errorf("%w 写入数据库时发生错误", consumerserver.ErrRecoverableError)

				return nil
			})

			return cs
		}(),
		// 还可以启动多个 ConsumerServer
	)
	if err := app.Run(); err != nil {
		elog.Panic("startup", elog.Any("err", err))
	}
}

获取 Consumer 实例后手动消费:

package main

import (
	"github.com/gotomicro/ego"
	"github.com/ego-component/ekafka"
	"github.com/ego-component/ekafka/consumerserver"
	"github.com/gotomicro/ego/core/elog"
)

func main() {
	app := ego.New().Serve(
		// 可以搭配其他服务模块一起使用
		egovernor.Load("server.governor").Build(),

		// 初始化 ConsumerServer
		func() *consumerserver.Component {
			// 依赖 `ekafka` 管理 Kafka Consumer
			ec := ekafka.Load("kafka").Build()
			cs := consumerserver.Load("kafkaConsumerServers.s1").Build(
				consumerserver.WithEkafka(ec),
			)

			// 注册处理消息的回调函数
			cs.OnStart(func(ctx context.Context, consumer *ekafka.Consumer) error {
				// 编写自己的消费逻辑...

				return nil
			})

			return cs
		}(),
		// 还可以启动多个 ConsumerServer
	)
	if err := app.Run(); err != nil {
		elog.Panic("startup", elog.Any("err", err))
	}
}

获取 ConsumerGroup 实例后手动消费:

package main

import (
	"github.com/gotomicro/ego"
	"github.com/ego-component/ekafka"
	"github.com/ego-component/ekafka/consumerserver"
	"github.com/gotomicro/ego/core/elog"
)

func main() {
	app := ego.New().Serve(
		// 可以搭配其他服务模块一起使用
		egovernor.Load("server.governor").Build(),

		// 初始化 ConsumerServer
		func() *consumerserver.Component {
			// 依赖 `ekafka` 管理 Kafka ConsumerGroup
			ec := ekafka.Load("kafka").Build()
			cs := consumerserver.Load("kafkaConsumerServers.s1").Build(
				consumerserver.WithEkafka(ec),
			)

			// 注册处理消息的回调函数
			cs.OnConsumerGroupStart(func(ctx context.Context, consumerGroup *ekafka.ConsumerGroup) error {
				// 编写自己的消费逻辑...

				return nil
			})

			return cs
		}(),
		// 还可以启动多个 ConsumerServer
	)
	if err := app.Run(); err != nil {
		elog.Panic("startup", elog.Any("err", err))
	}
}

Producer

以下是简单使用 ekafka.Producer 的例子。

所需配置

[kafka]
	brokers=["localhost:9091","localhost:9092","localhost:9093"]
	[kafka.client]
		timeout="3s"
	[kafka.producers.p1]        # 定义了名字为 p1 的 producer
		topic="test_topic"        # 指定生产消息的 topic
package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/BurntSushi/toml"
	"github.com/ego-component/ekafka"
	"github.com/gotomicro/ego/core/econf"
)

// produce 生产消息
func main() {
	// 假设你配置的toml如下所示
	conf := `
[kafka]
	debug=true
	brokers=["localhost:9091","localhost:9092","localhost:9093"]
	[kafka.client]
		timeout="3s"
	[kafka.producers.p1]        # 定义了名字为 p1 的 producer
		topic="sre-infra-test"  # 指定生产消息的 topic
`
	// 加载配置文件
	err := econf.LoadFromReader(strings.NewReader(conf), toml.Unmarshal)
	if err != nil {
		panic("LoadFromReader fail," + err.Error())
	}

	// 初始化 ekafka 组件
	cmp := ekafka.Load("kafka").Build()

	// 使用 p1 生产者生产消息
	producerClient := cmp.Producer("p1")

	// 生产3条消息
	err = producerClient.WriteMessages(
		context.Background(),
		ekafka.Message{Value: []byte("Hello World!")},
		ekafka.Message{Value: []byte("One!")},
		ekafka.Message{Value: []byte("Two!")},
	)

	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := producerClient.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println(`produce message success --------------->`)
}

TLS 配置

[kafka]
	debug=true
	brokers=["localhost:9091","localhost:9092","localhost:9093"]
    [kafka.authentication]
        [kafka.authentication.tls]
            enabled=false
            CAFile=""
            CertFile="./cert/tls.pem"
            KeyFile="./cert/tls.key"
            InsecureSkipVerify=true
	[kafka.client]
		timeout="3s"
	[kafka.producers.p1]        # 定义了名字为 p1 的 producer
		topic="sre-infra-test"  # 指定生产消息的 topic

Compression

以下是压缩的相关配置

[kafka]
	brokers=["localhost:9091","localhost:9092","localhost:9093"]
	[kafka.client]
		timeout="3s"
	[kafka.producers.p1]        # 定义了名字为 p1 的 producer
		topic="test_topic"        # 指定生产消息的 topic
		compression=1 #指定压缩类型

目前支持的compression如下:

配置值 说明
1 Gzip
2 Snappy
3 Lz4
4 Zstd

SASL Support

以下是相关配置样例

[kafka]
	debug=true
	brokers=["localhost:9091","localhost:9092","localhost:9093"]
	saslMechanism="PLAIN" #SASL Authentication Types
	saslUserName="username" #用户名
	saslPassword="password" #密码
	[kafka.client]
        timeout="3s"
	[kafka.producers.p1]
		topic="sre-infra-test"
	[kafka.consumers.c1]
		topic="sre-infra-test"
		groupID="group-1"

saslMechanism 相关配置枚举如下:

配置值 说明
PLAIN Plain
SCRAM-SHA-256 Scram
SCRAM-SHA-512 Scram

测试

E2E 测试

运行 E2E 测试需要准备 Kafka 环境,推荐 3 个 broker、每 topic 3 个 partition,否则有些测试会报错。

首先将 test/e2e/config/example.toml 复制为 test/e2e/config/e2e.toml 并按实际情况修改,该文件即是运行 E2E 测试的配置文件。

ekafka/ 目录下执行命令运行测试:

$ make test-e2e

Documentation

Index

Constants

View Source
const PackageName = "component.ekafka"

Variables

View Source
var (
	ErrRecoverableError = errors.New("recoverable error is retryable")
	ErrDoNotCommit      = errors.New("do not commit")
)

Functions

func DefaultConfig

func DefaultConfig() *config

DefaultConfig 返回默认配置

func NewMechanism added in v1.0.2

func NewMechanism(saslMechanism, saslUserName, saslPassword string) (sasl.Mechanism, error)

func Register added in v1.0.4

func Register(comp Compressor)

Types

type AssignedPartitions

type AssignedPartitions struct {
	Partitions []TopicPartition
}

type Authentication added in v1.0.2

type Authentication struct {
	// TLS authentication
	TLS *TLSConfig
}

func (*Authentication) ConfigureDialerAuthentication added in v1.0.2

func (config *Authentication) ConfigureDialerAuthentication(opts *kafka.Dialer) (err error)

func (*Authentication) ConfigureTransportAuthentication added in v1.0.2

func (config *Authentication) ConfigureTransportAuthentication(opts *kafka.Transport) (err error)

type Balancer

type Balancer = kafka.Balancer

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) CreateTopics

func (wc *Client) CreateTopics(ctx context.Context, req *kafka.CreateTopicsRequest) (res *kafka.CreateTopicsResponse, err error)

func (*Client) DeleteTopics

func (wc *Client) DeleteTopics(ctx context.Context, req *kafka.DeleteTopicsRequest) (res *kafka.DeleteTopicsResponse, err error)

func (*Client) ListOffsets

func (wc *Client) ListOffsets(ctx context.Context, req *kafka.ListOffsetsRequest) (res *kafka.ListOffsetsResponse, err error)

func (*Client) Metadata

func (wc *Client) Metadata(ctx context.Context, req *kafka.MetadataRequest) (res *kafka.MetadataResponse, err error)

func (*Client) OffsetFetch

func (wc *Client) OffsetFetch(ctx context.Context, req *kafka.OffsetFetchRequest) (res *kafka.OffsetFetchResponse, err error)

type ClientInterceptor

type ClientInterceptor func(oldProcessFn clientProcessFn) (newProcessFn clientProcessFn)

func InterceptorClientChain

func InterceptorClientChain(interceptors ...ClientInterceptor) ClientInterceptor

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component kafka 组件,包含Client、Producers、Consumers

func (*Component) Client

func (cmp *Component) Client() *Client

Client 返回kafka Client

func (*Component) Consumer

func (cmp *Component) Consumer(name string) *Consumer

Consumer 返回指定名称的kafka Consumer

func (*Component) ConsumerGroup

func (cmp *Component) ConsumerGroup(name string) *ConsumerGroup

ConsumerGroup 返回指定名称的 ConsumerGroup

func (*Component) GetCompName

func (cmp *Component) GetCompName() string

func (*Component) Producer

func (cmp *Component) Producer(name string) *Producer

Producer 返回指定名称的kafka Producer

type Compressor added in v1.0.4

type Compressor interface {
	Compress(input []byte) (output []byte, err error)
	DeCompress(input []byte) (output []byte, err error)
	ContentEncoding() string
}

func GetCompressor added in v1.0.4

func GetCompressor(encoding string) Compressor

type Consumer

type Consumer struct {
	Config  consumerConfig
	Brokers []string `json:"brokers" toml:"brokers"`
	// contains filtered or unexported fields
}

Consumer 消费者/消费者组,

func (*Consumer) Close

func (r *Consumer) Close() error

func (*Consumer) CommitMessages

func (r *Consumer) CommitMessages(ctx context.Context, msgs ...*Message) (err error)

func (*Consumer) FetchMessage

func (r *Consumer) FetchMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error)

func (*Consumer) Lag

func (r *Consumer) Lag() int64

func (*Consumer) Offset

func (r *Consumer) Offset() int64

func (*Consumer) ReadLag

func (r *Consumer) ReadLag(ctx context.Context) (lag int64, err error)

func (*Consumer) ReadMessage

func (r *Consumer) ReadMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error)

func (*Consumer) SetOffset

func (r *Consumer) SetOffset(offset int64) (err error)

func (*Consumer) SetOffsetAt

func (r *Consumer) SetOffsetAt(ctx context.Context, t time.Time) (err error)

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(options ConsumerGroupOptions) (*ConsumerGroup, error)

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) CommitMessages

func (cg *ConsumerGroup) CommitMessages(ctx context.Context, messages ...Message) error

func (*ConsumerGroup) Poll

func (cg *ConsumerGroup) Poll(ctx context.Context) (msg interface{}, err error)

func (*ConsumerGroup) PollV2 added in v1.0.4

func (cg *ConsumerGroup) PollV2(ctx context.Context) (msg interface{}, err error)

type ConsumerGroupOptions

type ConsumerGroupOptions struct {
	Logger                 *elog.Component
	Brokers                []string
	GroupID                string
	Topic                  string
	HeartbeatInterval      time.Duration
	PartitionWatchInterval time.Duration
	WatchPartitionChanges  bool
	SessionTimeout         time.Duration
	RebalanceTimeout       time.Duration
	JoinGroupBackoff       time.Duration
	StartOffset            int64
	RetentionTime          time.Duration
	Timeout                time.Duration
	Reader                 readerOptions
	EnableAutoRun          bool

	SASLUserName  string
	SASLPassword  string
	SASLMechanism string
	// contains filtered or unexported fields
}

type Container

type Container struct {
	// contains filtered or unexported fields
}

func DefaultContainer

func DefaultContainer() *Container

DefaultContainer 返回默认Container

func Load

func Load(key string) *Container

Load 载入配置,初始化Container

func (*Container) Build

func (c *Container) Build(options ...Option) *Component

Build 构建Container

type CtxMessage added in v1.0.4

type CtxMessage = struct {
	*kafka.Message
	Ctx context.Context
}

type Message

type Message = kafka.Message

type Messages

type Messages []*Message

func (Messages) ToLog

func (m Messages) ToLog() []logMessage

func (Messages) ToNoPointer

func (m Messages) ToNoPointer() []Message

type Option

type Option func(c *Container)

func WithBrokers

func WithBrokers(brokers ...string) Option

WithBrokers 注入brokers配置

func WithClientInterceptor

func WithClientInterceptor(interceptors ...ClientInterceptor) Option

WithClientInterceptor 注入拦截器

func WithDebug

func WithDebug(debug bool) Option

WithDebug 注入Debug配置

func WithRegisterBalancer

func WithRegisterBalancer(balancerName string, balancer Balancer) Option

WithRegisterBalancer 注册名字为<balancerName>的balancer 注册之后可通过在producer配置文件中可通过<balancerName>来指定使用此balancer

func WithServerInterceptor

func WithServerInterceptor(interceptors ...ServerInterceptor) Option

WithServerInterceptor 注入拦截器

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) WriteMessages

func (p *Producer) WriteMessages(ctx context.Context, msgs ...*Message) error

type RevokedPartitions

type RevokedPartitions struct {
	Partitions []TopicPartition
}

type ServerInterceptor

type ServerInterceptor func(oldProcessFn serverProcessFn) (newProcessFn serverProcessFn)

func InterceptorServerChain

func InterceptorServerChain(interceptors ...ServerInterceptor) ServerInterceptor

type TLSConfig added in v1.0.2

type TLSConfig struct {
	// Enable TLS
	Enabled bool
	// Path to the CA cert. For a client this verifies the server certificate.
	CAFile string
	// Path to the TLS cert to use for TLS required connections. (optional)
	CertFile string
	// Path to the TLS key to use for TLS required connections. (optional)
	KeyFile string
	// InsecureSkipVerify will enable TLS but not verify the certificate
	InsecureSkipVerify bool
	// MinVersion sets the minimum TLS version that is acceptable.
	// If not set, TLS 1.2 will be used. (optional)
	MinVersion string
	// MaxVersion sets the maximum TLS version that is acceptable.
	// If not set, refer to crypto/tls for defaults. (optional)
	MaxVersion string
}

TLSConfig is the interface used to configure a tcp client or server from a `Config`

func (*TLSConfig) LoadTLSConfig added in v1.0.2

func (c *TLSConfig) LoadTLSConfig() (*tls.Config, error)

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int
	Offset    int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL