Skip to content

Commit 640cfe1

Browse files
committed
fix(lock): make import pool, create replica and share replica operations mutually exclusive
Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
1 parent f2f74e7 commit 640cfe1

File tree

11 files changed

+177
-35
lines changed

11 files changed

+177
-35
lines changed

io-engine/src/bin/io-engine.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {
100100

101101
// Initialize Lock manager.
102102
let cfg = ResourceLockManagerConfig::default()
103-
.with_subsystem(ProtectedSubsystems::NEXUS, 512);
103+
.with_subsystem(ProtectedSubsystems::POOL, 32)
104+
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
105+
.with_subsystem(ProtectedSubsystems::REPLICA, 1024);
104106
ResourceLockManager::initialize(cfg);
105107

106108
Mthread::spawn_unaffinitized(move || {

io-engine/src/core/lock.rs

+17-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use once_cell::sync::OnceCell;
1111
pub struct ProtectedSubsystems;
1212
impl ProtectedSubsystems {
1313
pub const NEXUS: &'static str = "nexus";
14+
pub const POOL: &'static str = "pool";
15+
pub const REPLICA: &'static str = "replica";
1416
}
1517

1618
/// Configuration parameters for initialization of the Lock manager.
@@ -41,6 +43,7 @@ impl ResourceLockManagerConfig {
4143
}
4244

4345
/// Resource subsystem that holds locks for all resources withing this system.
46+
#[derive(Debug)]
4447
pub struct ResourceSubsystem {
4548
id: String,
4649
object_locks: Vec<Mutex<LockStats>>,
@@ -67,22 +70,23 @@ impl ResourceSubsystem {
6770
pub async fn lock(
6871
&self,
6972
wait_timeout: Option<Duration>,
73+
try_lock: bool,
7074
) -> Option<ResourceLockGuard<'_>> {
71-
acquire_lock(&self.subsystem_lock, wait_timeout).await
75+
acquire_lock(&self.subsystem_lock, wait_timeout, try_lock).await
7276
}
7377

7478
/// Lock subsystem resource by its ID and obtain a lock guard.
7579
pub async fn lock_resource<T: AsRef<str>>(
7680
&self,
7781
id: T,
7882
wait_timeout: Option<Duration>,
83+
try_lock: bool,
7984
) -> Option<ResourceLockGuard<'_>> {
8085
// Calculate hash of the object to get the mutex index.
8186
let mut hasher = DefaultHasher::new();
8287
id.as_ref().hash(&mut hasher);
8388
let mutex_id = hasher.finish() as usize % self.object_locks.len();
84-
85-
acquire_lock(&self.object_locks[mutex_id], wait_timeout).await
89+
acquire_lock(&self.object_locks[mutex_id], wait_timeout, try_lock).await
8690
}
8791
}
8892

@@ -122,14 +126,21 @@ static LOCK_MANAGER: OnceCell<ResourceLockManager> = OnceCell::new();
122126
async fn acquire_lock(
123127
lock: &Mutex<LockStats>,
124128
wait_timeout: Option<Duration>,
129+
try_lock: bool,
125130
) -> Option<ResourceLockGuard<'_>> {
126131
let mut lock_guard = if let Some(d) = wait_timeout {
127132
match tokio::time::timeout(d, lock.lock()).await {
128133
Err(_) => return None,
129134
Ok(g) => g,
130135
}
136+
} else if try_lock {
137+
// No timeout, try for taking lock immediately.
138+
match lock.try_lock() {
139+
Some(l) => l,
140+
None => return None,
141+
}
131142
} else {
132-
// No timeout, wait for the lock indefinitely.
143+
// No timeout, wait indefinitely.
133144
lock.lock().await
134145
};
135146

@@ -162,8 +173,9 @@ impl ResourceLockManager {
162173
pub async fn lock(
163174
&self,
164175
wait_timeout: Option<Duration>,
176+
try_lock: bool,
165177
) -> Option<ResourceLockGuard<'_>> {
166-
acquire_lock(&self.mgr_lock, wait_timeout).await
178+
acquire_lock(&self.mgr_lock, wait_timeout, try_lock).await
167179
}
168180

169181
/// Get resource subsystem by its id.

io-engine/src/grpc/mod.rs

+33-5
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
1+
use futures::channel::oneshot::Receiver;
2+
use nix::errno::Errno;
3+
pub use server::MayastorGrpcServer;
14
use std::{
25
error::Error,
36
fmt::{Debug, Display},
47
future::Future,
58
time::Duration,
69
};
7-
8-
use futures::channel::oneshot::Receiver;
9-
use nix::errno::Errno;
10-
pub use server::MayastorGrpcServer;
1110
use tokio::sync::RwLock;
1211
use tonic::{Request, Response, Status};
1312

1413
use crate::{
1514
bdev_api::BdevError,
16-
core::{CoreError, Reactor, VerboseError},
15+
core::{
16+
CoreError,
17+
Reactor,
18+
ResourceLockGuard,
19+
ResourceSubsystem,
20+
VerboseError,
21+
},
1722
};
1823

1924
impl From<BdevError> for tonic::Status {
@@ -168,6 +173,29 @@ where
168173
.map_err(|_| Status::resource_exhausted("ENOMEM"))
169174
}
170175

176+
/// Manage locks across multiple grpc services.
177+
pub async fn acquire_subsystem_lock<'a>(
178+
subsystem: &'a ResourceSubsystem,
179+
resource: Option<&str>,
180+
) -> Result<ResourceLockGuard<'a>, Status> {
181+
if let Some(resource) = resource {
182+
match subsystem.lock_resource(resource.to_string(), None, true).await {
183+
Some(lock_guard) => Ok(lock_guard),
184+
None => Err(Status::already_exists(format!(
185+
"Failed to acquire lock for the resource: {resource}, lock already held"
186+
))),
187+
}
188+
} else {
189+
match subsystem.lock(None, true).await {
190+
Some(lock_guard) => Ok(lock_guard),
191+
None => Err(Status::already_exists(format!(
192+
"Failed to acquire subsystem lock: {:?}, lock already held",
193+
subsystem
194+
))),
195+
}
196+
}
197+
}
198+
171199
macro_rules! default_ip {
172200
() => {
173201
"0.0.0.0"

io-engine/src/grpc/v0/mayastor_grpc.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ impl MayastorSvc {
155155
match tokio::spawn(async move {
156156
// Grab global operation lock, if requested.
157157
let _global_guard = if global_operation {
158-
match lock_manager.lock(Some(ctx.timeout)).await {
158+
match lock_manager.lock(Some(ctx.timeout), false).await {
159159
Some(g) => Some(g),
160160
None => return Err(Status::deadline_exceeded(
161161
"Failed to acquire access to object within given timeout"
@@ -169,7 +169,7 @@ impl MayastorSvc {
169169
// Grab per-object lock before executing the future.
170170
let _resource_guard = match lock_manager
171171
.get_subsystem(ProtectedSubsystems::NEXUS)
172-
.lock_resource(nexus_uuid, Some(ctx.timeout))
172+
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
173173
.await {
174174
Some(g) => g,
175175
None => return Err(Status::deadline_exceeded(
@@ -302,6 +302,9 @@ impl From<LvsError> for tonic::Status {
302302
LvsError::WipeFailed {
303303
source,
304304
} => source.into(),
305+
LvsError::ResourceLockFailed {
306+
..
307+
} => Status::aborted(e.to_string()),
305308
_ => Status::internal(e.verbose()),
306309
}
307310
}

io-engine/src/grpc/v1/nexus.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl NexusService {
8181
match tokio::spawn(async move {
8282
// Grab global operation lock, if requested.
8383
let _global_guard = if global_operation {
84-
match lock_manager.lock(Some(ctx.timeout)).await {
84+
match lock_manager.lock(Some(ctx.timeout), false).await {
8585
Some(g) => Some(g),
8686
None => return Err(Status::deadline_exceeded(
8787
"Failed to acquire access to object within given timeout"
@@ -95,7 +95,7 @@ impl NexusService {
9595
// Grab per-object lock before executing the future.
9696
let _resource_guard = match lock_manager
9797
.get_subsystem(ProtectedSubsystems::NEXUS)
98-
.lock_resource(nexus_uuid, Some(ctx.timeout))
98+
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
9999
.await {
100100
Some(g) => g,
101101
None => return Err(Status::deadline_exceeded(

io-engine/src/grpc/v1/pool.rs

+27-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
use crate::{
2-
core::Share,
3-
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
2+
core::{
3+
lock::{ProtectedSubsystems, ResourceLockManager},
4+
Share,
5+
},
6+
grpc::{
7+
acquire_subsystem_lock,
8+
rpc_submit,
9+
GrpcClientContext,
10+
GrpcResult,
11+
RWLock,
12+
RWSerializer,
13+
},
414
lvs::{Error as LvsError, Lvs},
515
pool_backend::{PoolArgs, PoolBackend},
616
};
@@ -314,6 +324,21 @@ impl PoolRpc for PoolService {
314324
let args = request.into_inner();
315325
info!("{:?}", args);
316326
let rx = rpc_submit::<_, _, LvsError>(async move {
327+
let pool_subsystem = ResourceLockManager::get_instance()
328+
.get_subsystem(ProtectedSubsystems::POOL);
329+
let _lock_guard = acquire_subsystem_lock(
330+
pool_subsystem,
331+
Some(&args.name),
332+
)
333+
.await
334+
.map_err(|_| {
335+
LvsError::ResourceLockFailed {
336+
msg: format!(
337+
"resource {}, for disk pool {:?}",
338+
&args.name, &args.disks,
339+
),
340+
}
341+
})?;
317342
let pool = Lvs::import_from_args(PoolArgs::try_from(args)?)
318343
.await?;
319344
Ok(Pool::from(pool))

io-engine/src/grpc/v1/replica.rs

+36-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
bdev::PtplFileOps,
33
bdev_api::BdevError,
44
core::{
5+
lock::{ProtectedSubsystems, ResourceLockManager},
56
logical_volume::LogicalVolume,
67
Bdev,
78
CloneXattrs,
@@ -11,7 +12,14 @@ use crate::{
1112
UntypedBdev,
1213
UpdateProps,
1314
},
14-
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
15+
grpc::{
16+
acquire_subsystem_lock,
17+
rpc_submit,
18+
GrpcClientContext,
19+
GrpcResult,
20+
RWLock,
21+
RWSerializer,
22+
},
1523
lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol, PropValue},
1624
};
1725
use ::function_name::named;
@@ -219,6 +227,20 @@ impl ReplicaRpc for ReplicaService {
219227
}
220228
}
221229
};
230+
let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
231+
let _lock_guard = acquire_subsystem_lock(
232+
pool_subsystem, Some(lvs.name())
233+
)
234+
.await
235+
.map_err(|_|
236+
LvsError::ResourceLockFailed {
237+
msg: format!(
238+
"resource {}, for pooluuid {}",
239+
lvs.name(),
240+
args.pooluuid
241+
)
242+
}
243+
)?;
222244
// if pooltype is not Lvs, the provided replica uuid need to be added as
223245
match lvs.create_lvol(&args.name, args.size, Some(&args.uuid), args.thin, args.entity_id).await {
224246
Ok(mut lvol)
@@ -401,7 +423,19 @@ impl ReplicaRpc for ReplicaService {
401423
match Bdev::lookup_by_uuid_str(&args.uuid) {
402424
Some(bdev) => {
403425
let mut lvol = Lvol::try_from(bdev)?;
404-
426+
let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
427+
let _lock_guard = acquire_subsystem_lock(
428+
pool_subsystem,
429+
Some(lvol.lvs().name()),
430+
)
431+
.await
432+
.map_err(|_| LvsError::ResourceLockFailed {
433+
msg: format!(
434+
"resource {}, for lvol {:?}",
435+
lvol.lvs().name(),
436+
lvol
437+
),
438+
})?;
405439
// if we are already shared with the same protocol
406440
if lvol.shared()
407441
== Some(Protocol::try_from(args.share)?)

io-engine/src/grpc/v1/snapshot.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ impl SnapshotService {
241241
match tokio::spawn(async move {
242242
// Grab global operation lock, if requested.
243243
let _global_guard = if global_operation {
244-
match lock_manager.lock(Some(ctx.timeout)).await {
244+
match lock_manager.lock(Some(ctx.timeout), false).await {
245245
Some(g) => Some(g),
246246
None => return Err(Status::deadline_exceeded(
247247
"Failed to acquire access to object within given timeout"
@@ -255,7 +255,7 @@ impl SnapshotService {
255255
// Grab per-object lock before executing the future.
256256
let _resource_guard = match lock_manager
257257
.get_subsystem(ProtectedSubsystems::NEXUS)
258-
.lock_resource(nexus_uuid, Some(ctx.timeout))
258+
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
259259
.await {
260260
Some(g) => g,
261261
None => return Err(Status::deadline_exceeded(

io-engine/src/grpc/v1/stats.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ where
5050
let lock_manager = ResourceLockManager::get_instance();
5151
// For nexus global lock.
5252
let _global_guard =
53-
match lock_manager.lock(Some(ctx.timeout)).await {
53+
match lock_manager.lock(Some(ctx.timeout), false).await {
5454
Some(g) => Some(g),
5555
None => return Err(Status::deadline_exceeded(
5656
"Failed to acquire access to object within given timeout",
@@ -96,7 +96,7 @@ impl StatsService {
9696
let lock_manager = ResourceLockManager::get_instance();
9797
// For nexus global lock.
9898
let _global_guard =
99-
match lock_manager.lock(Some(ctx.timeout)).await {
99+
match lock_manager.lock(Some(ctx.timeout), false).await {
100100
Some(g) => Some(g),
101101
None => return Err(Status::deadline_exceeded(
102102
"Failed to acquire access to object within given timeout",

io-engine/src/lvs/lvs_error.rs

+7
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ pub enum Error {
178178
WipeFailed {
179179
source: crate::core::wiper::Error,
180180
},
181+
#[snafu(display("Failed to acquire resource lock, {}", msg))]
182+
ResourceLockFailed {
183+
msg: String,
184+
},
181185
}
182186

183187
/// Map CoreError to errno code.
@@ -265,6 +269,9 @@ impl ToErrno for Error {
265269
Self::WipeFailed {
266270
..
267271
} => Errno::EINVAL,
272+
Self::ResourceLockFailed {
273+
..
274+
} => Errno::EBUSY,
268275
}
269276
}
270277
}

0 commit comments

Comments
 (0)