-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRedisConnection.cs
181 lines (155 loc) · 7.47 KB
/
RedisConnection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading.Tasks;
using System.Threading;
namespace mikkyredis
{
public class RedisConnection : IDisposable
{
private long _lastReconnectTicks = DateTimeOffset.MinValue.UtcTicks;
private DateTimeOffset _firstErrorTime = DateTimeOffset.MinValue;
private DateTimeOffset _previousErrorTime = DateTimeOffset.MinValue;
// StackExchange.Redis will also be trying to reconnect internally,
// so limit how often we recreate the ConnectionMultiplexer instance
// in an attempt to reconnect
private readonly TimeSpan ReconnectMinInterval = TimeSpan.FromSeconds(60);
// If errors occur for longer than this threshold, StackExchange.Redis
// may be failing to reconnect internally, so we'll recreate the
// ConnectionMultiplexer instance
private readonly TimeSpan ReconnectErrorThreshold = TimeSpan.FromSeconds(30);
private readonly TimeSpan RestartConnectionTimeout = TimeSpan.FromSeconds(15);
private const int RetryMaxAttempts = 5;
private SemaphoreSlim _reconnectSemaphore = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private readonly string _connectionString;
private ConnectionMultiplexer _connection;
private IDatabase _database;
private RedisConnection(string connectionString)
{
_connectionString = connectionString;
}
public static async Task<RedisConnection> InitializeAsync(string connectionString)
{
var redisConnection = new RedisConnection(connectionString);
await redisConnection.ForceReconnectAsync(initializing: true);
return redisConnection;
}
// In real applications, consider using a framework such as
// Polly to make it easier to customize the retry approach.
// For more info, please see: https://github.com/App-vNext/Polly
public async Task<T> BasicRetryAsync<T>(Func<IDatabase, Task<T>> func)
{
int reconnectRetry = 0;
while (true)
{
try
{
return await func(_database);
}
catch (Exception ex) when (ex is RedisConnectionException || ex is SocketException || ex is ObjectDisposedException)
{
reconnectRetry++;
if (reconnectRetry > RetryMaxAttempts)
{
throw;
}
try
{
await ForceReconnectAsync();
}
catch (ObjectDisposedException) { }
return await func(_database);
}
}
}
/// <summary>
/// Force a new ConnectionMultiplexer to be created.
/// NOTES:
/// 1. Users of the ConnectionMultiplexer MUST handle ObjectDisposedExceptions, which can now happen as a result of calling ForceReconnectAsync().
/// 2. Call ForceReconnectAsync() for RedisConnectionExceptions and RedisSocketExceptions. You can also call it for RedisTimeoutExceptions,
/// but only if you're using generous ReconnectMinInterval and ReconnectErrorThreshold. Otherwise, establishing new connections can cause
/// a cascade failure on a server that's timing out because it's already overloaded.
/// 3. The code will:
/// a. wait to reconnect for at least the "ReconnectErrorThreshold" time of repeated errors before actually reconnecting
/// b. not reconnect more frequently than configured in "ReconnectMinInterval"
/// </summary>
/// <param name="initializing">Should only be true when ForceReconnect is running at startup.</param>
private async Task ForceReconnectAsync(bool initializing = false)
{
long previousTicks = Interlocked.Read(ref _lastReconnectTicks);
var previousReconnectTime = new DateTimeOffset(previousTicks, TimeSpan.Zero);
TimeSpan elapsedSinceLastReconnect = DateTimeOffset.UtcNow - previousReconnectTime;
// We want to limit how often we perform this top-level reconnect, so we check how long it's been since our last attempt.
if (elapsedSinceLastReconnect < ReconnectMinInterval)
{
return;
}
try
{
await _reconnectSemaphore.WaitAsync(RestartConnectionTimeout);
}
catch
{
// If we fail to enter the semaphore, then it is possible that another thread has already done so.
// ForceReconnectAsync() can be retried while connectivity problems persist.
return;
}
try
{
var utcNow = DateTimeOffset.UtcNow;
elapsedSinceLastReconnect = utcNow - previousReconnectTime;
if (_firstErrorTime == DateTimeOffset.MinValue && !initializing)
{
// We haven't seen an error since last reconnect, so set initial values.
_firstErrorTime = utcNow;
_previousErrorTime = utcNow;
return;
}
if (elapsedSinceLastReconnect < ReconnectMinInterval)
{
return; // Some other thread made it through the check and the lock, so nothing to do.
}
TimeSpan elapsedSinceFirstError = utcNow - _firstErrorTime;
TimeSpan elapsedSinceMostRecentError = utcNow - _previousErrorTime;
bool shouldReconnect =
elapsedSinceFirstError >= ReconnectErrorThreshold // Make sure we gave the multiplexer enough time to reconnect on its own if it could.
&& elapsedSinceMostRecentError <= ReconnectErrorThreshold; // Make sure we aren't working on stale data (e.g. if there was a gap in errors, don't reconnect yet).
// Update the previousErrorTime timestamp to be now (e.g. this reconnect request).
_previousErrorTime = utcNow;
if (!shouldReconnect && !initializing)
{
return;
}
_firstErrorTime = DateTimeOffset.MinValue;
_previousErrorTime = DateTimeOffset.MinValue;
if (_connection != null)
{
try
{
await _connection.CloseAsync();
}
catch
{
// Ignore any errors from the old connection
}
}
Interlocked.Exchange(ref _connection, null);
ConnectionMultiplexer newConnection = await ConnectionMultiplexer.ConnectAsync(_connectionString);
Interlocked.Exchange(ref _connection, newConnection);
Interlocked.Exchange(ref _lastReconnectTicks, utcNow.UtcTicks);
IDatabase newDatabase = _connection.GetDatabase();
Interlocked.Exchange(ref _database, newDatabase);
}
finally
{
_reconnectSemaphore.Release();
}
}
public void Dispose()
{
try { _connection?.Dispose(); } catch { }
}
}
}