1
1
//! Read from sync, send to tokio sender
2
- use std:: result;
2
+ use std:: { future :: Future , result} ;
3
3
4
4
use bytes:: Bytes ;
5
5
use iroh_blake3 as blake3;
6
6
use iroh_blake3:: guts:: parent_cv;
7
+ use serde:: { Deserialize , Serialize } ;
7
8
use smallvec:: SmallVec ;
8
9
9
10
use super :: { sync:: Outboard , EncodeError , Leaf , Parent } ;
@@ -13,7 +14,7 @@ use crate::{
13
14
} ;
14
15
15
16
/// A content item for the bao streaming protocol.
16
- #[ derive( Debug ) ]
17
+ #[ derive( Debug , Serialize , Deserialize ) ]
17
18
pub enum EncodedItem {
18
19
/// total data size, will be the first item
19
20
Size ( u64 ) ,
@@ -45,28 +46,52 @@ impl From<EncodeError> for EncodedItem {
45
46
}
46
47
}
47
48
49
+ /// Abstract sender trait for sending encoded items
50
+ pub trait Sender {
51
+ /// Error type
52
+ type Error ;
53
+ /// Send an item
54
+ fn send (
55
+ & mut self ,
56
+ item : EncodedItem ,
57
+ ) -> impl Future < Output = std:: result:: Result < ( ) , Self :: Error > > + ' _ ;
58
+ }
59
+
60
+ impl Sender for tokio:: sync:: mpsc:: Sender < EncodedItem > {
61
+ type Error = tokio:: sync:: mpsc:: error:: SendError < EncodedItem > ;
62
+ fn send (
63
+ & mut self ,
64
+ item : EncodedItem ,
65
+ ) -> impl Future < Output = std:: result:: Result < ( ) , Self :: Error > > + ' _ {
66
+ tokio:: sync:: mpsc:: Sender :: send ( self , item)
67
+ }
68
+ }
69
+
48
70
/// Traverse ranges relevant to a query from a reader and outboard to a stream
49
71
///
50
72
/// This function validates the data before writing.
51
73
///
52
74
/// It is possible to encode ranges from a partial file and outboard.
53
75
/// This will either succeed if the requested ranges are all present, or fail
54
76
/// as soon as a range is missing.
55
- pub async fn traverse_ranges_validated < D : ReadBytesAt , O : Outboard > (
77
+ pub async fn traverse_ranges_validated < D , O , F > (
56
78
data : D ,
57
79
outboard : O ,
58
80
ranges : & ChunkRangesRef ,
59
- encoded : & tokio:: sync:: mpsc:: Sender < EncodedItem > ,
60
- ) {
61
- encoded
62
- . send ( EncodedItem :: Size ( outboard. tree ( ) . size ( ) ) )
63
- . await
64
- . ok ( ) ;
65
- let res = match traverse_ranges_validated_impl ( data, outboard, ranges, encoded) . await {
66
- Ok ( ( ) ) => EncodedItem :: Done ,
81
+ send : & mut F ,
82
+ ) -> std:: result:: Result < ( ) , F :: Error >
83
+ where
84
+ D : ReadBytesAt ,
85
+ O : Outboard ,
86
+ F : Sender ,
87
+ {
88
+ send. send ( EncodedItem :: Size ( outboard. tree ( ) . size ( ) ) ) . await ?;
89
+ let res = match traverse_ranges_validated_impl ( data, outboard, ranges, send) . await {
90
+ Ok ( Ok ( ( ) ) ) => EncodedItem :: Done ,
67
91
Err ( cause) => EncodedItem :: Error ( cause) ,
92
+ Ok ( Err ( err) ) => return Err ( err) ,
68
93
} ;
69
- encoded . send ( res) . await . ok ( ) ;
94
+ send . send ( res) . await
70
95
}
71
96
72
97
/// Encode ranges relevant to a query from a reader and outboard to a writer
@@ -76,14 +101,19 @@ pub async fn traverse_ranges_validated<D: ReadBytesAt, O: Outboard>(
76
101
/// It is possible to encode ranges from a partial file and outboard.
77
102
/// This will either succeed if the requested ranges are all present, or fail
78
103
/// as soon as a range is missing.
79
- async fn traverse_ranges_validated_impl < D : ReadBytesAt , O : Outboard > (
104
+ async fn traverse_ranges_validated_impl < D , O , F > (
80
105
data : D ,
81
106
outboard : O ,
82
107
ranges : & ChunkRangesRef ,
83
- encoded : & tokio:: sync:: mpsc:: Sender < EncodedItem > ,
84
- ) -> result:: Result < ( ) , EncodeError > {
108
+ send : & mut F ,
109
+ ) -> result:: Result < std:: result:: Result < ( ) , F :: Error > , EncodeError >
110
+ where
111
+ D : ReadBytesAt ,
112
+ O : Outboard ,
113
+ F : Sender ,
114
+ {
85
115
if ranges. is_empty ( ) {
86
- return Ok ( ( ) ) ;
116
+ return Ok ( Ok ( ( ) ) ) ;
87
117
}
88
118
let mut stack: SmallVec < [ _ ; 10 ] > = SmallVec :: < [ blake3:: Hash ; 10 ] > :: new ( ) ;
89
119
stack. push ( outboard. root ( ) ) ;
@@ -112,16 +142,13 @@ async fn traverse_ranges_validated_impl<D: ReadBytesAt, O: Outboard>(
112
142
if left {
113
143
stack. push ( l_hash) ;
114
144
}
115
- encoded
116
- . send (
117
- Parent {
118
- node,
119
- pair : ( l_hash, r_hash) ,
120
- }
121
- . into ( ) ,
122
- )
123
- . await
124
- . ok ( ) ;
145
+ let item = Parent {
146
+ node,
147
+ pair : ( l_hash, r_hash) ,
148
+ } ;
149
+ if let Err ( e) = send. send ( item. into ( ) ) . await {
150
+ return Ok ( Err ( e) ) ;
151
+ }
125
152
}
126
153
BaoChunk :: Leaf {
127
154
start_chunk,
@@ -152,7 +179,9 @@ async fn traverse_ranges_validated_impl<D: ReadBytesAt, O: Outboard>(
152
179
return Err ( EncodeError :: LeafHashMismatch ( start_chunk) ) ;
153
180
}
154
181
for item in out_buf. into_iter ( ) {
155
- encoded. send ( item) . await . ok ( ) ;
182
+ if let Err ( e) = send. send ( item) . await {
183
+ return Ok ( Err ( e) ) ;
184
+ }
156
185
}
157
186
} else {
158
187
let actual = hash_subtree ( start_chunk. 0 , & buffer, is_root) ;
@@ -164,12 +193,14 @@ async fn traverse_ranges_validated_impl<D: ReadBytesAt, O: Outboard>(
164
193
data : buffer,
165
194
offset : start_chunk. to_bytes ( ) ,
166
195
} ;
167
- encoded. send ( item. into ( ) ) . await . ok ( ) ;
196
+ if let Err ( e) = send. send ( item. into ( ) ) . await {
197
+ return Ok ( Err ( e) ) ;
198
+ }
168
199
} ;
169
200
}
170
201
}
171
202
}
172
- Ok ( ( ) )
203
+ Ok ( Ok ( ( ) ) )
173
204
}
174
205
175
206
/// Encode ranges relevant to a query from a slice and outboard to a buffer.
@@ -299,11 +330,13 @@ mod tests {
299
330
async fn smoke ( ) {
300
331
let data = [ 0u8 ; 100000 ] ;
301
332
let outboard = PreOrderMemOutboard :: create ( data, BlockSize :: from_chunk_log ( 4 ) ) ;
302
- let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( 10 ) ;
333
+ let ( mut tx, mut rx) = tokio:: sync:: mpsc:: channel ( 10 ) ;
303
334
let mut encoded = Vec :: new ( ) ;
304
335
encode_ranges_validated ( & data[ ..] , & outboard, & ChunkRanges :: empty ( ) , & mut encoded) . unwrap ( ) ;
305
336
tokio:: spawn ( async move {
306
- traverse_ranges_validated ( & data[ ..] , & outboard, & ChunkRanges :: empty ( ) , & tx) . await ;
337
+ traverse_ranges_validated ( & data[ ..] , & outboard, & ChunkRanges :: empty ( ) , & mut tx)
338
+ . await
339
+ . unwrap ( ) ;
307
340
} ) ;
308
341
let mut res = Vec :: new ( ) ;
309
342
while let Some ( item) = rx. recv ( ) . await {
0 commit comments