README

dedup

A Streaming Deduplication package for Go

This package implements streaming deduplication, allowing you to remove duplicated data in streams. It supports variable block sizes and automatic content-based block splitting. It has a fully streaming mode and an indexed mode, which has significantly reduced memory requirements.

For an introduction to deduplication, read the blog post Fast Stream Deduplication in Go.

Package home: https://github.com/klauspost/dedup

Godoc: https://godoc.org/github.com/klauspost/dedup


Installation

To get the package use the standard:

go get -u github.com/klauspost/dedup

Usage

If you haven't already, you should read the Fast Stream Deduplication in Go blog post, since it introduces the main concepts and will help you make choices for your setup.

There are two symmetric function pairs: NewWriter/NewReader and NewStreamWriter/NewStreamReader. The first pair creates an indexed stream, which writes the index and data to two separate streams. This allows the deduplicated stream to be decoded with much less memory. The second pair writes all data to a single stream. This allows for on-the-fly transfers, but requires more memory on the receiving end.

When you create a deduplicating stream, you can choose between fixed or dynamic block sizes. Dynamic blocks adapt block splits to the incoming content, but are slower than fixed size and must use more conservative memory estimates.

Here is an example of a full roundtrip with indexed streams. For more examples see the godoc examples.

package main

import (
  "bytes"
  "fmt"
  "io"

  "github.com/klauspost/dedup"
)

// This will deduplicate a buffer of zeros to an indexed stream
func main() {
	// We will write out deduplicated data to these
	idx := bytes.Buffer{}
	data := bytes.Buffer{}

	// This is our input:
	input := bytes.NewBuffer(make([]byte, 50000))

	// Create a new writer, with each block being 1000 bytes fixed size.
	w, err := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
	if err != nil {
		panic(err)
	}
	// Copy our input to the writer.
	_, err = io.Copy(w, input)
	if err != nil {
		panic(err)
	}

	// Close to flush the remaining buffers
	err = w.Close()
	if err != nil {
		panic(err)
	}

	// Create a new indexed stream reader:
	r, err := dedup.NewReader(&idx, &data)
	if err != nil {
		panic(err)
	}

	// Inspect how much memory it will use.
	fmt.Println("Memory use:", r.MaxMem())

	var dst bytes.Buffer

	// Read everything
	_, err = io.Copy(&dst, r)
	if err != nil && err != io.EOF {
		panic(err)
	}

	// Let us inspect what was written:
	fmt.Println("Returned data length:", dst.Len())
	fmt.Println("Everything zero:", bytes.Equal(dst.Bytes(), make([]byte, 50000)))
}

Note that there is no error resilience built in. If any data is corrupted, it will probably not be detected, and there is no way to recover corrupted data. If you operate in an environment where corruption can occur, you should add additional checks to ensure that data is recoverable.

Input Splitting

If you want to simply split the input, that functionality is also exposed.

This can be useful if you want to deduplicate into your own key-value store. In that case, you simply feed the input to a NewSplitter, which returns the individual fragments along with their hashes. This allows you to store your files as a stream of hashes and place the data in your data store separately.

See the examples attached to the NewSplitter function on how to use this.

Hash collisions

The encoder uses SHA-1 to identify and "remember" unique blocks. No hash is secure from collisions, but SHA-1 offers 160 bits of entropy.

For example, the chance of a random hash collision occurring when encoding 1 TB of data in 1 KB blocks is about 3.94×10^-31, or one in roughly 2.5×10^30 ("2.5 thousand billion billion billion"). This of course assumes a uniform hash distribution and no deliberate hash collision attacks.

If SHA-1 doesn't provide sufficient security, it is easy to create a stronger version by simply changing the import:

import hasher "crypto/sha1"

You can use sha256 or sha512 for stronger hashes, or md5 for a faster hash.

To help you calculate the birthday problem likelihood for a given number of blocks, the BirthdayProblem function is provided.

Why is this not compression?

Deduplication does the same as compression, but at a higher level. Instead of looking for small matches, it attempts to find bigger ones: it matches and eliminates blocks whose entire content is identical.

This can be useful when backing up disk images or other content where you have duplicated files, etc.

Deduplication is a good step before compression. You will still be able to compress your data, since unique blocks are passed through as-is, in order and without any modification.

License

This code is published under an MIT license. See LICENSE file for more information.

Documentation

Overview

dedup: A Streaming Deduplication package

This package implements streaming deduplication, allowing you to remove duplicated data in streams. It supports variable block sizes and automatic content-based block splitting. It has a fully streaming mode and an indexed mode, which has significantly reduced memory requirements.

Read for an introduction to deduplication: https://blog.klauspost.com/fast-stream-deduplication-in-go

Package home: https://github.com/klauspost/dedup

Godoc: https://godoc.org/github.com/klauspost/dedup

Constants

const (
	// Fixed block size
	//
	// This is by far the fastest mode, and checks for duplicates
	// in fixed block sizes.
	// It can be helpful to use the "Split" function to reset offset, which
	// will reset duplication search at the position you are at.
	ModeFixed Mode = 0

	// Dynamic block size.
	//
	// This mode will create a deduplicator that will split the contents written
	// to it into dynamically sized blocks.
	// The size given indicates the maximum block size. Average size is usually maxSize/4.
	// Minimum block size is maxSize/64.
	ModeDynamic = 1

	// Dynamic block size, entropy-based.
	//
	// This mode will create a deduplicator that will split the contents written
	// to it into dynamically sized blocks, choosing split points based on the
	// entropy of the incoming data.
	// The size given indicates the maximum block size. Average size is usually maxSize/4.
	// Minimum block size is maxSize/64.
	ModeDynamicEntropy = 2
)
const HashSize = hasher.Size

Size of the underlying hash in bytes for those interested.

const MinBlockSize = 512

The smallest "maximum" block size allowed.

Variables

var ErrMaxMemoryTooSmall = errors.New("there must be at least space for 1 block")

ErrMaxMemoryTooSmall is returned if the encoder isn't allowed to store even 1 block.

var ErrSizeTooSmall = errors.New("maximum block size too small. must be at least 512 bytes")

ErrSizeTooSmall is returned if the requested maximum block size is smaller than MinBlockSize (512 bytes).

var ErrUnknownFormat = errors.New("unknown index format")

ErrUnknownFormat is returned if the index format is not recognized.

Functions

func BirthdayProblem

func BirthdayProblem(blocks int) string

Returns an approximate Birthday probability calculation based on the number of blocks given and the hash size.

It uses the simplified calculation: p = k(k-1) / (2N)

From http://preshing.com/20110504/hash-collision-probabilities/
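The simplified formula is easy to reproduce with math/big. This sketch is an independent re-implementation of p = k(k-1)/(2N) with N = 2^160 for SHA-1, not the package's BirthdayProblem function, so its rounding may differ slightly from the package's output:

```go
package main

import (
	"fmt"
	"math/big"
)

// collisionProb returns the simplified birthday-problem probability
// p = k(k-1) / (2N) for k blocks, with N = 2^160 possible SHA-1 hashes.
func collisionProb(k int64) *big.Rat {
	kb := big.NewInt(k)
	num := new(big.Int).Mul(kb, new(big.Int).Sub(kb, big.NewInt(1)))
	den := new(big.Int).Lsh(big.NewInt(2), 160) // 2 * 2^160
	return new(big.Rat).SetFrac(num, den)
}

func main() {
	// 1 TiB of data in 1 KiB blocks = 2^30 blocks.
	p := collisionProb((1 << 40) / (1 << 10))
	f, _ := p.Float64()
	fmt.Printf("collision probability ~ %e\n", f) // on the order of 1e-31
}
```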

Example

This shows an example of a birthday problem calculation. We calculate the probability of a collision of SHA1 hashes on 1 Terabyte data, using 1 Kilobyte blocks. With SHA-1, that gives a 1 in 2535301202817642046627252275200 chance of a collision occurring.

package main

import (
	"fmt"

	"github.com/klauspost/dedup"
)

func main() {
	fmt.Println("Hash size is", dedup.HashSize*8, "bits")
	fmt.Println("1TiB, 1KiB blocks:")
	fmt.Println(dedup.BirthdayProblem((1 << 40) / (1 << 10)))
}
Output:

Hash size is 160 bits
1TiB, 1KiB blocks:
Collision probability is ~ 1/2535301202817642046627252275200 ~ 3.944304522431639e-31

Types

type Fragment

type Fragment struct {
	Hash    [HashSize]byte // Hash of the fragment
	Payload []byte         // Data of the fragment.
	New     bool           // Will be true, if the data hasn't been encountered before.
	N       uint           // Sequentially incrementing number for each fragment.
}

Fragment is a file fragment. It is the data returned by the NewSplitter.

type IndexedReader

type IndexedReader interface {
	Reader

	// Blocksizes will return the sizes of each block.
	// Will be available if an index was provided.
	BlockSizes() []int
}

IndexedReader gives access to internal information on block sizes available on indexed streams.

func NewReader

func NewReader(index io.Reader, blocks io.Reader) (IndexedReader, error)

NewReader returns a reader that will decode the supplied index and data stream.

This is compatible with content from the NewWriter function. The function will decode the index before returning.

When you are done with the Reader, use Close to release resources.

Example

This will deduplicate a buffer of zeros to an indexed stream

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/dedup"
)

func main() {
	// Create data we can read.
	var idx, data bytes.Buffer
	input := bytes.NewBuffer(make([]byte, 50000))
	w, _ := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
	_, _ = io.Copy(w, input)
	_ = w.Close()

	// Create a new reader.
	r, err := dedup.NewReader(&idx, &data)
	if err != nil {
		panic(err)
	}

	// Inspect how much memory it will use.
	fmt.Println("Memory use:", r.MaxMem())

	var dst bytes.Buffer

	// Read everything
	_, err = io.Copy(&dst, r)
	if err != nil && err != io.EOF {
		panic(err)
	}

	// Let us inspect what was written:
	fmt.Println("Returned data length:", dst.Len())
	fmt.Println("Everything zero:", bytes.Equal(dst.Bytes(), make([]byte, 50000)))

}
Output:

Memory use: 1000
Returned data length: 50000
Everything zero: true

func NewSeekReader

func NewSeekReader(index io.Reader, blocks io.ReadSeeker) (IndexedReader, error)

NewSeekReader returns a reader that will decode the supplied index and data stream.

This is compatible with content from the NewWriter function.

No blocks will be kept in memory, but the block data input must be seekable. The function will decode the index before returning.

When you are done with the Reader, use Close to release resources.

type Mode

type Mode int

Deduplication mode used to determine how input is split.

type Reader

type Reader interface {
	io.ReadCloser

	io.WriterTo

	// MaxMem returns the *maximum* memory required to decode the stream.
	MaxMem() int
}

A Reader will decode a deduplicated stream and return the data as it was encoded. Use Close when done to release resources.

func NewStreamReader

func NewStreamReader(in io.Reader) (Reader, error)

NewStreamReader returns a reader that will decode the supplied data stream.

This is compatible with content from the NewStreamWriter function.

When you are done with the Reader, use Close to release resources.

Example

This will deduplicate a buffer of zeros to an indexed stream

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/dedup"
)

func main() {
	// Create data we can read.
	var data bytes.Buffer
	input := bytes.NewBuffer(make([]byte, 50000))
	// Set the memory limit to 10000 bytes
	w, _ := dedup.NewStreamWriter(&data, dedup.ModeFixed, 1000, 10000)
	_, _ = io.Copy(w, input)
	_ = w.Close()

	// Create a new stream reader:
	r, err := dedup.NewStreamReader(&data)
	if err != nil {
		panic(err)
	}

	// Inspect how much memory it will use.
	// Since this is a stream, it reports the worst possible scenario
	fmt.Println("Memory use:", r.MaxMem())

	var dst bytes.Buffer

	// Read everything
	_, err = io.Copy(&dst, r)
	if err != nil && err != io.EOF {
		panic(err)
	}

	// Let us inspect what was written:
	fmt.Println("Returned data length:", dst.Len())
	fmt.Println("Everything zero:", bytes.Equal(dst.Bytes(), make([]byte, 50000)))

}
Output:

Memory use: 10000
Returned data length: 50000
Everything zero: true

type Writer

type Writer interface {
	io.WriteCloser

	// Split content, so a new block begins with the next write.
	Split()

	// MemUse returns an approximate maximum memory use in bytes for
	// encoder (Writer) and decoder (Reader) for the given number of bytes.
	MemUse(bytes int) (encoder, decoder int64)

	// Returns the current number of blocks.
	// Blocks may still be processing.
	Blocks() int
}

func NewSplitter

func NewSplitter(fragments chan<- Fragment, mode Mode, maxSize uint) (Writer, error)

NewSplitter will return a writer you can write data to, and the input will be split into separate fragments.

You must supply a fragment channel that will output fragments for the data you have written. The channel must accept data while you write to the splitter.

For each fragment the SHA-1 hash of the data section is returned, along with the raw data of this segment.

When you call Close on the returned Writer, the final fragments will be sent and the channel will be closed.

Example

This will deduplicate a buffer of zeros, and return each block on a channel in order.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/dedup"
)

func main() {
	// We will write to this
	// We set a small buffer
	out := make(chan dedup.Fragment, 10)

	// This will consume our blocks as they are returned
	// and send information about what was received.
	info := make(chan int)
	go func() {
		n := 0
		size := 0
		for f := range out {
			n++
			if f.New {
				size += len(f.Payload)
			}
		}
		info <- n
		info <- size
	}()

	// This is our input:
	input := bytes.NewBuffer(make([]byte, 50050))

	// Create a new writer, with each block being 1000 bytes fixed size.
	w, err := dedup.NewSplitter(out, dedup.ModeFixed, 1000)
	if err != nil {
		panic(err)
	}
	// Copy our input to the writer.
	io.Copy(w, input)

	// Close the writer
	err = w.Close()
	if err != nil {
		panic(err)
	}

	// Let us inspect what was written:
	fmt.Println("Blocks:", <-info)
	// Size of one (repeated) block + 50 bytes for last.
	fmt.Println("Data size:", <-info)

}
Output:

Blocks: 51
Data size: 1050
Example (Entropy)

This will deduplicate a file and return each block on a channel in order.

package main

import (
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/klauspost/dedup"
)

func main() {
	// Our input
	f, _ := os.Open("testdata/sampledata.zip")
	defer f.Close()

	// We will receive fragments on this channel
	ch := make(chan dedup.Fragment, 10)

	var wg sync.WaitGroup
	wg.Add(1)

	// Start a goroutine that will consume the fragments
	go func() {
		defer wg.Done()
		for f := range ch {
			if f.New {
				fmt.Printf("Got NEW fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
				// Insert payload into data store
			} else {
				fmt.Printf("Got OLD fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
			}
			// Add hash to list of hashes required to reconstruct the file.
		}
	}()

	// Create a dynamic splitter with average size of 1024 bytes.
	w, _ := dedup.NewSplitter(ch, dedup.ModeDynamicEntropy, 4*1024)

	// Copy data to the splitter
	_, _ = io.Copy(w, f)

	// Flush the remaining fragments
	_ = w.Close()

	// Wait for input to be received.
	wg.Wait()

}
Output:

Got NEW fragment #0, size 521, hash:0c5989843e85f31aed26f249bd203240dd72f77a
Got NEW fragment #1, size 1563, hash:308ff2e0b4776c2a08fe549422c7ebfbf646bb22
Got NEW fragment #2, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
Got NEW fragment #3, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
Got NEW fragment #4, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
Got NEW fragment #5, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
Got NEW fragment #6, size 3509, hash:e0d7c8acfdd5b399a92b5e495a0794ffa842ee73
Got OLD fragment #7, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
Got OLD fragment #8, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
Got OLD fragment #9, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
Got OLD fragment #10, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
Got NEW fragment #11, size 1569, hash:5ae2760535662c13b336d1ae4a0a7fdcba789d83
Example (File)

This will deduplicate a file and return each block on a channel in order.

package main

import (
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/klauspost/dedup"
)

func main() {
	// Our input
	f, _ := os.Open("testdata/sampledata.zip")
	defer f.Close()

	// We will receive fragments on this channel
	ch := make(chan dedup.Fragment, 10)

	var wg sync.WaitGroup
	wg.Add(1)

	// Start a goroutine that will consume the fragments
	go func() {
		defer wg.Done()
		for f := range ch {
			if f.New {
				fmt.Printf("Got NEW fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
				// Insert payload into data store
			} else {
				fmt.Printf("Got OLD fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
			}
			// Add hash to list of hashes required to reconstruct the file.
		}
	}()

	// Create a dynamic splitter with average size of 1024 bytes.
	w, _ := dedup.NewSplitter(ch, dedup.ModeDynamic, 4*1024)

	// Copy data to the splitter
	_, _ = io.Copy(w, f)

	// Flush the remaining fragments
	_ = w.Close()

	// Wait for input to be received.
	wg.Wait()

}
Output:

Got NEW fragment #0, size 893, hash:7f8455127e82f90ea7e97716ccaefa9317279b4b
Got NEW fragment #1, size 559, hash:b554708bbfda24f1eb8fcd75a155d23bd36939d3
Got NEW fragment #2, size 3482, hash:59bca870477e14e97ae8650e74ef52abcb6340e8
Got NEW fragment #3, size 165, hash:6fb05a63e28a1bb2e880e051940f517115e7b16c
Got NEW fragment #4, size 852, hash:6671826ffff6edd32951a0e774efccb5101ba629
Got NEW fragment #5, size 3759, hash:0fae545a20195720d8e9bb9540069418d7db0873
Got OLD fragment #6, size 3482, hash:59bca870477e14e97ae8650e74ef52abcb6340e8
Got OLD fragment #7, size 165, hash:6fb05a63e28a1bb2e880e051940f517115e7b16c
Got OLD fragment #8, size 852, hash:6671826ffff6edd32951a0e774efccb5101ba629
Got NEW fragment #9, size 2380, hash:1507aa13e215517ce982b9235a0221018128ed4e
Got NEW fragment #10, size 71, hash:f262fcf4af26ee75ff3045db2af21f2acca235cd

func NewStreamWriter

func NewStreamWriter(out io.Writer, mode Mode, maxSize, maxMemory uint) (Writer, error)

NewStreamWriter will create a deduplicator that will split the contents written to it into blocks and de-duplicate these.

The output is delivered as a single stream, and memory use will remain stable for both writing and reading the stream.

This function returns data that is compatible with the NewStreamReader function.

You must set the maximum memory for the decoder to use. This limits the length over which a match can be made. If you use dynamic blocks, also note that the average block size is 1/4th of the maximum block size.

The returned writer must be closed to flush the remaining data.

Example

This will deduplicate a buffer of zeros to a non-indexed stream

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/dedup"
)

func main() {
	// We will write to this
	data := bytes.Buffer{}

	// This is our input:
	input := bytes.NewBuffer(make([]byte, 50000))

	// Create a new writer, with each block being 1000 bytes,
	// And allow it to use 10000 bytes of memory
	w, err := dedup.NewStreamWriter(&data, dedup.ModeFixed, 1000, 10000)
	if err != nil {
		panic(err)
	}
	// Copy our input to the writer.
	io.Copy(w, input)

	// Close the writer
	err = w.Close()
	if err != nil {
		panic(err)
	}

	// Let us inspect what was written:
	fmt.Println("Blocks:", w.Blocks())
	fmt.Println("Data size:", data.Len())

}
Output:

Blocks: 50
Data size: 1068
Example (File)

This will deduplicate a buffer of zeros to a non-indexed stream written to a file. It is not recommended to use a single (non-indexed) stream when you are writing to a file.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"

	"github.com/klauspost/dedup"
)

func main() {
	// We will write to this
	data, err := os.Create("outputstream.data")
	if err != nil {
		panic(err)
	}
	// Close, print stats and remove it
	defer func() {
		data.Close()
		stat, _ := os.Stat("outputstream.data")
		fmt.Println("Stream size:", stat.Size())
		os.Remove("outputstream.data")
	}()

	// This is our input:
	input := bytes.NewBuffer(make([]byte, 500000))

	// Create a new writer, with each block being 1000 bytes,
	// And allow it to use 10000 bytes of memory
	w, err := dedup.NewStreamWriter(data, dedup.ModeFixed, 1000, 10000)
	if err != nil {
		panic(err)
	}
	defer w.Close()

	// Copy our input to the writer.
	io.Copy(w, input)

	// Print the number of blocks written
	fmt.Println("Blocks:", w.Blocks())

}
Output:

Blocks: 500
Stream size: 1518

func NewWriter

func NewWriter(index io.Writer, blocks io.Writer, mode Mode, maxSize, maxMemory uint) (Writer, error)

NewWriter will create a deduplicator that will split the contents written to it into blocks and de-duplicate these.

The output is delivered as two streams, an index stream and a block stream.

The index stream will contain information about which blocks are deduplicated and the block stream will contain uncompressed data blocks.

You can set the maximum memory for the decoder to use. This limits the length over which a match can be made. This is very conservative, so you can set this at the absolute limit of memory available. If you use dynamic blocks, also note that the average block size is 1/4th of the maximum block size. Set maxMemory to 0 to disable the decoder memory limit.

This function returns data that is compatible with the NewReader function. The returned writer must be closed to flush the remaining data.

Example

This will deduplicate a buffer of zeros to an indexed stream

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/dedup"
)

func main() {
	// We will write to these
	idx := bytes.Buffer{}
	data := bytes.Buffer{}

	// This is our input:
	input := bytes.NewBuffer(make([]byte, 50000))

	// Create a new writer, with each block being 1000 bytes
	w, err := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
	if err != nil {
		panic(err)
	}

	// Copy our input to the writer.
	io.Copy(w, input)

	// Close the writer
	err = w.Close()
	if err != nil {
		panic(err)
	}

	// Let us inspect what was written:
	fmt.Println("Blocks:", w.Blocks())
	fmt.Println("Index size:", idx.Len())
	fmt.Println("Data size:", data.Len())

}
Output:

Blocks: 50
Index size: 67
Data size: 1000
Example (File)

This example will show how to write data to two files. Running this example will deduplicate a zero-filled byte slice of 500000 bytes into an 'output.data' and an 'output.idx' file.

In the real world, you would likely want to add a bufio.NewWriter to the output, but to keep it simple, we don't do that here.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"

	"github.com/klauspost/dedup"
)

func main() {
	data, err := os.Create("output.data")
	if err != nil {
		panic(err)
	}
	// Close, print stats and remove it
	defer func() {
		data.Close()
		stat, _ := os.Stat("output.data")
		fmt.Println("Data size:", stat.Size())
		os.Remove("output.data")
	}()

	idx, err := os.Create("output.idx")
	if err != nil {
		panic(err)
	}
	// Close, print stats and remove it
	defer func() {
		idx.Close()
		stat, _ := os.Stat("output.idx")
		fmt.Println("Index size:", stat.Size())
		os.Remove("output.idx")
	}()

	// This is our input:
	input := bytes.NewBuffer(make([]byte, 500000))

	// Create a new writer, with each block being 1000 bytes fixed size.
	w, err := dedup.NewWriter(idx, data, dedup.ModeFixed, 1000, 0)
	if err != nil {
		panic(err)
	}
	defer w.Close()

	// Copy our input to the writer.
	io.Copy(w, input)

	// Print the number of blocks written
	fmt.Println("Blocks:", w.Blocks())

}
Output:

Blocks: 500
Index size: 517
Data size: 1000
