Skip to content

Commit 7a418d7

Browse files
committed
feat: fetch secret params in existing tokio runtime
Signed-off-by: Diwakar Sharma <diwakar.sharma@datacore.com>
1 parent 2d9316f commit 7a418d7

File tree

1 file changed

+50
-86
lines changed

1 file changed

+50
-86
lines changed

io-engine/src/grpc/v1/pool.rs

+50-86
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
pub use crate::pool_backend::FindPoolArgs as PoolIdProbe;
22
use crate::{
33
bdev::crypto::{Cipher, EncryptionKey as PoolEncKey},
4-
core::{
5-
runtime, NvmfShareProps, ProtectedSubsystems, Protocol, Reactor, ResourceLockGuard,
6-
ResourceLockManager,
7-
},
4+
core::{NvmfShareProps, ProtectedSubsystems, Protocol, ResourceLockGuard, ResourceLockManager},
85
grpc::{acquire_subsystem_lock, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
96
lvs::{BsError, LvsError},
107
pool_backend::{
@@ -13,7 +10,7 @@ use crate::{
1310
},
1411
};
1512
use ::function_name::named;
16-
use futures::{channel::oneshot, FutureExt};
13+
use futures::FutureExt;
1714
use io_engine_api::v1::{
1815
common::{create_pool_request, import_pool_request, Cipher as GrpcCipher, EncryptionData},
1916
pool::*,
@@ -77,41 +74,21 @@ async fn _params_get_from_secret_source(secret_source_name: &str) -> Result<Pool
7774
PlatformType::K8s => {
7875
let client = kube::Client::try_default()
7976
.await
80-
.expect("Should be able to create kube client");
81-
// todo: Add cli arg for namespace as well
77+
.map_err(|e| Status::failed_precondition(e.to_string()))?;
78+
// todo: Add cli arg for namespace as well?
8279
let ns = client.default_namespace().to_string();
8380
let secret_provider = K8sSecretProvider::new(client, &ns);
8481
trace!(
8582
"Platform: K8S. read secret params from 'Secret' {:?}",
8683
secret_source_name
8784
);
88-
let src = secret_source_name.to_string();
89-
let (tx, rx) = oneshot::channel::<Option<PoolEncKey>>();
90-
runtime::spawn(async move {
91-
let secret_params: Option<PoolEncKey> =
92-
match secret_provider.secret_data(src.as_str()).await {
93-
Ok(d) => d,
94-
Err(e) => {
95-
// Make some noise about what went wrong during parsing.
96-
error!("Failed to parse secret data {e:?}");
97-
None
98-
}
99-
};
100-
let fut = Reactor::spawn_at_primary(async move {
101-
if tx.send(secret_params).is_err() {
102-
error!("Failed to send completion for fetch secret request.");
103-
}
104-
})
105-
.unwrap();
106-
let _ = fut.await;
107-
});
10885

109-
let s = rx
86+
let secret_params: Option<PoolEncKey> = secret_provider
87+
.secret_data(secret_source_name)
11088
.await
111-
.map_err(|e| Status::failed_precondition(e.to_string()))?
112-
.ok_or(Status::failed_precondition("Failed to get secret data"))?;
89+
.map_err(|e| Status::failed_precondition(e.to_string()))?;
11390

114-
Some(s)
91+
secret_params
11592
}
11693
// XXX: Assuming this base path for now. Need to implement capturing
11794
// a base path via cli or env.
@@ -121,48 +98,27 @@ async fn _params_get_from_secret_source(secret_source_name: &str) -> Result<Pool
12198
"Platform: Deployer. read secret params from file {:?}",
12299
secret_source_name
123100
);
124-
let (tx, rx) = oneshot::channel::<Option<PoolEncKey>>();
125-
let src = secret_source_name.to_string();
126-
runtime::spawn(async move {
127-
let secret_params: Option<PoolEncKey> =
128-
match file_provider.secret_data(src.as_str()).await {
129-
Ok(d) => d,
130-
Err(e) => {
131-
// Make some noise about what went wrong during parsing.
132-
error!("Failed to parse secret data {e:?}");
133-
None
134-
}
135-
};
136-
137-
let fut = Reactor::spawn_at_primary(async move {
138-
if tx.send(secret_params).is_err() {
139-
error!("Failed to send completion for fetch secret request.");
140-
}
141-
})
142-
.unwrap();
143-
let _ = fut.await;
144-
});
145101

146-
let s = rx
102+
let secret_params: Option<PoolEncKey> = file_provider
103+
.secret_data(secret_source_name)
147104
.await
148-
.map_err(|e| Status::failed_precondition(e.to_string()))?
149-
.ok_or(Status::failed_precondition("Failed to get secret data"))?;
105+
.map_err(|e| Status::failed_precondition(e.to_string()))?;
150106

151-
Some(s)
107+
secret_params
152108
}
153109
PlatformType::None => unreachable!(),
154110
};
111+
112+
// Be very sure that we get the params otherwise always return error.
155113
secret_params.ok_or(Status::failed_precondition(
156-
"failed to parse secret params from source",
114+
"failed to parse secret params from source: None",
157115
))
158116
}
159117

160-
/// Helper routine to extract Encryption params from the
161-
/// Create or Import pool request.
118+
/// Helper routine to extract Encryption params from the Create or Import pool request.
162119
async fn util_fetch_secret_params(
163120
params: &PoolEncryptionParams,
164-
pool_args: &mut PoolArgs,
165-
) -> Result<(), Status> {
121+
) -> Result<Option<PoolEncKey>, Status> {
166122
let enc_key = match params {
167123
PoolEncryptionParams::Create(enc_arg) => {
168124
match enc_arg.clone() {
@@ -188,12 +144,7 @@ async fn util_fetch_secret_params(
188144
PoolEncryptionParams::NoEncryptionParams => None,
189145
};
190146

191-
pool_args.crypto_vbdev_name = enc_key
192-
.as_ref()
193-
.map(|_| format!("crypto_{}", pool_args.name));
194-
pool_args.enc_key = enc_key;
195-
196-
Ok(())
147+
Ok(enc_key)
197148
}
198149

199150
/// RPC service for mayastor pool operations
@@ -655,21 +606,27 @@ impl GrpcPoolFactory {
655606
impl PoolRpc for PoolService {
656607
#[named]
657608
async fn create_pool(&self, request: Request<CreatePoolRequest>) -> GrpcResult<Pool> {
609+
// Check if the pool is required to be encrypted, and fetch the required
610+
// encryption parameters from specified source.
611+
let enc_arg = match request.get_ref().encryption {
612+
Some(ref e) => PoolEncryptionParams::Create(e.clone()),
613+
_ => PoolEncryptionParams::NoEncryptionParams,
614+
};
615+
let enc_key = util_fetch_secret_params(&enc_arg).await?;
616+
658617
self.locked(
659618
GrpcClientContext::new(&request, function_name!()),
660619
async move {
661620
crate::spdk_submit!(async move {
662621
info!("{:?}", request.get_ref());
663-
664-
let req = request.into_inner();
665-
let enc_arg = match req.encryption {
666-
Some(ref e) => PoolEncryptionParams::Create(e.clone()),
667-
_ => PoolEncryptionParams::NoEncryptionParams,
668-
};
669-
let factory = GrpcPoolFactory::new(PoolBackend::try_from(&req.pooltype)?)?;
670-
let mut pool_args = PoolArgs::try_from(req)?;
671-
// This performs async operations hence calling from outside try_from.
672-
util_fetch_secret_params(&enc_arg, &mut pool_args).await?;
622+
let factory =
623+
GrpcPoolFactory::new(PoolBackend::try_from(request.get_ref().pooltype)?)?;
624+
let mut pool_args = PoolArgs::try_from(request.into_inner())?;
625+
626+
pool_args.crypto_vbdev_name = enc_key
627+
.as_ref()
628+
.map(|_| format!("crypto_{}", pool_args.name));
629+
pool_args.enc_key = enc_key;
673630
factory.create(pool_args).await
674631
})
675632
},
@@ -711,21 +668,28 @@ impl PoolRpc for PoolService {
711668

712669
#[named]
713670
async fn import_pool(&self, request: Request<ImportPoolRequest>) -> GrpcResult<Pool> {
671+
// If the pool to be imported is encrypted, fetch the required
672+
// encryption parameters from specified source.
673+
let enc_arg = match request.get_ref().encryption {
674+
Some(ref e) => PoolEncryptionParams::Import(e.clone()),
675+
_ => PoolEncryptionParams::NoEncryptionParams,
676+
};
677+
let enc_key = util_fetch_secret_params(&enc_arg).await?;
678+
714679
self.locked(
715680
GrpcClientContext::new(&request, function_name!()),
716681
async move {
717682
crate::spdk_submit!(async move {
718683
info!("{:?}", request.get_ref());
684+
let factory =
685+
GrpcPoolFactory::new(PoolBackend::try_from(request.get_ref().pooltype)?)?;
686+
let mut pool_args = PoolArgs::try_from(request.into_inner())?;
687+
688+
pool_args.crypto_vbdev_name = enc_key
689+
.as_ref()
690+
.map(|_| format!("crypto_{}", pool_args.name));
691+
pool_args.enc_key = enc_key;
719692

720-
let req = request.into_inner();
721-
let enc_arg = match req.encryption {
722-
Some(ref e) => PoolEncryptionParams::Import(e.clone()),
723-
_ => PoolEncryptionParams::NoEncryptionParams,
724-
};
725-
let factory = GrpcPoolFactory::new(PoolBackend::try_from(&req.pooltype)?)?;
726-
let mut pool_args = PoolArgs::try_from(req)?;
727-
// This performs async operations hence calling from outside try_from.
728-
util_fetch_secret_params(&enc_arg, &mut pool_args).await?;
729693
factory.import(pool_args).await
730694
})
731695
},

0 commit comments

Comments
 (0)