Skip to content

Commit 0ab97bd

Browse files
mayastor-borstiagolobocastro
mayastor-bors
andcommitted
Merge #1581
1581: Rebuild refactoring r=tiagolobocastro a=tiagolobocastro refactor(rebuild): make the rebuild more generic A rebuild is essentially a copy from one bdev to another. However, we can have different variations on this. This change aims to make the rebuild more generic, removing the nexus-specific bits from the core rebuild logic and allowing us to compose different types of rebuild in the future, whilst being able to reuse the shared bits. To achieve this, the rebuild backend is split into a rebuild manager which is the generic component responsible for running and managing the rebuild and its tasks. We can then implement different rebuild backends as we see fit, for example a nexus rebuild which locks ranges or a regular bdev to bdev rebuild. Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com> --- fix(rebuild): send final rebuild state on drop Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com> Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
2 parents 5ed2160 + dc53c12 commit 0ab97bd

18 files changed

+1064
-603
lines changed

.github/workflows/pr-commitlint.yml

+9
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,14 @@ jobs:
2121
if [ ! ${{ github.ref }} = "refs/heads/staging" ]; then
2222
first_commit=${{ github.event.pull_request.base.sha }}
2323
last_commit=${{ github.event.pull_request.head.sha }}
24+
# Ensure code-review commits don't get merged
25+
sed "s/code-review-rule': \[0/code-review-rule': [2/g" -i commitlint.config.js
2426
npx commitlint --from $first_commit --to $last_commit -V
27+
28+
git log --pretty=format:%s $first_commit..$last_commit > ./subjects
29+
duplicates="$(cat ./subjects | sort | uniq -D)"
30+
if [ "$duplicates" != "" ]; then
31+
echo -e "Duplicate commits found:\n$duplicates" >&2
32+
exit 1
33+
fi
2534
fi

commitlint.config.js

+18-3
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,25 @@ module.exports = {
22
extends: ['@commitlint/config-conventional'],
33
rules: {
44
'type-enum': [2, 'always', ['build', 'chore', 'ci', 'docs', 'feat', 'fix', 'perf', 'refactor', 'revert', 'style', 'test', 'example']],
5+
'code-review-rule': [0, 'always'],
56
},
67
defaultIgnores: false,
78
ignores: [
8-
(message) => message.startsWith('chore(bors): merge pull request #'),
9-
(message) => message.startsWith('Merge #')
10-
]
9+
(message) => message.startsWith('chore(bors): merge pull request #'),
10+
(message) => message.startsWith('Merge #')
11+
],
12+
plugins: [
13+
{
14+
rules: {
15+
'code-review-rule': ({subject}) => {
16+
const REVIEW_COMMENTS = `Please don't merge code-review commits, instead squash them in the parent commit`;
17+
if (subject.includes('code-review')) return [ false, REVIEW_COMMENTS ];
18+
if (subject.includes('review comment')) return [ false, REVIEW_COMMENTS ];
19+
if (subject.includes('address comment')) return [ false, REVIEW_COMMENTS ];
20+
if (subject.includes('addressed comment')) return [ false, REVIEW_COMMENTS ];
21+
return [ true ];
22+
},
23+
},
24+
},
25+
],
1126
}

io-engine-tests/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use io_engine::{
1717
core::{MayastorEnvironment, Mthread},
1818
logger,
1919
logger::LogFormat,
20-
rebuild::{RebuildJob, RebuildState},
20+
rebuild::{NexusRebuildJob, RebuildState},
2121
};
2222

2323
pub mod bdev;
@@ -457,7 +457,7 @@ pub async fn wait_for_rebuild(
457457
timeout: Duration,
458458
) {
459459
let (s, r) = unbounded::<()>();
460-
let job = match RebuildJob::lookup(&dst_uri) {
460+
let job = match NexusRebuildJob::lookup(&dst_uri) {
461461
Ok(job) => job,
462462
Err(_) => return,
463463
};
@@ -490,7 +490,7 @@ pub async fn wait_for_rebuild(
490490
error
491491
});
492492
reactor_poll!(r);
493-
if let Ok(job) = RebuildJob::lookup(&dst_uri) {
493+
if let Ok(job) = NexusRebuildJob::lookup(&dst_uri) {
494494
job.stats().await;
495495
}
496496
t.join().unwrap().unwrap();

io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use crate::{
1818
eventing::{EventMetaGen, EventWithMeta},
1919
rebuild::{
2020
HistoryRecord,
21+
NexusRebuildJob,
2122
RebuildError,
22-
RebuildJob,
2323
RebuildJobOptions,
2424
RebuildState,
2525
RebuildStats,
@@ -139,8 +139,8 @@ impl<'n> Nexus<'n> {
139139
self.reconfigure(DrEvent::ChildRebuild).await;
140140

141141
// Stop the I/O log and create a rebuild map from it.
142-
// As this is done after the reconfiguraion, any new write I/Os will
143-
// now reach the destionation child, and no rebuild will be required
142+
// As this is done after the reconfiguration, any new write I/Os will
143+
// now reach the destination child, and no rebuild will be required
144144
// for them.
145145
let map = self
146146
.lookup_child(&dst_child_uri)
@@ -186,7 +186,7 @@ impl<'n> Nexus<'n> {
186186
verify_mode,
187187
};
188188

189-
RebuildJob::new(
189+
NexusRebuildJob::new(
190190
&self.name,
191191
src_child_uri,
192192
dst_child_uri,
@@ -202,7 +202,7 @@ impl<'n> Nexus<'n> {
202202
},
203203
)
204204
.await
205-
.and_then(RebuildJob::store)
205+
.and_then(NexusRebuildJob::store)
206206
.context(nexus_err::CreateRebuild {
207207
child: dst_child_uri.to_owned(),
208208
name: self.name.clone(),
@@ -211,7 +211,7 @@ impl<'n> Nexus<'n> {
211211

212212
/// Translates the job into a new history record and pushes into
213213
/// the history.
214-
fn create_history_record(&self, job: Arc<RebuildJob>) {
214+
fn create_history_record(&self, job: Arc<NexusRebuildJob>) {
215215
let Some(rec) = job.history_record() else {
216216
error!("{self:?}: try to get history record on unfinished job");
217217
return;
@@ -330,7 +330,7 @@ impl<'n> Nexus<'n> {
330330
pub async fn cancel_rebuild_jobs(&self, src_uri: &str) -> Vec<String> {
331331
info!("{:?}: cancel rebuild jobs from '{}'...", self, src_uri);
332332

333-
let src_jobs = RebuildJob::lookup_src(src_uri);
333+
let src_jobs = NexusRebuildJob::lookup_src(src_uri);
334334
let mut terminated_jobs = Vec::new();
335335
let mut rebuilding_children = Vec::new();
336336

@@ -375,8 +375,8 @@ impl<'n> Nexus<'n> {
375375
pub(crate) fn rebuild_job(
376376
&self,
377377
dst_child_uri: &str,
378-
) -> Result<std::sync::Arc<RebuildJob>, Error> {
379-
RebuildJob::lookup(dst_child_uri).map_err(|_| {
378+
) -> Result<std::sync::Arc<NexusRebuildJob>, Error> {
379+
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
380380
Error::RebuildJobNotFound {
381381
child: dst_child_uri.to_owned(),
382382
name: self.name.to_owned(),
@@ -389,9 +389,9 @@ impl<'n> Nexus<'n> {
389389
pub(crate) fn rebuild_job_mut(
390390
&self,
391391
dst_child_uri: &str,
392-
) -> Result<Arc<RebuildJob>, Error> {
392+
) -> Result<Arc<NexusRebuildJob>, Error> {
393393
let name = self.name.clone();
394-
RebuildJob::lookup(dst_child_uri).map_err(|_| {
394+
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
395395
Error::RebuildJobNotFound {
396396
child: dst_child_uri.to_owned(),
397397
name,

io-engine/src/bdev/nexus/nexus_child.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
VerboseError,
3030
},
3131
persistent_store::PersistentStore,
32-
rebuild::{RebuildJob, RebuildMap},
32+
rebuild::{NexusRebuildJob, RebuildMap},
3333
};
3434

3535
use crate::{
@@ -1199,13 +1199,15 @@ impl<'c> NexusChild<'c> {
11991199
/// TODO
12001200
pub(super) fn remove_rebuild_job(
12011201
&self,
1202-
) -> Option<std::sync::Arc<RebuildJob>> {
1203-
RebuildJob::remove(&self.name).ok()
1202+
) -> Option<std::sync::Arc<NexusRebuildJob>> {
1203+
NexusRebuildJob::remove(&self.name).ok()
12041204
}
12051205

12061206
/// Return the rebuild job which is rebuilding this child, if rebuilding.
1207-
pub(crate) fn rebuild_job(&self) -> Option<std::sync::Arc<RebuildJob>> {
1208-
RebuildJob::lookup(&self.name).ok()
1207+
pub(crate) fn rebuild_job(
1208+
&self,
1209+
) -> Option<std::sync::Arc<NexusRebuildJob>> {
1210+
NexusRebuildJob::lookup(&self.name).ok()
12091211
}
12101212

12111213
/// Return the rebuild progress on this child, if rebuilding.

io-engine/src/eventing/nexus_events.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ use crate::{
1111
bdev::{nexus, nexus::NexusChild},
1212
core::{MayastorEnvironment, VerboseError},
1313
eventing::{Event, EventMetaGen, EventWithMeta},
14-
rebuild::{RebuildJob, RebuildState},
14+
rebuild::{NexusRebuildJob, RebuildState},
1515
};
1616

17-
impl EventMetaGen for RebuildJob {
17+
impl EventMetaGen for NexusRebuildJob {
1818
fn meta(&self) -> EventMeta {
1919
let rebuild_status = match self.state() {
2020
RebuildState::Init | RebuildState::Running => {

io-engine/src/grpc/v0/nexus_grpc.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::{
2121
PtplFileOps,
2222
},
2323
core::{Protocol, Share},
24-
rebuild::RebuildJob,
24+
rebuild::NexusRebuildJob,
2525
};
2626

2727
fn map_fault_reason(r: FaultReason) -> ChildStateReason {
@@ -137,7 +137,7 @@ impl<'n> Nexus<'n> {
137137
}
138138
children
139139
},
140-
rebuilds: RebuildJob::count() as u32,
140+
rebuilds: NexusRebuildJob::count() as u32,
141141
allowed_hosts: self.allowed_hosts(),
142142
}
143143
}
@@ -165,7 +165,7 @@ impl<'n> Nexus<'n> {
165165
}
166166
children
167167
},
168-
rebuilds: RebuildJob::count() as u32,
168+
rebuilds: NexusRebuildJob::count() as u32,
169169
ana_state: ana_state as i32,
170170
allowed_hosts: self.allowed_hosts(),
171171
}

io-engine/src/rebuild/bdev_rebuild.rs

+144
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use std::{
2+
ops::{Deref, Range},
3+
rc::Rc,
4+
};
5+
6+
use super::{
7+
rebuild_descriptor::RebuildDescriptor,
8+
rebuild_error::RebuildError,
9+
rebuild_job_backend::RebuildBackend,
10+
rebuild_task::{RebuildTasks, TaskResult},
11+
RebuildJob,
12+
RebuildJobOptions,
13+
SEGMENT_TASKS,
14+
};
15+
16+
use crate::gen_rebuild_instances;
17+
18+
/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
19+
/// from source_hdl and writes into destination_hdl from specified start to end.
20+
pub struct BdevRebuildJob(RebuildJob);
21+
22+
impl std::fmt::Debug for BdevRebuildJob {
23+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24+
self.0.fmt(f)
25+
}
26+
}
27+
impl Deref for BdevRebuildJob {
28+
type Target = RebuildJob;
29+
30+
fn deref(&self) -> &Self::Target {
31+
&self.0
32+
}
33+
}
34+
35+
impl BdevRebuildJob {
36+
/// Creates a new RebuildJob which rebuilds from source URI to target URI
37+
/// from start to end (of the data partition); notify_fn callback is called
38+
/// when the rebuild state is updated - with the source and destination
39+
/// bdev URIs as arguments.
40+
pub async fn new(
41+
src_uri: &str,
42+
dst_uri: &str,
43+
range: Option<Range<u64>>,
44+
options: RebuildJobOptions,
45+
notify_fn: fn(&str, &str) -> (),
46+
) -> Result<Self, RebuildError> {
47+
let descriptor =
48+
RebuildDescriptor::new(src_uri, dst_uri, range, options).await?;
49+
let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
50+
let backend =
51+
BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?;
52+
53+
RebuildJob::from_backend(backend).await.map(Self)
54+
}
55+
}
56+
57+
gen_rebuild_instances!(BdevRebuildJob);
58+
59+
/// A rebuild job which is responsible for rebuilding from
60+
/// source to target of the `RebuildDescriptor`.
61+
pub(super) struct BdevRebuildJobBackend {
62+
/// The next block to be rebuilt.
63+
next: u64,
64+
/// A pool of tasks which perform the actual data rebuild.
65+
task_pool: RebuildTasks,
66+
/// A generic rebuild descriptor.
67+
descriptor: Rc<RebuildDescriptor>,
68+
/// Notification callback invoked with the src and dst URIs.
69+
notify_fn: fn(&str, &str) -> (),
70+
}
71+
72+
#[async_trait::async_trait(?Send)]
73+
impl RebuildBackend for BdevRebuildJobBackend {
74+
fn on_state_change(&mut self) {
75+
(self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
76+
}
77+
78+
fn common_desc(&self) -> &RebuildDescriptor {
79+
&self.descriptor
80+
}
81+
82+
fn task_pool(&self) -> &RebuildTasks {
83+
&self.task_pool
84+
}
85+
86+
fn schedule_task_by_id(&mut self, id: usize) -> bool {
87+
if self.next >= self.descriptor.range.end {
88+
false
89+
} else {
90+
let next = std::cmp::min(
91+
self.next + self.descriptor.segment_size_blks,
92+
self.descriptor.range.end,
93+
);
94+
self.task_pool.schedule_segment_rebuild(
95+
id,
96+
self.next,
97+
self.descriptor.clone(),
98+
);
99+
self.task_pool.active += 1;
100+
self.next = next;
101+
true
102+
}
103+
}
104+
105+
async fn await_one_task(&mut self) -> Option<TaskResult> {
106+
self.task_pool.await_one_task().await
107+
}
108+
}
109+
110+
impl std::fmt::Debug for BdevRebuildJobBackend {
111+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112+
f.debug_struct("BdevRebuildJob")
113+
.field("next", &self.next)
114+
.finish()
115+
}
116+
}
117+
impl std::fmt::Display for BdevRebuildJobBackend {
118+
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119+
Ok(())
120+
}
121+
}
122+
123+
impl BdevRebuildJobBackend {
124+
/// Creates a new rebuild backend which rebuilds from source URI to target URI
125+
/// from start to end (of the data partition); notify_fn callback is called
126+
/// when the rebuild state is updated - with the source and destination
127+
/// URIs as arguments.
128+
pub async fn new(
129+
task_pool: RebuildTasks,
130+
notify_fn: fn(&str, &str) -> (),
131+
descriptor: RebuildDescriptor,
132+
) -> Result<Self, RebuildError> {
133+
let be = Self {
134+
next: descriptor.range.start,
135+
task_pool,
136+
descriptor: Rc::new(descriptor),
137+
notify_fn,
138+
};
139+
140+
info!("{be}: backend created");
141+
142+
Ok(be)
143+
}
144+
}

0 commit comments

Comments
 (0)