-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexec.go
120 lines (104 loc) · 2.87 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package mysql
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/go-sql-driver/mysql"
)
// exec executes a query and nothing more
// newQuery is true if this is a new query, false if it's a replay of a query in a transaction
func (db *Database) exec(conn handlerWithContext, ctx context.Context, tx *Tx, newQuery bool, query string, params ...any) (sql.Result, error) {
replacedQuery, normalizedParams, err := db.interpolateParams(query, params...)
if err != nil {
return nil, fmt.Errorf("failed to interpolate params: %w", err)
}
if db.die {
fmt.Println(replacedQuery)
j, _ := json.MarshalIndent(normalizedParams, "", " ")
fmt.Println(string(j))
os.Exit(0)
}
start := time.Now()
var res sql.Result
var b = backoff.NewExponentialBackOff()
b.MaxElapsedTime = MaxExecutionTime
var attempt int
var rowsAffected int64
exec := func() error {
attempt++
var err error
res, err = conn.ExecContext(ctx, replacedQuery)
if res != nil {
rowsAffected, _ = res.RowsAffected()
}
realTx, _ := conn.(*sql.Tx)
db.callLog(LogDetail{
Query: replacedQuery,
Params: normalizedParams,
Duration: time.Since(start),
RowsAffected: rowsAffected,
Tx: realTx,
Attempt: attempt,
Error: err,
})
if err != nil {
var handleDeadlock func(err error) error
handleDeadlock = func(err error) error {
if tx == nil || !checkDeadlockError(err) {
return nil
}
// if this is a tx replay query already, we don't want it running all of the queries back
// again, so we just return the error immediately to left the top level retry loop handle it
if !newQuery {
return backoff.Permanent(err)
}
// deadlock occurred, which means *every* query in this transaction
// was rolled back, so we need to run them all again
tx.updates.RLock()
defer tx.updates.RUnlock()
for _, q := range tx.updates.queries {
_, err := db.exec(conn, ctx, nil, false, q)
if err := handleDeadlock(err); err != nil {
return err
}
if err != nil {
return err
}
}
// return the original deadlock error to resume regular functionality of the retry loop
return err
}
if err := handleDeadlock(err); err != nil {
return err
}
if checkRetryError(err) {
return err
} else if errors.Is(err, mysql.ErrInvalidConn) {
return db.Test()
} else {
return backoff.Permanent(err)
}
}
return nil
}
err = backoff.Retry(exec, backoff.WithContext(b, ctx))
if err != nil {
return nil, Error{
Err: err,
OriginalQuery: query,
ReplacedQuery: replacedQuery,
Params: normalizedParams,
}
}
if tx != nil && newQuery {
tx.updates.Lock()
defer tx.updates.Unlock()
tx.updates.queries = append(tx.updates.queries, replacedQuery)
}
return res, nil
}