This repository was archived by the owner on Dec 5, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiterator_chunk.go
93 lines (80 loc) · 2.32 KB
/
iterator_chunk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package ftdc
import (
"context"
"io"
"github.com/deciduosity/birch"
"github.com/deciduosity/ftdc/util"
)
// ChunkIterator is a simple iterator for reading off of an FTDC data
// source (e.g. file). Chunks (batches of metrics) are produced by
// background goroutines started by ReadChunks and handed to the
// iterator through a small buffered channel; each call to Next
// receives one chunk from that channel.
//
// Use the iterator as follows:
//
//	iter := ReadChunks(ctx, file)
//
//	for iter.Next() {
//		chunk := iter.Chunk()
//
//		// <manipulate chunk>
//
//	}
//
//	if err := iter.Err(); err != nil {
//		return err
//	}
//
// You MUST call the Chunk() method no more than once per iteration.
// NOTE(review): nothing in this file enforces that rule; confirm
// whether it is still required.
//
// You should check the Err() method when the iterator is complete to
// see if there were any issues encountered when decoding chunks.
type ChunkIterator struct {
	// pipe is the channel Next receives chunks from. ReadChunks creates
	// it with a buffer of 2 and passes it to readChunks.
	pipe chan *Chunk
	// next is the chunk most recently received by Next; Chunk returns it.
	next *Chunk
	// cancel cancels the context derived in ReadChunks; Close calls it.
	cancel context.CancelFunc
	// closed is set to true by Close. It is not read anywhere in this file.
	closed bool
	// catcher collects the errors returned by the goroutines started in
	// ReadChunks; Err resolves it.
	catcher util.Catcher
}
// ReadChunks builds a ChunkIterator over the FTDC data source r.
//
// It derives a cancelable context from ctx and starts two goroutines.
// The first runs readDiagnostic with r and an unbuffered document
// channel; the second runs readChunks with that same document channel
// and the iterator's chunk channel (buffer size 2). Whatever error each
// of them returns is added to the iterator's catcher and surfaces
// through Err.
//
// Call Close on the returned iterator, or cancel ctx, to cancel the
// context those goroutines were given.
func ReadChunks(ctx context.Context, r io.Reader) *ChunkIterator {
	ctx, cancel := context.WithCancel(ctx)
	docs := make(chan *birch.Document)

	iter := &ChunkIterator{
		pipe:    make(chan *Chunk, 2),
		cancel:  cancel,
		catcher: util.NewCatcher(),
	}

	// Stage one: feeds documents from r into docs.
	go func() {
		err := readDiagnostic(ctx, r, docs)
		iter.catcher.Add(err)
	}()

	// Stage two: turns documents from docs into chunks on iter.pipe.
	go func() {
		err := readChunks(ctx, docs, iter.pipe)
		iter.catcher.Add(err)
	}()

	return iter
}
// Next blocks until a chunk can be received from the iterator's
// channel. On success it stores that chunk, so that Chunk() returns
// it, and reports true. It reports false once the channel has been
// closed and drained; in that case the previously stored chunk is left
// untouched.
func (iter *ChunkIterator) Next() bool {
	if chunk, open := <-iter.pipe; open {
		iter.next = chunk
		return true
	}

	return false
}
// Chunk returns the chunk most recently received by Next. The pointer
// stored by Next is returned as-is; no copy is made here. It returns
// nil if Next has not yet returned true, and it keeps returning the
// last chunk after Next has returned false.
//
// Callers are asked to call Chunk no more than once per iteration.
// NOTE(review): an earlier version of this comment said additional
// calls panic; this method does not panic, so confirm whether the
// once-per-iteration rule is still needed.
func (iter *ChunkIterator) Chunk() *Chunk {
	return iter.next
}
// Close cancels the context that ReadChunks handed to its background
// goroutines and marks the iterator as closed. Call it when you stop
// iterating before the iterator is exhausted. Canceling the context
// that was passed to ReadChunks cancels that same derived context.
func (iter *ChunkIterator) Close() {
	iter.cancel()
	iter.closed = true
}
// Err returns the result of resolving the iterator's error catcher,
// which collects the errors returned by the goroutines started in
// ReadChunks. A non-nil result means an error was recorded during
// iteration.
//
// NOTE(review): the errors are added by background goroutines, so
// whether every error has been recorded by the time Next returns false
// depends on readDiagnostic/readChunks; confirm before relying on it.
func (iter *ChunkIterator) Err() error { return iter.catcher.Resolve() }