-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathparallel.go
131 lines (118 loc) · 2.92 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package parallel
import (
"sync"
"time"
)
// Parallel runs a set of pipelines concurrently. It may also own child
// Parallel instances, which Run executes to completion before this
// instance's own pipelines are started.
type Parallel struct {
	// wg tracks the pipelines in pipes: Add increments it once per pipeline
	// and secure decrements it when a pipeline finishes.
	wg *sync.WaitGroup
	// pipes holds the pipelines executed by do.
	pipes []*Pipeline
	// wgChild tracks the entries of children: AddChildren increments it and
	// Run decrements it as each child's Run returns.
	wgChild *sync.WaitGroup
	// children are run (and waited for) before this instance's pipelines.
	children []*Parallel
	// exception, when non-nil, is the handler secure invokes (on a copy)
	// with the recovered panic value of a failing pipeline.
	exception *Handler
}
// NewParallel returns an empty Parallel with both wait groups allocated and
// room reserved for ten pipelines. No children or exception handler are set.
func NewParallel() *Parallel {
	return &Parallel{
		wg:      new(sync.WaitGroup),
		wgChild: new(sync.WaitGroup),
		pipes:   make([]*Pipeline, 0, 10),
	}
}
// Except installs the exception handler for p. The handler is built from f
// and args via NewHandler and is invoked by secure when a pipeline panics
// with a value that is not re-raised. The new handler is returned.
func (p *Parallel) Except(f interface{}, args ...interface{}) *Handler {
	p.exception = NewHandler(f, args...)
	return p.exception
}
// Register adds a fresh pipeline to p and forwards f and args to that
// pipeline's Register method, returning the Handler it yields.
func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler {
	pipe := p.NewPipeline()
	return pipe.Register(f, args...)
}
// NewPipeline creates a pipeline with the package-level NewPipeline
// constructor, attaches it to p through Add, and returns it.
func (p *Parallel) NewPipeline() *Pipeline {
	created := NewPipeline()
	p.Add(created)
	return created
}
// Add appends the given pipelines to p and raises p's wait-group counter by
// the number of pipelines added. It returns p so calls can be chained.
func (p *Parallel) Add(pipes ...*Pipeline) *Parallel {
	p.pipes = append(p.pipes, pipes...)
	// One pending Done per pipeline; secure issues it when the pipeline ends.
	p.wg.Add(len(pipes))
	return p
}
// NewChild creates a new Parallel, registers it as a child of p, and returns
// it. The child receives p's current exception handler pointer; a handler
// installed on p afterwards via Except is not propagated to the child.
func (p *Parallel) NewChild() *Parallel {
	kid := NewParallel()
	p.AddChildren(kid)
	kid.exception = p.exception
	return kid
}
// AddChildren registers the given Parallel instances as children of p, i.e.
// work that Run completes before p's own pipelines start. It returns p so
// calls can be chained.
func (p *Parallel) AddChildren(children ...*Parallel) *Parallel {
	p.children = append(p.children, children...)
	// One pending Done per child; Run issues it when the child's Run returns.
	p.wgChild.Add(len(children))
	return p
}
// Run executes every child concurrently and waits for all of them, then
// starts p's own pipelines and blocks until each has finished.
//
// NOTE(review): the wait-group counters are only incremented by Add and
// AddChildren, so calling Run a second time on the same instance looks
// unsafe (the counters would go negative) — confirm before reusing a Parallel.
func (p *Parallel) Run() {
	for i := range p.children {
		// Per-iteration variable, so each goroutine sees its own child.
		kid := p.children[i]
		go func() {
			kid.Run()
			p.wgChild.Done()
		}()
	}
	// Children are dependencies: do not start own pipelines until they finish.
	p.wgChild.Wait()

	p.do()
	p.wg.Wait()
}
// do launches p's pipelines through secure. With exactly one pipeline it is
// run synchronously on the calling goroutine; otherwise each pipeline gets
// its own goroutine and do returns without waiting for them.
func (p *Parallel) do() {
	if len(p.pipes) != 1 {
		for i := range p.pipes {
			go p.secure(p.pipes[i])
		}
		return
	}
	// A single pipeline does not need a goroutine.
	p.secure(p.pipes[0])
}
// secure runs pipe.Do and intercepts any panic it raises.
//
// A recovered value equal to one of the listed Err* sentinels is re-panicked
// unchanged; in that case p.wg.Done is not reached. Any other recovered value
// is handed to a copy of p.exception via OnExcept when a handler is set, and
// is otherwise dropped. In every non-re-panicking path p.wg.Done is called
// once the pipeline has ended.
func (p *Parallel) secure(pipe *Pipeline) {
	defer func() {
		if r := recover(); r != nil {
			switch r {
			case ErrArgNotFunction,
				ErrInArgLenNotMatch,
				ErrOutArgLenNotMatch,
				ErrRecvArgTypeNotPtr,
				ErrRecvArgNil:
				panic(r)
			}
			if base := p.exception; base != nil {
				// Use a private Handler with its own args slice (receivers
				// stay shared) — presumably so OnExcept can alter args
				// without affecting other pipelines; confirm against
				// Handler.OnExcept.
				h := &Handler{
					f:         base.f,
					args:      append(make([]interface{}, 0, len(base.args)), base.args...),
					receivers: base.receivers,
				}
				h.OnExcept(r)
			}
		}
		p.wg.Done()
	}()
	pipe.Do()
}
// RunWithTimeOut starts Run in a separate goroutine and returns as soon as
// either Run completes or d has elapsed, whichever happens first.
//
// A timeout only stops the waiting: nothing here cancels the jobs, so the
// goroutine running Run keeps going in the background after this returns.
func (p *Parallel) RunWithTimeOut(d time.Duration) {
	// Buffered so the runner goroutine can always deliver its signal and
	// exit, even when nobody is receiving any more after a timeout.
	done := make(chan struct{}, 1)
	go func() {
		p.Run()
		done <- struct{}{}
	}()

	// An explicit timer (instead of time.After) lets us release it right
	// away when Run finishes before the deadline.
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
	}
}