-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
219 lines (195 loc) · 5.32 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package LFQueue
import (
"reflect"
"runtime"
"sync/atomic"
)
const (
Success = 0
QueueIsFull = 100
QueueIsEmpty = 200
NeedArrayOrSlice = 300
)
type QueError struct {
StatusCode int
}
func (q *QueError) Error() string {
switch q.StatusCode {
case QueueIsFull:
return "queue is full"
case QueueIsEmpty:
return "queue is empty"
case NeedArrayOrSlice:
return "param must be array or slice"
}
return "no err"
}
type LFNode struct {
value interface{}
}
type LFQueue struct {
capacity uint64 // 容量 大小取为2^n, 方便位运算
endIndex uint64 // 数组结束索引
writeCursor uint64 // 写游标
readCursor uint64 // 读游标
ringBuffer []LFNode // 数据组
availableBuffer []int64 // 标记组,默认值-1
}
// 返回值:数据,错误
func (q *LFQueue) Pop() (interface{}, *QueError) {
_, next, err := q.getReadNext(1)
if err != nil {
return nil, err
}
atomic.SwapInt64(&q.availableBuffer[next&(q.endIndex)], -1)
return q.ringBuffer[next&(q.endIndex)].value, nil
}
// 返回值:结束游标,错误
func (q *LFQueue) Push(value interface{}) *QueError {
next, err := q.getWriteNext(1)
if err != nil {
return err
}
atomic.SwapInt64(&q.availableBuffer[next&(q.endIndex)], 1)
q.ringBuffer[next&(q.endIndex)] = LFNode{value: value}
return nil
}
// 返回值: 数据,错误
func (q *LFQueue) PopMore(n uint64) ([]interface{}, *QueError) {
current, next, err := q.getReadNext(n)
if err != nil {
return nil, err
}
values := make([]interface{}, int(next-current))
current++ // 游标后移一位开始读数据
for index, _ := range values {
i := current + uint64(index)
values[index] = q.ringBuffer[i&(q.endIndex)]
q.availableBuffer[i&(q.endIndex)] = -1
}
return values, nil
}
// 返回值: 结束游标,错误
func (q *LFQueue) PushMore(in interface{}) *QueError {
values := reflect.ValueOf(in)
if values.Kind() != reflect.Array && values.Kind() != reflect.Slice {
return &QueError{StatusCode: NeedArrayOrSlice}
}
var n uint64
n = uint64(values.Len())
next, err := q.getWriteNext(n)
if err != nil {
return err
}
current := next - (n - 1)
// 向申请到的区间写入数据
num := values.Len()
for i := 0; i < num; i++ {
q.availableBuffer[current&(q.endIndex)] = 1
q.ringBuffer[current&(q.endIndex)] = LFNode{value: values.Index(i).Interface()}
current++
}
return nil
}
/*
获取下一个写入范围结束端点
n: 申请可写范围
next: 结束点
err: 错误
*/
func (q *LFQueue) getWriteNext(n uint64) (next uint64, err *QueError) {
if n < 1 {
n = 1
}
var current uint64
for {
current = q.writeCursor
next = current + n
// 如果申请的空间已被写入或者队列当前游标和申请的开始不同则等待
if q.checkAvailableCapacity(current, n) && atomic.CompareAndSwapUint64(&q.writeCursor, current, next&q.endIndex) {
break
}
if current == q.writeCursor {
return q.writeCursor, &QueError{QueueIsFull}
}
runtime.Gosched()
}
return next, nil
}
// 检查当前游标开始n空间内是否都可写
func (q *LFQueue) checkAvailableCapacity(current uint64, n uint64) bool {
// 申请的空间都未被标记时才可写入,未被标记时值为默认值-1
end := current + n
current++
for current <= end {
if q.availableBuffer[current&(q.endIndex)] != -1 {
return false
}
current++
}
return true
}
/*
获取当前游标开始n内可读空间
n: 申请可读范围
next: 结束点
num: 实际可读空间
err: 错误
*/
func (q *LFQueue) getReadNext(n uint64) (start uint64, next uint64, err *QueError) {
if n < 1 {
n = 1
}
var current uint64
for {
current = q.readCursor
if q.availableBuffer[(current+1)&(q.endIndex)] == -1 {
return 0, 0, &QueError{StatusCode: QueueIsEmpty}
}
next = q.checkAvailableRead(current, n)
if atomic.CompareAndSwapUint64(&q.readCursor, current, next&q.endIndex) {
break
}
runtime.Gosched()
}
return current, next, nil
}
// 返回n以内最长可读空间
func (q *LFQueue) checkAvailableRead(current uint64, n uint64) uint64 {
end := current + n
current++
for current <= end {
index := current & (q.endIndex)
if q.availableBuffer[index] == -1 {
return current
}
current++
}
return end
}
// 获取最近的2的指数
func getCapacity(in int) int {
in--
in |= in >> 1
in |= in >> 2
in |= in >> 4
in |= in >> 8
in |= in >> 16
in++
return in
}
func NewQue(inCapacity int) *LFQueue {
capacity := uint64(getCapacity(inCapacity))
que := LFQueue{
capacity: capacity,
endIndex: capacity - 1,
writeCursor: 0,
readCursor: 0,
ringBuffer: make([]LFNode, capacity),
availableBuffer: make([]int64, capacity),
}
for index, _ := range que.availableBuffer {
que.availableBuffer[index] = -1
}
return &que
}