Documentation ¶
Overview ¶
dedup: A Streaming Deduplication package
This package implements streaming deduplication, allowing you to remove duplicated data in streams. It supports variable block sizes and automatic content-based block adaptation. It has a fully streaming mode and an indexed mode, which has significantly reduced memory requirements.
For an introduction to deduplication, read: https://blog.klauspost.com/fast-stream-deduplication-in-go
Package home: https://github.com/klauspost/dedup
Index ¶
Examples ¶
Constants ¶
const (
    // Fixed block size.
    //
    // This is by far the fastest mode, and checks for duplicates
    // in fixed block sizes.
    // It can be helpful to use the "Split" function to reset offset, which
    // will reset duplication search at the position you are at.
    ModeFixed Mode = 0

    // Dynamic block size.
    //
    // This mode will create a deduplicator that will split the contents written
    // to it into dynamically sized blocks.
    // The size given indicates the maximum block size. Average size is usually maxSize/4.
    // Minimum block size is maxSize/64.
    ModeDynamic = 1

    // Dynamic block size, based on content entropy.
    //
    // This mode will create a deduplicator that will split the contents written
    // to it into dynamically sized blocks.
    // The size given indicates the maximum block size. Average size is usually maxSize/4.
    // Minimum block size is maxSize/64.
    ModeDynamicEntropy = 2
)
const HashSize = hasher.Size
HashSize is the size of the underlying hash in bytes, for those interested.
const MinBlockSize = 512
The smallest "maximum" block size allowed.
Variables ¶
var ErrMaxMemoryTooSmall = errors.New("there must at least be space for 1 block")
ErrMaxMemoryTooSmall is returned if the encoder isn't allowed to store even 1 block.
var ErrSizeTooSmall = errors.New("maximum block size too small. must be at least 512 bytes")
ErrSizeTooSmall is returned if the requested maximum block size is smaller than MinBlockSize (512 bytes).
var ErrUnknownFormat = errors.New("unknown index format")
ErrUnknownFormat is returned if the supplied index stream is in an unknown format.
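These are sentinel values, so they can be compared directly against the error returned by a constructor. A minimal sketch (not from the package docs), assuming the constructors validate maxSize against MinBlockSize as the error text suggests:

package main

import (
    "bytes"
    "fmt"

    "github.com/klauspost/dedup"
)

func main() {
    var idx, data bytes.Buffer

    // Request a 100-byte maximum block size, which is below MinBlockSize (512).
    _, err := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 100, 0)
    if err == dedup.ErrSizeTooSmall {
        fmt.Println("rejected:", err)
    }
}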
Functions ¶
func BirthdayProblem ¶
BirthdayProblem returns an approximate birthday-problem collision probability based on the number of blocks given and the hash size.
It uses the simplified calculation: p = k(k-1) / (2N), where k is the number of blocks and N is the number of possible hash values.
From http://preshing.com/20110504/hash-collision-probabilities/
Example ¶
This shows an example of a birthday problem calculation. We calculate the probability of a collision of SHA-1 hashes on 1 terabyte of data, using 1 kilobyte blocks. With SHA-1, that gives a 1 in 2535301202817642046627252275200 chance of a collision occurring.
package main

import (
    "fmt"

    "github.com/klauspost/dedup"
)

func main() {
    fmt.Println("Hash size is", dedup.HashSize*8, "bits")
    fmt.Println("1TiB, 1KiB blocks:")
    fmt.Println(dedup.BirthdayProblem((1 << 40) / (1 << 10)))
}
Output:

Hash size is 160 bits
1TiB, 1KiB blocks:
Collision probability is ~ 1/2535301202817642046627252275200 ~ 3.944304522431639e-31
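The denominator in the output above can be reproduced from the simplified formula directly. A small sketch (not part of the package) computing 1/p = 2N / (k(k-1)) with math/big, for k = 2^30 blocks (1 TiB in 1 KiB blocks) and N = 2^160 possible SHA-1 values:

package main

import (
    "fmt"
    "math/big"
)

func main() {
    one := big.NewInt(1)
    k := new(big.Int).Lsh(one, 30)  // 2^30 blocks (1 TiB / 1 KiB)
    n := new(big.Int).Lsh(one, 160) // 2^160 possible SHA-1 values

    // 1/p = 2N / (k(k-1))
    num := new(big.Int).Lsh(n, 1)
    den := new(big.Int).Mul(k, new(big.Int).Sub(k, one))
    fmt.Println(new(big.Int).Div(num, den))
    // Prints 2535301202817642046627252275200, matching the example output above.
}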
Types ¶
type Fragment ¶
type Fragment struct {
    Hash    [HashSize]byte // Hash of the fragment.
    Payload []byte         // Data of the fragment.
    New     bool           // Will be true if the data hasn't been encountered before.
    N       uint           // Sequentially incrementing number for each segment.
}
Fragment is a file fragment. It is the data delivered on the channel supplied to NewSplitter.
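Because Hash is a fixed-size array, it can serve directly as a map key. Below is a hedged sketch (the store type is not part of this package) of a minimal in-memory content store built on that property:

package main

import (
    "fmt"

    "github.com/klauspost/dedup"
)

// store keeps one copy of each unique payload, keyed by fragment hash,
// plus the ordered list of hashes needed to reconstruct the stream.
type store struct {
    blocks map[[dedup.HashSize]byte][]byte
    order  [][dedup.HashSize]byte
}

func (s *store) add(f dedup.Fragment) {
    if f.New {
        // Copy the payload so the store owns its memory.
        s.blocks[f.Hash] = append([]byte(nil), f.Payload...)
    }
    s.order = append(s.order, f.Hash)
}

func main() {
    s := &store{blocks: make(map[[dedup.HashSize]byte][]byte)}

    ch := make(chan dedup.Fragment, 16)
    go func() {
        // Split 4 KiB of zeros into 1 KiB fixed blocks.
        w, _ := dedup.NewSplitter(ch, dedup.ModeFixed, 1024)
        _, _ = w.Write(make([]byte, 4096))
        _ = w.Close() // Close sends the remaining fragments and closes ch.
    }()
    for f := range ch {
        s.add(f)
    }
    fmt.Println("unique blocks:", len(s.blocks), "fragments:", len(s.order))
}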
type IndexedReader ¶
type IndexedReader interface {
    Reader

    // BlockSizes will return the sizes of each block.
    // Will be available if an index was provided.
    BlockSizes() []int
}
IndexedReader gives access to internal information on block sizes available on indexed streams.
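A short sketch of reading back the block sizes of an indexed stream; the writer setup mirrors the NewReader example below:

package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

func main() {
    // Write an indexed stream: 5000 zero bytes in 1000-byte fixed blocks.
    var idx, data bytes.Buffer
    w, _ := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
    _, _ = io.Copy(w, bytes.NewBuffer(make([]byte, 5000)))
    _ = w.Close()

    // NewReader returns an IndexedReader, so BlockSizes is available.
    r, err := dedup.NewReader(&idx, &data)
    if err != nil {
        panic(err)
    }
    defer r.Close()

    fmt.Println("Block sizes:", r.BlockSizes())
}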
func NewReader ¶
NewReader returns a reader that will decode the supplied index and data stream.
It is compatible with the output of the NewWriter function. The function will decode the index before returning.
When you are done with the Reader, use Close to release resources.
Example ¶
This will deduplicate a buffer of zeros to an indexed stream.
package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

func main() {
    // Create data we can read.
    var idx, data bytes.Buffer
    input := bytes.NewBuffer(make([]byte, 50000))
    w, _ := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
    _, _ = io.Copy(w, input)
    _ = w.Close()

    // Create a new reader.
    r, err := dedup.NewReader(&idx, &data)
    if err != nil {
        panic(err)
    }

    // Inspect how much memory it will use.
    fmt.Println("Memory use:", r.MaxMem())

    var dst bytes.Buffer

    // Read everything.
    _, err = io.Copy(&dst, r)
    if err != nil && err != io.EOF {
        panic(err)
    }

    // Let us inspect what was written:
    fmt.Println("Returned data length:", dst.Len())
    fmt.Println("Everything zero:", 0 == bytes.Compare(dst.Bytes(), make([]byte, 50000)))
}
Output:

Memory use: 1000
Returned data length: 50000
Everything zero: true
func NewSeekReader ¶
func NewSeekReader(index io.Reader, blocks io.ReadSeeker) (IndexedReader, error)
NewSeekReader returns a reader that will decode the supplied index and data stream.
It is compatible with the output of the NewWriter function.
No blocks will be kept in memory, but the block data input must be seekable. The function will decode the index before returning.
When you are done with the Reader, use Close to release resources.
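There is no example for NewSeekReader above, but usage mirrors NewReader; the only difference is that the block data must be an io.ReadSeeker. A hedged sketch using bytes.Reader, which satisfies that interface:

package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

func main() {
    // Create an indexed stream we can seek in.
    var idx, data bytes.Buffer
    w, _ := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
    _, _ = io.Copy(w, bytes.NewBuffer(make([]byte, 50000)))
    _ = w.Close()

    // bytes.Reader implements io.ReadSeeker, so it can be the block source.
    // No blocks are kept in memory; duplicates are re-read via Seek.
    r, err := dedup.NewSeekReader(&idx, bytes.NewReader(data.Bytes()))
    if err != nil {
        panic(err)
    }
    defer r.Close()

    var dst bytes.Buffer
    if _, err = io.Copy(&dst, r); err != nil && err != io.EOF {
        panic(err)
    }
    fmt.Println("Returned data length:", dst.Len())
}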
type Reader ¶
type Reader interface {
    io.ReadCloser
    io.WriterTo

    // MaxMem returns the *maximum* memory required to decode the stream.
    MaxMem() int
}
A Reader will decode a deduplicated stream and return the data as it was encoded. Use Close when done to release resources.
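Since a Reader implements io.WriterTo, io.Copy can use WriteTo directly; MaxMem makes it possible to check the memory requirement before committing to a decode. A hedged sketch (the decode helper and its budget value are not part of the package):

package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

// decode copies a deduplicated stream to dst, refusing to start if the
// decoder would need more than budget bytes of memory.
func decode(dst io.Writer, r dedup.Reader, budget int) error {
    if r.MaxMem() > budget {
        return fmt.Errorf("decoder needs %d bytes, budget is %d", r.MaxMem(), budget)
    }
    // io.Copy will use r.WriteTo, since Reader implements io.WriterTo.
    _, err := io.Copy(dst, r)
    if err == io.EOF {
        err = nil
    }
    return err
}

func main() {
    var idx, data bytes.Buffer
    w, _ := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
    _, _ = io.Copy(w, bytes.NewBuffer(make([]byte, 50000)))
    _ = w.Close()

    r, _ := dedup.NewReader(&idx, &data)
    defer r.Close()

    var dst bytes.Buffer
    if err := decode(&dst, r, 1<<20); err != nil {
        panic(err)
    }
    fmt.Println("Decoded", dst.Len(), "bytes")
}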
func NewStreamReader ¶
NewStreamReader returns a reader that will decode the supplied data stream.
It is compatible with the output of the NewStreamWriter function.
When you are done with the Reader, use Close to release resources.
Example ¶
This will deduplicate a buffer of zeros to a non-indexed stream.
package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

func main() {
    // Create data we can read.
    var data bytes.Buffer
    input := bytes.NewBuffer(make([]byte, 50000))

    // Set the memory limit to 10000 bytes.
    w, _ := dedup.NewStreamWriter(&data, dedup.ModeFixed, 1000, 10000)
    _, _ = io.Copy(w, input)
    _ = w.Close()

    // Create a new stream reader:
    r, err := dedup.NewStreamReader(&data)
    if err != nil {
        panic(err)
    }

    // Inspect how much memory it will use.
    // Since this is a stream, it will print the worst possible scenario.
    fmt.Println("Memory use:", r.MaxMem())

    var dst bytes.Buffer

    // Read everything.
    _, err = io.Copy(&dst, r)
    if err != nil && err != io.EOF {
        panic(err)
    }

    // Let us inspect what was written:
    fmt.Println("Returned data length:", dst.Len())
    fmt.Println("Everything zero:", 0 == bytes.Compare(dst.Bytes(), make([]byte, 50000)))
}
Output:

Memory use: 10000
Returned data length: 50000
Everything zero: true
type Writer ¶
type Writer interface {
    io.WriteCloser

    // Split content, so a new block begins with next write.
    Split()

    // MemUse returns an approximate maximum memory use in bytes for
    // encoder (Writer) and decoder (Reader) for the given number of bytes.
    MemUse(bytes int) (encoder, decoder int64)

    // Blocks returns the current number of blocks.
    // Blocks may still be processing.
    Blocks() int
}
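Split and MemUse have no dedicated examples above. A hedged sketch of both: MemUse estimates encoder and decoder memory for a planned input size, and Split forces a block boundary (for example between files in an archive) so identical sections line up on block boundaries:

package main

import (
    "bytes"
    "fmt"

    "github.com/klauspost/dedup"
)

func main() {
    var idx, data bytes.Buffer
    w, err := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
    if err != nil {
        panic(err)
    }

    // Estimate memory use for a 1 GiB input before committing to it.
    enc, dec := w.MemUse(1 << 30)
    fmt.Println("Encoder memory:", enc, "Decoder memory:", dec)

    // Write two identical sections, with Split marking the boundary so
    // the second section starts on a fresh block.
    _, _ = w.Write(make([]byte, 1500))
    w.Split()
    _, _ = w.Write(make([]byte, 1500))
    _ = w.Close()

    fmt.Println("Blocks:", w.Blocks())
}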
func NewSplitter ¶
NewSplitter returns a writer you can write data to; the content will be split into separate fragments.
You must supply a fragment channel that will receive the fragments for the data you have written. The channel must accept data while you write to the splitter.
For each fragment, the SHA-1 hash of the data section is returned along with the raw data of the segment.
When you call Close on the returned Writer, the final fragments will be sent and the channel will be closed.
Example ¶
This will deduplicate a buffer of zeros, and return each block on a channel in order.
package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

func main() {
    // We will write to this.
    // We set a small buffer.
    out := make(chan dedup.Fragment, 10)

    // This will consume our blocks as they are returned
    // and send information about what was received.
    info := make(chan int, 0)
    go func() {
        n := 0
        size := 0
        for f := range out {
            n++
            if f.New {
                size += len(f.Payload)
            }
        }
        info <- n
        info <- size
    }()

    // This is our input:
    input := bytes.NewBuffer(make([]byte, 50050))

    // Create a new writer, with each block being 1000 bytes.
    w, err := dedup.NewSplitter(out, dedup.ModeFixed, 1000)
    if err != nil {
        panic(err)
    }

    // Copy our input to the writer.
    io.Copy(w, input)

    // Close the writer.
    err = w.Close()
    if err != nil {
        panic(err)
    }

    // Let us inspect what was written:
    fmt.Println("Blocks:", <-info)
    // Size of one (repeated) block + 50 bytes for the last.
    fmt.Println("Data size:", <-info)
}
Output:

Blocks: 51
Data size: 1050
Example (Entropy) ¶
This will deduplicate a file using entropy-based dynamic splitting and return each block on a channel in order.
package main

import (
    "encoding/hex"
    "fmt"
    "io"
    "os"
    "sync"

    "github.com/klauspost/dedup"
)

func main() {
    // Our input.
    f, _ := os.Open("testdata/sampledata.zip")
    defer f.Close()

    // We will receive fragments on this channel.
    ch := make(chan dedup.Fragment, 10)

    var wg sync.WaitGroup
    wg.Add(1)

    // Start a goroutine that will consume the fragments.
    go func() {
        defer wg.Done()
        for {
            select {
            case f, ok := <-ch:
                if !ok {
                    return
                }
                if f.New {
                    fmt.Printf("Got NEW fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
                    // Insert payload into data store.
                } else {
                    fmt.Printf("Got OLD fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
                }
                // Add hash to list of hashes required to reconstruct the file.
            }
        }
    }()

    // Create a dynamic splitter with average size of 1024 bytes.
    w, _ := dedup.NewSplitter(ch, dedup.ModeDynamicEntropy, 4*1024)

    // Copy data to the splitter.
    _, _ = io.Copy(w, f)

    // Flush the remaining fragments.
    _ = w.Close()

    // Wait for input to be received.
    wg.Wait()
}
Output:

Got NEW fragment #0, size 521, hash:0c5989843e85f31aed26f249bd203240dd72f77a
Got NEW fragment #1, size 1563, hash:308ff2e0b4776c2a08fe549422c7ebfbf646bb22
Got NEW fragment #2, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
Got NEW fragment #3, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
Got NEW fragment #4, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
Got NEW fragment #5, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
Got NEW fragment #6, size 3509, hash:e0d7c8acfdd5b399a92b5e495a0794ffa842ee73
Got OLD fragment #7, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
Got OLD fragment #8, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
Got OLD fragment #9, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
Got OLD fragment #10, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
Got NEW fragment #11, size 1569, hash:5ae2760535662c13b336d1ae4a0a7fdcba789d83
Example (File) ¶
This will deduplicate a file and return each block on a channel in order.
package main

import (
    "encoding/hex"
    "fmt"
    "io"
    "os"
    "sync"

    "github.com/klauspost/dedup"
)

func main() {
    // Our input.
    f, _ := os.Open("testdata/sampledata.zip")
    defer f.Close()

    // We will receive fragments on this channel.
    ch := make(chan dedup.Fragment, 10)

    var wg sync.WaitGroup
    wg.Add(1)

    // Start a goroutine that will consume the fragments.
    go func() {
        defer wg.Done()
        for {
            select {
            case f, ok := <-ch:
                if !ok {
                    return
                }
                if f.New {
                    fmt.Printf("Got NEW fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
                    // Insert payload into data store.
                } else {
                    fmt.Printf("Got OLD fragment #%d, size %d, hash:%s\n", f.N, len(f.Payload), hex.EncodeToString(f.Hash[:]))
                }
                // Add hash to list of hashes required to reconstruct the file.
            }
        }
    }()

    // Create a dynamic splitter with average size of 1024 bytes.
    w, _ := dedup.NewSplitter(ch, dedup.ModeDynamic, 4*1024)

    // Copy data to the splitter.
    _, _ = io.Copy(w, f)

    // Flush the remaining fragments.
    _ = w.Close()

    // Wait for input to be received.
    wg.Wait()
}
Output:

Got NEW fragment #0, size 893, hash:7f8455127e82f90ea7e97716ccaefa9317279b4b
Got NEW fragment #1, size 559, hash:b554708bbfda24f1eb8fcd75a155d23bd36939d3
Got NEW fragment #2, size 3482, hash:59bca870477e14e97ae8650e74ef52abcb6340e8
Got NEW fragment #3, size 165, hash:6fb05a63e28a1bb2e880e051940f517115e7b16c
Got NEW fragment #4, size 852, hash:6671826ffff6edd32951a0e774efccb5101ba629
Got NEW fragment #5, size 3759, hash:0fae545a20195720d8e9bb9540069418d7db0873
Got OLD fragment #6, size 3482, hash:59bca870477e14e97ae8650e74ef52abcb6340e8
Got OLD fragment #7, size 165, hash:6fb05a63e28a1bb2e880e051940f517115e7b16c
Got OLD fragment #8, size 852, hash:6671826ffff6edd32951a0e774efccb5101ba629
Got NEW fragment #9, size 2380, hash:1507aa13e215517ce982b9235a0221018128ed4e
Got NEW fragment #10, size 71, hash:f262fcf4af26ee75ff3045db2af21f2acca235cd
func NewStreamWriter ¶
NewStreamWriter will create a deduplicator that will split the contents written to it into blocks and de-duplicate these.
The output is delivered as a single stream, and memory use will remain stable for both writing and reading the stream.
This function returns data that is compatible with the NewStreamReader function.
You must set the maximum memory for the decoder to use. This limits how far back a match can be made. If you use dynamic blocks, also note that the average block size is 1/4th of the maximum block size.
The returned writer must be closed to flush the remaining data.
Example ¶
This will deduplicate a buffer of zeros to a non-indexed stream.
package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

func main() {
    // We will write to this.
    data := bytes.Buffer{}

    // This is our input:
    input := bytes.NewBuffer(make([]byte, 50000))

    // Create a new writer, with each block being 1000 bytes,
    // and allow it to use 10000 bytes of memory.
    w, err := dedup.NewStreamWriter(&data, dedup.ModeFixed, 1000, 10000)
    if err != nil {
        panic(err)
    }

    // Copy our input to the writer.
    io.Copy(w, input)

    // Close the writer.
    err = w.Close()
    if err != nil {
        panic(err)
    }

    // Let us inspect what was written:
    fmt.Println("Blocks:", w.Blocks())
    fmt.Println("Data size:", data.Len())
}
Output:

Blocks: 50
Data size: 1068
Example (File) ¶
This will deduplicate a buffer of zeros to a non-indexed stream written to a file. It is not recommended to use a single stream when you are writing to a file.
package main

import (
    "bytes"
    "fmt"
    "io"
    "os"

    "github.com/klauspost/dedup"
)

func main() {
    // We will write to this.
    data, err := os.Create("outputstream.data")
    if err != nil {
        panic(err)
    }

    // Close, print stats and remove it.
    defer func() {
        data.Close()
        stat, _ := os.Stat("outputstream.data")
        fmt.Println("Stream size:", stat.Size())
        os.Remove("outputstream.data")
    }()

    // This is our input:
    input := bytes.NewBuffer(make([]byte, 500000))

    // Create a new writer, with each block being 1000 bytes,
    // and allow it to use 10000 bytes of memory.
    w, err := dedup.NewStreamWriter(data, dedup.ModeFixed, 1000, 10000)
    if err != nil {
        panic(err)
    }
    defer w.Close()

    // Copy our input to the writer.
    io.Copy(w, input)

    // Print the number of blocks written.
    fmt.Println("Blocks:", w.Blocks())
}
Output:

Blocks: 500
Stream size: 1518
func NewWriter ¶
func NewWriter(index io.Writer, blocks io.Writer, mode Mode, maxSize, maxMemory uint) (Writer, error)
NewWriter will create a deduplicator that will split the contents written to it into blocks and de-duplicate these.
The output is delivered as two streams, an index stream and a block stream.
The index stream will contain information about which blocks are deduplicated and the block stream will contain uncompressed data blocks.
You can set the maximum memory for the decoder to use. This limits how far back a match can be made. This is very conservative, so you can set this at the absolute limit of memory available. If you use dynamic blocks, also note that the average block size is 1/4th of the maximum block size. Set maxMemory to 0 to disable the decoder memory limit.
This function returns data that is compatible with the NewReader function. The returned writer must be closed to flush the remaining data.
Example ¶
This will deduplicate a buffer of zeros to an indexed stream.
package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/dedup"
)

func main() {
    // We will write to these.
    idx := bytes.Buffer{}
    data := bytes.Buffer{}

    // This is our input:
    input := bytes.NewBuffer(make([]byte, 50000))

    // Create a new writer, with each block being 1000 bytes.
    w, err := dedup.NewWriter(&idx, &data, dedup.ModeFixed, 1000, 0)
    if err != nil {
        panic(err)
    }

    // Copy our input to the writer.
    io.Copy(w, input)

    // Close the writer.
    err = w.Close()
    if err != nil {
        panic(err)
    }

    // Let us inspect what was written:
    fmt.Println("Blocks:", w.Blocks())
    fmt.Println("Index size:", idx.Len())
    fmt.Println("Data size:", data.Len())
}
Output:

Blocks: 50
Index size: 67
Data size: 1000
Example (File) ¶
This example will show how to write data to two files. Running this example will deduplicate a zero-filled byte slice of 500000 bytes into an 'output.data' and an 'output.idx' file.
In the real world, you would likely want to add a bufio.NewWriter to the output, but to keep it simple, we don't do that here.
package main

import (
    "bytes"
    "fmt"
    "io"
    "os"

    "github.com/klauspost/dedup"
)

func main() {
    data, err := os.Create("output.data")
    if err != nil {
        panic(err)
    }

    // Close, print stats and remove it.
    defer func() {
        data.Close()
        stat, _ := os.Stat("output.data")
        fmt.Println("Data size:", stat.Size())
        os.Remove("output.data")
    }()

    idx, err := os.Create("output.idx")
    if err != nil {
        panic(err)
    }

    // Close, print stats and remove it.
    defer func() {
        idx.Close()
        stat, _ := os.Stat("output.idx")
        fmt.Println("Index size:", stat.Size())
        os.Remove("output.idx")
    }()

    // This is our input:
    input := bytes.NewBuffer(make([]byte, 500000))

    // Create a new writer, with each block being 1000 bytes fixed size.
    w, err := dedup.NewWriter(idx, data, dedup.ModeFixed, 1000, 0)
    if err != nil {
        panic(err)
    }
    defer w.Close()

    // Copy our input to the writer.
    io.Copy(w, input)

    // Print the number of blocks written.
    fmt.Println("Blocks:", w.Blocks())
}
Output:

Blocks: 500
Index size: 517
Data size: 1000