shard

package module
v0.0.0-...-0597b97
Published: Jan 17, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

README

shard

shard is a lightweight, easy-to-use MapReduce framework for Go. It provides a simple and flexible way to write and run distributed computations on a cluster of machines.

Features

  • Simple API: shard provides a simple and intuitive API for writing MapReduce programs.
  • Pluggable Components: shard allows you to bring your own Mapper, Reducer, Combiner, Partitioner, and Filesystem implementations.
  • Master-Worker Architecture: shard uses a master-worker architecture to distribute and manage tasks.
  • gRPC for Communication: shard uses gRPC for efficient and reliable communication between the master and worker nodes.

Installation

To install shard, use go get:

go get github.com/prxssh/shard

Configuration

shard can be configured using environment variables or through the Config struct.

Environment Variable   Config Field     Description                                Default
--------------------   -------------    ----------------------------------------   --------------------
SHARD_MODE             -                The mode to run in (master or worker).     master
SHARD_MASTER_ADDR      MasterAddress    The address of the master node.            localhost:6969
-                      InputPath        The path to the input file or directory.   -
-                      OutputDir        The path to the output directory.          ./shard
-                      NumReducers      The number of reduce tasks.                16
-                      ChunkSize        The size of each input split.              64MB
-                      MaxConcurrency   The maximum number of concurrent tasks.    runtime.NumCPU() * 2

See config.go for the complete list of configuration options.
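For example, the two environment variables from the table above could be used to start a worker process that registers with an already-running master (the binary path here is hypothetical, purely for illustration):

```shell
# Run this process as a worker rather than the default master mode.
export SHARD_MODE=worker
# Point the worker at the master's RPC address (the default shown in the table).
export SHARD_MASTER_ADDR=localhost:6969

# Hypothetical entry point; substitute your own program that calls shard.Run.
go run ./cmd/wordcount
```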

Usage

[!WARNING] This project was written for learning purposes; breaking changes are to be expected.

Here is an example of how to use shard to implement a word count program:

package main

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/prxssh/shard"
	"github.com/prxssh/shard/api"
	"github.com/prxssh/shard/pkg/filesystem"
)

func main() {
	// Create a new shard config.
	cfg, err := shard.NewConfig(
		shard.WithInputPath("input.txt"),
		shard.WithMapper(Map),
		shard.WithReducer(Reduce),
		shard.WithFilesystem(filesystem.NewLocal()),
	)
	if err != nil {
		panic(err)
	}

	// Run the shard job.
	if err := shard.Run(cfg); err != nil {
		panic(err)
	}
}

// Map is a mapper that emits a count for each word.
func Map(key, value string, emit api.Emitter) error {
	words := strings.Fields(value)
	for _, word := range words {
		if err := emit(word, "1"); err != nil {
			return err
		}
	}
	return nil
}

// Reduce is a reducer that sums the counts for each word.
func Reduce(key string, values api.Iterator, emit api.Emitter) error {
	count := 0
	for {
		_, ok := values.Next()
		if !ok {
			break
		}
		count++
	}

	return emit(key, strconv.Itoa(count))
}
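Note that the Reduce above counts iterations rather than reading the values, which only works because the Mapper always emits "1". A more robust variant sums the emitted values, so it stays correct if a Combiner pre-aggregates counts upstream (e.g. emitting "3" for a word seen three times in one split). The sketch below is self-contained: the `Emitter` and `Iterator` types are local stand-ins assumed to mirror the shapes of `api.Emitter` and `api.Iterator` used in the example, not the real interfaces.

```go
package main

import (
	"fmt"
	"strconv"
)

// Emitter is a local stand-in for the assumed shape of api.Emitter.
type Emitter func(key, value string) error

// Iterator is a local stand-in for the assumed shape of api.Iterator.
type Iterator struct {
	values []string
	pos    int
}

// Next returns the next value and false when the iterator is exhausted.
func (it *Iterator) Next() (string, bool) {
	if it.pos >= len(it.values) {
		return "", false
	}
	v := it.values[it.pos]
	it.pos++
	return v, true
}

// Reduce sums the numeric values instead of counting them, so it remains
// correct when upstream counts have already been partially aggregated.
func Reduce(key string, values *Iterator, emit Emitter) error {
	sum := 0
	for {
		v, ok := values.Next()
		if !ok {
			break
		}
		n, err := strconv.Atoi(v)
		if err != nil {
			return err
		}
		sum += n
	}
	return emit(key, strconv.Itoa(sum))
}

func main() {
	// "3" stands in for a count pre-aggregated by a combiner.
	it := &Iterator{values: []string{"1", "1", "3"}}
	_ = Reduce("word", it, func(k, v string) error {
		fmt.Println(k, v)
		return nil
	})
}
```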

Development

Information for developers, including how to run tests and generate protobuf files.

Running Tests

To run the tests, use the following command:

make test

Generating Protobuf Files

To generate the protobuf files, use the following command:

make gen-proto FILE=path/to/file.proto

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Overview

Copied from: https://github.com/ThreeDotsLabs/watermill/blob/master/slog.go


Constants

const (
	DefaultMasterAddress = "localhost:6969"
	DefaultOutputDir     = "./shard"
	DefaultNumReducers   = 16
	DefaultChunkSize     = 64 * 1024 * 1024 // 64MB
)
const LevelTrace = slog.LevelDebug - 4

LevelTrace is defined because the slog package does not provide a trace level by default. It is derived by subtracting 4 from slog.LevelDebug, following the spacing of slog.LevelWarn and slog.LevelError, which are set to 4 and 8.

Variables

var DefaultMaxConcurrency = runtime.NumCPU() * 2

DefaultMaxConcurrency is the maximum number of tasks that a worker can execute concurrently.

Functions

func NewSlogLogger

func NewSlogLogger(logger *slog.Logger) api.LoggerAdapter

NewSlogLogger creates an adapter to the standard library's structured logging package. If `logger` is nil, the result of slog.Default is used instead.

func NewSlogLoggerWithLevelMapping

func NewSlogLoggerWithLevelMapping(
	logger *slog.Logger,
	shardLevelToSlog map[slog.Level]slog.Level,
) api.LoggerAdapter

NewSlogLoggerWithLevelMapping creates an adapter to the standard library's structured logging package. If `logger` is nil, the result of slog.Default is used instead.

The `shardLevelToSlog` parameter maps shard's log levels to the levels of the structured logger. This is helpful when, for example, you want shard's info logs recorded at debug level in slog.

func Run

func Run(cfg *Config) error

Types

type Config

type Config struct {
	// MasterAddress is the RPC address (host:port) of the coordinator service
	// where worker will register.
	//
	// If not specified, it defaults to DefaultMasterAddress.
	MasterAddress string

	// InputPath specifies the file or directory pattern (glob) to be processed
	// by the Map phase.
	InputPath string

	// OutputDir is the directory where intermediate files and final Reduce
	// outputs will be stored.
	//
	// If not specified, it defaults to DefaultOutputDir.
	OutputDir string

	// NumReducers is the number of reduce tasks (R). This determines the
	// number of output partitions.
	//
	// If not specified, it defaults to DefaultNumReducers.
	NumReducers int

	// ChunkSize is the maximum size in bytes for a single input split given
	// to a Mapper.
	//
	// If not specified, it defaults to DefaultChunkSize.
	ChunkSize int64

	// MaxConcurrency limits the number of concurrent tasks that can be
	// processed by a single worker.
	//
	// If not specified, it defaults to DefaultMaxConcurrency.
	MaxConcurrency int

	// Mapper is the client provided implementation of the Map function.
	Mapper api.Mapper

	// Reducer is the client provided implementation of the Reduce function.
	Reducer api.Reducer

	// Combiner is the client provided implementation of the Combine function.
	Combiner api.Combiner

	// Partitioner determines which reducer handles a specific key. If nil,
	// a default hash-based partitioner is typically applied.
	Partitioner api.Partitioner

	// Filesystem handles the abstraction of reading and writing files (e.g.,
	// wrapping local disk IO or cloud storage calls).
	Filesystem api.Filesystem

	// Logger is an interface that the logger (e.g., slog, zlog) should satisfy.
	Logger api.LoggerAdapter
	// contains filtered or unexported fields
}

Config holds the runtime configuration for the Shard library. It defines infrastructure settings and the core processing logic.

func NewConfig

func NewConfig(opts ...Option) (*Config, error)

type Option

type Option func(*Config)

func WithCombiner

func WithCombiner(combiner api.Combiner) Option

func WithFilesystem

func WithFilesystem(storer api.Filesystem) Option

func WithInputPath

func WithInputPath(path string) Option

func WithLogger

func WithLogger(logger api.LoggerAdapter) Option

func WithMapSplitSize

func WithMapSplitSize(size int64) Option

func WithMapper

func WithMapper(mapper api.Mapper) Option

func WithMasterAddress

func WithMasterAddress(addr string) Option

func WithMaxConcurrency

func WithMaxConcurrency(limit int) Option

func WithNumReducers

func WithNumReducers(partitions int) Option

func WithOutputDir

func WithOutputDir(path string) Option

func WithPartitioner

func WithPartitioner(partitioner api.Partitioner) Option
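As an illustration of the default hash-based partitioning mentioned in the Config docs above, here is a minimal FNV-based sketch. The `hashPartition` signature is an assumption for illustration, not the actual `api.Partitioner` interface:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition assigns a key to one of numReducers partitions by
// hashing the key. A deterministic hash guarantees that every value
// for a given key is routed to the same reduce task.
func hashPartition(key string, numReducers int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	// Reduce the hash modulo the partition count before converting,
	// so the result is always in [0, numReducers).
	return int(h.Sum32() % uint32(numReducers))
}

func main() {
	// The same key always lands in the same partition.
	fmt.Println(hashPartition("hello", 16) == hashPartition("hello", 16)) // true
	p := hashPartition("world", 16)
	fmt.Println(p >= 0 && p < 16) // true
}
```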

func WithReducer

func WithReducer(reducer api.Reducer) Option

type SlogLoggerAdapter

type SlogLoggerAdapter struct {
	// contains filtered or unexported fields
}

SlogLoggerAdapter wraps slog.Logger.

func (*SlogLoggerAdapter) Debug

func (s *SlogLoggerAdapter) Debug(msg string, fields api.LogFields)

Debug logs a message to slog.LevelDebug.

func (*SlogLoggerAdapter) Error

func (s *SlogLoggerAdapter) Error(msg string, err error, fields api.LogFields)

Error logs a message to slog.LevelError.

func (*SlogLoggerAdapter) Info

func (s *SlogLoggerAdapter) Info(msg string, fields api.LogFields)

Info logs a message to slog.LevelInfo.

func (*SlogLoggerAdapter) Trace

func (s *SlogLoggerAdapter) Trace(msg string, fields api.LogFields)

Trace logs a message to LevelTrace.

func (*SlogLoggerAdapter) With

With returns a SlogLoggerAdapter with a set of fields injected into all subsequent log messages.

Directories

Path Synopsis
Copied from: https://github.com/ThreeDotsLabs/watermill/blob/master/log.go
internal
pkg
