-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpid.go
160 lines (126 loc) · 3.92 KB
/
pid.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package quacktors
import (
"fmt"
)
//The Pid struct acts as a reference to an Actor.
//It is completely location transparent, meaning it doesn't
//matter if the Pid is actually on another system. To the
//developer it will look like just another Actor they can
//send messages to.
type Pid struct {
MachineId string
Id string
quitChan chan<- bool
messageChan chan<- interface{}
monitorChan chan<- *Pid
demonitorChan chan<- *Pid
//Stores channels to scheduled tasks (monitors, SendAfter, monitors the actor itself launches but doesn't consume)
scheduled map[string]chan bool
//Stores channels to tell a monitor taks to quit (when a pid is demonitored)
monitorQuitChannels map[string]chan bool
}
func createPid(quitChan chan<- bool, messageChan chan<- interface{}, monitorChan chan<- *Pid, demonitorChan chan<- *Pid, scheduled map[string]chan bool, monitorQuitChannels map[string]chan bool) *Pid {
pid := &Pid{
MachineId: machineId,
Id: "",
quitChan: quitChan,
messageChan: messageChan,
monitorChan: monitorChan,
demonitorChan: demonitorChan,
scheduled: scheduled,
monitorQuitChannels: monitorQuitChannels,
}
registerPid(pid)
return pid
}
//Is compares two PIDs and returns true if their ID and MachineId are the same.
func (pid *Pid) Is(other *Pid) bool {
return pid.Id == other.Id && pid.MachineId == other.MachineId
}
func (pid *Pid) cleanup() {
logger.Debug("cleaning up pid",
"pid", pid.Id)
deletePid(pid.Id)
close(pid.quitChan)
pid.quitChan = nil
close(pid.messageChan)
pid.messageChan = nil
close(pid.monitorChan)
pid.monitorChan = nil
close(pid.demonitorChan)
pid.demonitorChan = nil
if len(pid.scheduled) != 0 {
//Terminate all scheduled events/send down message to monitor tasks
logger.Debug("sending out scheduled events after pid cleanup",
"pid", pid.Id)
for n, ch := range pid.scheduled {
//what if someone aborts the monitor while we attempt to write to it?
//this can never happen because all monitor and demonitor requests go
//through the actor which is currently being closed
ch <- true //this is blocking
close(ch)
delete(pid.scheduled, n)
}
}
if len(pid.monitorQuitChannels) != 0 {
logger.Debug("deleting monitor abort channels",
"pid", pid.Id)
//Delete monitorQuitChannels
for n, c := range pid.monitorQuitChannels {
close(c)
delete(pid.monitorQuitChannels, n)
}
}
pid.monitorQuitChannels = nil
}
func (pid *Pid) setupMonitor(monitor *Pid) {
//there used to be a mutex here but since all monitor and demonitor
//requests go through one actor, we can't run into a concurrent rw
name := monitor.String()
monitorChannel := make(chan bool)
pid.scheduled[name] = monitorChannel
monitorQuitChannel := make(chan bool)
pid.monitorQuitChannels[name] = monitorQuitChannel
go func() {
select {
case <-monitorQuitChannel:
return
case <-monitorChannel:
doSend(monitor, DownMessage{Who: pid}, nil)
}
}()
}
func (pid *Pid) removeMonitor(monitor *Pid) {
name := monitor.String()
pid.monitorQuitChannels[name] <- true
close(pid.monitorQuitChannels[name])
close(pid.scheduled[name])
delete(pid.monitorQuitChannels, name)
delete(pid.scheduled, name)
logger.Info("monitor removed successfully",
"monitored_pid", pid.Id,
"monitor_gpid", monitor.String())
}
func (pid *Pid) String() string {
return fmt.Sprintf("%s@%s", pid.Id, pid.MachineId)
}
//Type returns the Message type of the PID.
//Since PIDs can be sent around without any message wrapper,
//it needs to implement the Message interface (which is why
//Type is needed).
func (pid Pid) Type() string {
return "pid"
}
func (pid *Pid) die() {
defer func() {
if r := recover(); r != nil {
//This happens if we write to the quitChan while the actor is being closed
}
}()
logger.Debug("sending quit command to actor",
"pid", pid.Id)
if pid.quitChan == nil {
return
}
pid.quitChan <- true
}