Skip to content

Commit 52ecffb

Browse files
Return lifetimes to Scope and friends (#785)
* Add scope lifetime to foundational types * Batch2 operators; those that return Self * Batch3: operator traits that must reflect lifetimes * Batch4: infrastructural operators/traits * Further tidying * Simplifications * Relax bounds on enter/leave
1 parent 2a4cb25 commit 52ecffb

40 files changed

Lines changed: 253 additions & 332 deletions

mdbook/src/chapter_4/chapter_4_4.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ One nice aspect of `capture_into` is that it really does reveal everything that
5656
At *its* core, `replay_into` takes some sequence of `Event<T, D>` items and reproduces the stream, as it was recorded. It is also fairly simple, and we can just look at its implementation as well:
5757

5858
```rust,ignore
59-
fn replay_into<T: Timestamp>(self, scope: &mut Scope<T>) -> Stream<T, C>{
59+
fn replay_into<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C>{
6060
6161
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
6262
let (targets, stream) = builder.new_output();

timely/examples/unionfind.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ trait UnionFind {
5454
fn union_find(self) -> Self;
5555
}
5656

57-
impl<T: Timestamp> UnionFind for StreamVec<T, (usize, usize)> {
58-
fn union_find(self) -> StreamVec<T, (usize, usize)> {
57+
impl<'scope, T: Timestamp> UnionFind for StreamVec<'scope, T, (usize, usize)> {
58+
fn union_find(self) -> Self {
5959

6060
self.unary(Pipeline, "UnionFind", |_,_| {
6161

timely/src/dataflow/operators/core/capture/capture.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ pub trait Capture<T: Timestamp, C: Container> : Sized {
115115
}
116116
}
117117

118-
impl<T: Timestamp, C: Container> Capture<T, C> for Stream<T, C> {
118+
impl<T: Timestamp, C: Container> Capture<T, C> for Stream<'_, T, C> {
119119
fn capture_into<P: EventPusher<T, C>+'static>(self, mut event_pusher: P) {
120120

121121
let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());

timely/src/dataflow/operators/core/capture/replay.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,24 +51,24 @@ use crate::dataflow::channels::Message;
5151

5252
/// Replay a capture stream into a scope with the same timestamp.
5353
pub trait Replay<T: Timestamp, C> : Sized {
54-
/// Replays `self` into the provided scope, as a `Stream<T, C>`.
55-
fn replay_into(self, scope: &mut Scope<T>) -> Stream<T, C> {
54+
/// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`.
55+
fn replay_into<'scope>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C> {
5656
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
5757
}
58-
/// Replays `self` into the provided scope, as a `Stream<T, C>`.
58+
/// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`.
5959
///
6060
/// The `period` argument allows the specification of a re-activation period, where the operator
6161
/// will re-activate itself every so often. The `None` argument instructs the operator not to
6262
/// re-activate itself.
63-
fn replay_core(self, scope: &mut Scope<T>, period: Option<std::time::Duration>) -> Stream<T, C>;
63+
fn replay_core<'scope>(self, scope: &mut Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>;
6464
}
6565

6666
impl<T: Timestamp, C: Container+Clone, I> Replay<T, C> for I
6767
where
6868
I : IntoIterator,
6969
<I as IntoIterator>::Item: EventIterator<T, C>+'static,
7070
{
71-
fn replay_core(self, scope: &mut Scope<T>, period: Option<std::time::Duration>) -> Stream<T, C>{
71+
fn replay_core<'scope>(self, scope: &mut Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>{
7272

7373
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
7474

timely/src/dataflow/operators/core/concat.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::{Stream, Scope};
77

88
/// Merge the contents of two streams.
9-
pub trait Concat<T: Timestamp, C> {
9+
pub trait Concat{
1010
/// Merge the contents of two streams.
1111
///
1212
/// # Examples
@@ -22,17 +22,17 @@ pub trait Concat<T: Timestamp, C> {
2222
/// .inspect(|x| println!("seen: {:?}", x));
2323
/// });
2424
/// ```
25-
fn concat(self, other: Stream<T, C>) -> Stream<T, C>;
25+
fn concat(self, other: Self) -> Self;
2626
}
2727

28-
impl<T: Timestamp, C: Container> Concat<T, C> for Stream<T, C> {
29-
fn concat(self, other: Stream<T, C>) -> Stream<T, C> {
28+
impl<'scope, T: Timestamp, C: Container> Concat for Stream<'scope, T, C> {
29+
fn concat(self, other: Self) -> Self {
3030
self.scope().concatenate([self, other])
3131
}
3232
}
3333

3434
/// Merge the contents of multiple streams.
35-
pub trait Concatenate<T: Timestamp, C> {
35+
pub trait Concatenate<'scope, T: Timestamp> {
3636
/// Merge the contents of multiple streams.
3737
///
3838
/// # Examples
@@ -50,15 +50,15 @@ pub trait Concatenate<T: Timestamp, C> {
5050
/// .inspect(|x| println!("seen: {:?}", x));
5151
/// });
5252
/// ```
53-
fn concatenate<I>(&self, sources: I) -> Stream<T, C>
53+
fn concatenate<I, C: Container>(&self, sources: I) -> Stream<'scope, T, C>
5454
where
55-
I: IntoIterator<Item=Stream<T, C>>;
55+
I: IntoIterator<Item=Stream<'scope, T, C>>;
5656
}
5757

58-
impl<T: Timestamp, C: Container> Concatenate<T, C> for Scope<T> {
59-
fn concatenate<I>(&self, sources: I) -> Stream<T, C>
58+
impl<'scope, T: Timestamp> Concatenate<'scope, T> for Scope<'scope, T> {
59+
fn concatenate<I, C: Container>(&self, sources: I) -> Stream<'scope, T, C>
6060
where
61-
I: IntoIterator<Item=Stream<T, C>>
61+
I: IntoIterator<Item=Stream<'scope, T, C>>
6262
{
6363

6464
// create an operator builder.

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::worker::AsWorker;
3434
use crate::dataflow::{Stream, Scope};
3535

3636
/// Extension trait to move a `Stream` into a child of its current `Scope`.
37-
pub trait Enter<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C> {
37+
pub trait Enter<'outer, TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C> {
3838
/// Moves the `Stream` argument into a child of its current `Scope`.
3939
///
4040
/// The destination scope must be a child of the stream's scope.
@@ -51,16 +51,16 @@ pub trait Enter<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C> {
5151
/// });
5252
/// });
5353
/// ```
54-
fn enter(self, inner: &Scope<TInner>) -> Stream<TInner, C>;
54+
fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C>;
5555
}
5656

57-
impl<TOuter, TInner, C> Enter<TOuter, TInner, C> for Stream<TOuter, C>
57+
impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C>
5858
where
5959
TOuter: Timestamp,
6060
TInner: Timestamp + Refines<TOuter>,
6161
C: Container,
6262
{
63-
fn enter(self, inner: &Scope<TInner>) -> Stream<TInner, C> {
63+
fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C> {
6464

6565
use crate::scheduling::Scheduler;
6666

@@ -103,7 +103,7 @@ where
103103
}
104104

105105
/// Extension trait to move a `Stream` to the parent of its current `Scope`.
106-
pub trait Leave<TOuter: Timestamp, C> {
106+
pub trait Leave<'inner, TInner: Timestamp, C> {
107107
/// Moves a `Stream` to the parent of its current `Scope`.
108108
///
109109
/// The parent scope must be supplied as an argument.
@@ -122,16 +122,15 @@ pub trait Leave<TOuter: Timestamp, C> {
122122
/// });
123123
/// });
124124
/// ```
125-
fn leave(self, outer: &Scope<TOuter>) -> Stream<TOuter, C>;
125+
fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter>;
126126
}
127127

128-
impl<TOuter, TInner, C> Leave<TOuter, C> for Stream<TInner, C>
128+
impl<'inner, TInner, C> Leave<'inner, TInner, C> for Stream<'inner, TInner, C>
129129
where
130-
TOuter: Timestamp,
131-
TInner: Timestamp + Refines<TOuter>,
130+
TInner: Timestamp,
132131
C: Container,
133132
{
134-
fn leave(self, outer: &Scope<TOuter>) -> Stream<TOuter, C> {
133+
fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter> {
135134

136135
let scope = self.scope();
137136

timely/src/dataflow/operators/core/exchange.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub trait Exchange<C: DrainContainer> {
3030
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
3131
}
3232

33-
impl<T: Timestamp, C> Exchange<C> for Stream<T, C>
33+
impl<T: Timestamp, C> Exchange<C> for Stream<'_, T, C>
3434
where
3535
C: Container
3636
+ SizableContainer
@@ -39,7 +39,7 @@ where
3939
+ crate::dataflow::channels::ContainerBytes
4040
+ for<'a> PushInto<C::Item<'a>>,
4141
{
42-
fn exchange<F>(self, route: F) -> Stream<T, C>
42+
fn exchange<F>(self, route: F) -> Self
4343
where
4444
for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
4545
{

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::progress::frontier::Antichain;
1010
use crate::progress::{Timestamp, PathSummary};
1111

1212
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
13-
pub trait Feedback<T: Timestamp> {
13+
pub trait Feedback<'scope, T: Timestamp> {
1414

1515
/// Creates a [Stream] and a [Handle] to later bind the source of that `Stream`.
1616
///
@@ -35,11 +35,11 @@ pub trait Feedback<T: Timestamp> {
3535
/// .connect_loop(handle);
3636
/// });
3737
/// ```
38-
fn feedback<C: Container>(&mut self, summary: <T as Timestamp>::Summary) -> (Handle<T, C>, Stream<T, C>);
38+
fn feedback<C: Container>(&mut self, summary: <T as Timestamp>::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>);
3939
}
4040

4141
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
42-
pub trait LoopVariable<TOuter: Timestamp, TInner: Timestamp> {
42+
pub trait LoopVariable<'scope, TOuter: Timestamp, TInner: Timestamp> {
4343
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
4444
///
4545
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
@@ -65,12 +65,12 @@ pub trait LoopVariable<TOuter: Timestamp, TInner: Timestamp> {
6565
/// });
6666
/// });
6767
/// ```
68-
fn loop_variable<C: Container>(&mut self, summary: TInner::Summary) -> (Handle<Product<TOuter, TInner>, C>, Stream<Product<TOuter, TInner>, C>);
68+
fn loop_variable<C: Container>(&mut self, summary: TInner::Summary) -> (Handle<'scope, Product<TOuter, TInner>, C>, Stream<'scope, Product<TOuter, TInner>, C>);
6969
}
7070

71-
impl<T: Timestamp> Feedback<T> for Scope<T> {
71+
impl<'scope, T: Timestamp> Feedback<'scope, T> for Scope<'scope, T> {
7272

73-
fn feedback<C: Container>(&mut self, summary: <T as Timestamp>::Summary) -> (Handle<T, C>, Stream<T, C>) {
73+
fn feedback<C: Container>(&mut self, summary: <T as Timestamp>::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>) {
7474

7575
let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
7676
let (output, stream) = builder.new_output();
@@ -79,14 +79,14 @@ impl<T: Timestamp> Feedback<T> for Scope<T> {
7979
}
8080
}
8181

82-
impl<TOuter: Timestamp, TInner: Timestamp> LoopVariable<TOuter, TInner> for Iterative<TOuter, TInner> {
83-
fn loop_variable<C: Container>(&mut self, summary: TInner::Summary) -> (Handle<Product<TOuter, TInner>, C>, Stream<Product<TOuter, TInner>, C>) {
82+
impl<'scope, TOuter: Timestamp, TInner: Timestamp> LoopVariable<'scope, TOuter, TInner> for Iterative<'scope, TOuter, TInner> {
83+
fn loop_variable<C: Container>(&mut self, summary: TInner::Summary) -> (Handle<'scope, Product<TOuter, TInner>, C>, Stream<'scope, Product<TOuter, TInner>, C>) {
8484
self.feedback(Product::new(Default::default(), summary))
8585
}
8686
}
8787

8888
/// Connect a `Stream` to the input of a loop variable.
89-
pub trait ConnectLoop<T: Timestamp, C: Container> {
89+
pub trait ConnectLoop<'scope, T: Timestamp, C: Container> {
9090
/// Connect a `Stream` to be the input of a loop variable.
9191
///
9292
/// # Examples
@@ -106,11 +106,11 @@ pub trait ConnectLoop<T: Timestamp, C: Container> {
106106
/// .connect_loop(handle);
107107
/// });
108108
/// ```
109-
fn connect_loop(self, handle: Handle<T, C>);
109+
fn connect_loop(self, handle: Handle<'scope, T, C>);
110110
}
111111

112-
impl<T: Timestamp, C: Container> ConnectLoop<T, C> for Stream<T, C> {
113-
fn connect_loop(self, handle: Handle<T, C>) {
112+
impl<'scope, T: Timestamp, C: Container> ConnectLoop<'scope, T, C> for Stream<'scope, T, C> {
113+
fn connect_loop(self, handle: Handle<'scope, T, C>) {
114114

115115
let mut builder = handle.builder;
116116
let summary = handle.summary;
@@ -133,8 +133,8 @@ impl<T: Timestamp, C: Container> ConnectLoop<T, C> for Stream<T, C> {
133133

134134
/// A handle used to bind the source of a loop variable.
135135
#[derive(Debug)]
136-
pub struct Handle<T: Timestamp, C: Container> {
137-
builder: OperatorBuilder<T>,
136+
pub struct Handle<'scope, T: Timestamp, C: Container> {
137+
builder: OperatorBuilder<'scope, T>,
138138
summary: <T as Timestamp>::Summary,
139139
output: crate::dataflow::channels::pushers::Output<T, C>,
140140
}

timely/src/dataflow/operators/core/filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ pub trait Filter<C: DrainContainer> {
2424
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(self, predicate: P) -> Self;
2525
}
2626

27-
impl<T: Timestamp, C: Container + SizableContainer + DrainContainer> Filter<C> for Stream<T, C>
27+
impl<T: Timestamp, C: Container + SizableContainer + DrainContainer> Filter<C> for Stream<'_, T, C>
2828
where
2929
for<'a> C: PushInto<C::Item<'a>>
3030
{
31-
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(self, mut predicate: P) -> Stream<T, C> {
31+
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(self, mut predicate: P) -> Self {
3232
self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
3333
input.for_each_time(|time, data| {
3434
output.session(&time)

timely/src/dataflow/operators/core/input.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::dataflow::channels::Message;
2626
// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
2727

2828
/// Create a new `Stream` and `Handle` through which to supply input.
29-
pub trait Input {
29+
pub trait Input<'scope> {
3030
/// The timestamp at which this input scope operates.
3131
type Timestamp: Timestamp;
3232

@@ -63,7 +63,7 @@ pub trait Input {
6363
/// }
6464
/// });
6565
/// ```
66-
fn new_input<C: Container+Clone>(&mut self) -> (Handle<Self::Timestamp, CapacityContainerBuilder<C>>, Stream<Self::Timestamp, C>);
66+
fn new_input<C: Container+Clone>(&mut self) -> (Handle<Self::Timestamp, CapacityContainerBuilder<C>>, Stream<'scope, Self::Timestamp, C>);
6767

6868
/// Create a new [Stream] and [Handle] through which to supply input.
6969
///
@@ -100,7 +100,7 @@ pub trait Input {
100100
/// }
101101
/// });
102102
/// ```
103-
fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<Self::Timestamp, CB>, Stream<Self::Timestamp, CB::Container>);
103+
fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<Self::Timestamp, CB>, Stream<'scope, Self::Timestamp, CB::Container>);
104104

105105
/// Create a new stream from a supplied interactive handle.
106106
///
@@ -133,25 +133,25 @@ pub trait Input {
133133
/// }
134134
/// });
135135
/// ```
136-
fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<Self::Timestamp, CB>) -> Stream<Self::Timestamp, CB::Container>;
136+
fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<Self::Timestamp, CB>) -> Stream<'scope, Self::Timestamp, CB::Container>;
137137
}
138138

139139
use crate::order::TotalOrder;
140-
impl<T: Timestamp + TotalOrder> Input for Scope<T> {
140+
impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
141141
type Timestamp = T;
142-
fn new_input<C: Container+Clone>(&mut self) -> (Handle<T, CapacityContainerBuilder<C>>, Stream<T, C>) {
142+
fn new_input<C: Container+Clone>(&mut self) -> (Handle<T, CapacityContainerBuilder<C>>, Stream<'scope, T, C>) {
143143
let mut handle = Handle::new();
144144
let stream = self.input_from(&mut handle);
145145
(handle, stream)
146146
}
147147

148-
fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<T, CB>, Stream<T, CB::Container>) {
148+
fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<T, CB>, Stream<'scope, T, CB::Container>) {
149149
let mut handle = Handle::new_with_builder();
150150
let stream = self.input_from(&mut handle);
151151
(handle, stream)
152152
}
153153

154-
fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<T, CB>) -> Stream<T, CB::Container> {
154+
fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<T, CB>) -> Stream<'scope, T, CB::Container> {
155155
let (output, registrar) = Tee::<T, CB::Container>::new();
156156
let counter = Counter::new(output);
157157
let produced = Rc::clone(counter.produced());
@@ -338,7 +338,7 @@ impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
338338
/// }
339339
/// });
340340
/// ```
341-
pub fn to_stream(&mut self, scope: &mut Scope<T>) -> Stream<T, CB::Container>
341+
pub fn to_stream<'scope>(&mut self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
342342
where
343343
T: TotalOrder,
344344
{

0 commit comments

Comments
 (0)