data

package
v0.0.0-...-517a707 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2024 License: MIT Imports: 20 Imported by: 0

README

Data

Documentation

Index

Constants

This section is empty.

Variables

ProviderSet is data providers.

Functions

func NewDB

func NewDB(conf *conf.Data, logger log.Logger) *gorm.DB

func NewKafkaManager

func NewKafkaManager(d *conf.Data, logger log.Logger) (*biz.KafkaManager, error)

func NewKafkaReader

func NewKafkaReader(addr, topic, groupId string) *biz.KafkaReader

func NewKafkaSender

func NewKafkaSender(addr, topic string) (*biz.KafkaSender, error)

func NewLianjiaRepo

func NewLianjiaRepo(data *Data, logger log.Logger) biz.LianjiaRepo

NewLianjiaRepo .

func NewRedis

func NewRedis(c *conf.Data, logger log.Logger) (redisConn *redis.Client, err error)

func NewTransaction

func NewTransaction(d *Data) biz.Transaction

Types

type BaseModel

type BaseModel struct {
	ID         int64      `gorm:"primary_key;type:bigint;" json:"id"`
	CreateTime *time.Time `gorm:"column:create_time;type:datetime not null;default:CURRENT_TIMESTAMP();comment:创建时间" json:"created_at"`
	CrawlDate  *time.Time `gorm:"column:crawl_date;type:date;default:(CURRENT_DATE);comment:爬取日期" json:"crawl_date"`
}

type ChanQueue

type ChanQueue struct {
	ErshoufangChan chan []biz.Message
	LoupanChan     chan []biz.Message
	CommercialChan chan []biz.Message
	ZufangChan     chan []biz.Message
	// contains filtered or unexported fields
}

func NewChannelQueue

func NewChannelQueue(logger log.Logger) *ChanQueue

func (*ChanQueue) Receive

func (c *ChanQueue) Receive(ctx context.Context) error

func (*ChanQueue) Send

func (c *ChanQueue) Send(ctx context.Context, msg []biz.Message, houseType biz.HoueseType) error

type Data

type Data struct {
	// contains filtered or unexported fields
}

Data .

func NewData

func NewData(c *conf.Data, logger log.Logger, db *gorm.DB, km *biz.KafkaManager, q Queue) (*Data, func(), error)

NewData .

func (*Data) DB

func (d *Data) DB(ctx context.Context) *gorm.DB

func (*Data) Tx

func (d *Data) Tx(ctx context.Context, fn func(ctx context.Context) error) error

type KafkaQueue

type KafkaQueue struct {
	// contains filtered or unexported fields
}

func NewKafkaQueue

func NewKafkaQueue(km *biz.KafkaManager, logger log.Logger) *KafkaQueue

func (*KafkaQueue) Receive

func (kq *KafkaQueue) Receive(ctx context.Context) error

func (*KafkaQueue) Send

func (kq *KafkaQueue) Send(ctx context.Context, msgs []biz.Message, houseType biz.HoueseType) error

type KafkaService

type KafkaService interface {
	SendMessage(topic, key string, value []byte) error
	ReceiveMessages(ctx context.Context, topic, groupID string, handler func(key, value []byte) error)
}

KafkaService 接口定义

func NewKafkaService

func NewKafkaService(brokers []string) KafkaService

NewKafkaService 创建 KafkaService 的实例

type KafkaServiceImpl

type KafkaServiceImpl struct {
	// contains filtered or unexported fields
}

KafkaServiceImpl 结构体实现 KafkaService 接口

func (*KafkaServiceImpl) ReceiveMessages

func (ks *KafkaServiceImpl) ReceiveMessages(ctx context.Context, topic, groupID string, handler func(key, value []byte) error)

ReceiveMessages 实现 KafkaService 接口的 ReceiveMessages 方法

func (*KafkaServiceImpl) SendMessage

func (ks *KafkaServiceImpl) SendMessage(topic, key string, value []byte) error

SendMessage 实现 KafkaService 接口的 SendMessage 方法

type Queue

type Queue interface {
	Send(ctx context.Context, msg []biz.Message, houseType biz.HoueseType) error
	Receive(ctx context.Context) error
}

func NewQueue

func NewQueue(c *conf.Data, logger log.Logger, kafkaQueue *KafkaQueue, chanQueue *ChanQueue) (q Queue, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL