Skip to content

Commit 549a2af

Browse files
committed
Implement Copy for Scope
1 parent 734d37f commit 549a2af

16 files changed

Lines changed: 49 additions & 63 deletions

File tree

timely/src/dataflow/operators/core/capture/replay.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,25 +51,25 @@ use crate::dataflow::channels::Message;
5151
/// Replay a capture stream into a scope with the same timestamp.
5252
pub trait Replay<T: Timestamp, C> : Sized {
5353
/// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`.
54-
fn replay_into<'scope>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C> {
54+
fn replay_into<'scope>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
5555
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
5656
}
5757
/// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`.
5858
///
5959
/// The `period` argument allows the specification of a re-activation period, where the operator
6060
/// will re-activate itself every so often. The `None` argument instructs the operator not to
6161
/// re-activate itself.
62-
fn replay_core<'scope>(self, scope: &Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>;
62+
fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>;
6363
}
6464

6565
impl<T: Timestamp, C: Container+Clone, I> Replay<T, C> for I
6666
where
6767
I : IntoIterator,
6868
<I as IntoIterator>::Item: EventIterator<T, C>+'static,
6969
{
70-
fn replay_core<'scope>(self, scope: &Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>{
70+
fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>{
7171

72-
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
72+
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope);
7373

7474
let address = builder.operator_info().address;
7575
let activator = scope.activator_for(address);

timely/src/dataflow/operators/core/concat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl<'scope, T: Timestamp> Concatenate<'scope, T> for Scope<'scope, T> {
6363

6464
// create an operator builder.
6565
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
66-
let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
66+
let mut builder = OperatorBuilder::new("Concatenate".to_string(), *self);
6767

6868
// create new input handles for each input stream.
6969
let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub trait Enter<'outer, TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C>
5050
/// });
5151
/// });
5252
/// ```
53-
fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C>;
53+
fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C>;
5454
}
5555

5656
impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C>
@@ -59,7 +59,7 @@ where
5959
TInner: Timestamp + Refines<TOuter>,
6060
C: Container,
6161
{
62-
fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C> {
62+
fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C> {
6363

6464
// Validate that `inner` is a child of `self`'s scope.
6565
let inner_addr = inner.addr();
@@ -91,11 +91,7 @@ where
9191
self.connect_to(input, ingress, channel_id);
9292
}
9393

94-
Stream::new(
95-
Source::new(0, input.port),
96-
registrar,
97-
inner.clone(),
98-
)
94+
Stream::new(Source::new(0, input.port), registrar, inner)
9995
}
10096
}
10197

@@ -119,15 +115,15 @@ pub trait Leave<'inner, TInner: Timestamp, C> {
119115
/// });
120116
/// });
121117
/// ```
122-
fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter>;
118+
fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter>;
123119
}
124120

125121
impl<'inner, TInner, C> Leave<'inner, TInner, C> for Stream<'inner, TInner, C>
126122
where
127123
TInner: Timestamp,
128124
C: Container,
129125
{
130-
fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter> {
126+
fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter> {
131127

132128
let scope = self.scope();
133129

@@ -156,11 +152,7 @@ where
156152
self.connect_to(target, egress, channel_id);
157153
}
158154

159-
Stream::new(
160-
output,
161-
registrar,
162-
outer.clone(),
163-
)
155+
Stream::new(output, registrar, outer)
164156
}
165157
}
166158

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl<'scope, T: Timestamp> Feedback<'scope, T> for Scope<'scope, T> {
7272

7373
fn feedback<C: Container>(&self, summary: <T as Timestamp>::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>) {
7474

75-
let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
75+
let mut builder = OperatorBuilder::new("Feedback".to_owned(), *self);
7676
let (output, stream) = builder.new_output();
7777

7878
(Handle { builder, summary, output }, stream)

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
175175
copies,
176176
}));
177177

178-
Stream::new(Source::new(index, 0), registrar, self.clone())
178+
Stream::new(Source::new(index, 0), registrar, *self)
179179
}
180180
}
181181

@@ -336,7 +336,7 @@ impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
336336
/// }
337337
/// });
338338
/// ```
339-
pub fn to_stream<'scope>(&mut self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
339+
pub fn to_stream<'scope>(&mut self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
340340
where
341341
T: TotalOrder,
342342
{

timely/src/dataflow/operators/core/to_stream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ pub trait ToStreamBuilder<CB: ContainerBuilder> {
2929
///
3030
/// assert_eq!(data1.extract(), data2.extract());
3131
/// ```
32-
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container>;
32+
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>;
3333
}
3434

3535
impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I where CB: PushInto<I::Item> {
36-
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container> {
36+
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> {
3737

3838
source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {
3939

@@ -79,11 +79,11 @@ pub trait ToStream<C> {
7979
///
8080
/// assert_eq!(data1.extract(), data2.extract());
8181
/// ```
82-
fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C>;
82+
fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C>;
8383
}
8484

8585
impl<C: Container + SizableContainer, I: IntoIterator+'static> ToStream<C> for I where C: PushInto<I::Item> {
86-
fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C> {
86+
fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
8787
ToStreamBuilder::<CapacityContainerBuilder<C>>::to_stream_with_builder(self, scope)
8888
}
8989
}

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> {
105105
peers,
106106
}));
107107

108-
((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone()))
108+
((helper, cap), Stream::new(Source::new(index, 0), registrar, *self))
109109
}
110110
}
111111

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
135135
self.shape.outputs += 1;
136136
let (target, registrar) = Tee::new();
137137
let source = Source::new(self.slot.index(), new_output);
138-
let stream = Stream::new(source, registrar, self.scope.clone());
138+
let stream = Stream::new(source, registrar, self.scope);
139139

140140
for (input, entry) in connection {
141141
self.summary[input].add_port(new_output, entry);

timely/src/dataflow/operators/generic/operator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -537,13 +537,13 @@ impl<'scope, T: Timestamp, C1: Container> Operator<'scope, T, C1> for Stream<'sc
537537
/// .inspect(|x| println!("number: {:?}", x));
538538
/// });
539539
/// ```
540-
pub fn source<'scope, T: Timestamp, CB, B, L>(scope: &Scope<'scope, T>, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
540+
pub fn source<'scope, T: Timestamp, CB, B, L>(scope: Scope<'scope, T>, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
541541
where
542542
CB: ContainerBuilder,
543543
B: FnOnce(Capability<T>, OperatorInfo) -> L,
544544
L: FnMut(&mut OutputBuilderSession<'_, T, CB>)+'static {
545545

546-
let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
546+
let mut builder = OperatorBuilder::new(name.to_owned(), scope);
547547
let operator_info = builder.operator_info();
548548

549549
let (output, stream) = builder.new_output();
@@ -581,7 +581,7 @@ where
581581
///
582582
/// });
583583
/// ```
584-
pub fn empty<'scope, T: Timestamp, C: Container>(scope: &Scope<'scope, T>) -> Stream<'scope, T, C> {
584+
pub fn empty<'scope, T: Timestamp, C: Container>(scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
585585
source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
586586
// drop capability, do nothing
587587
})

timely/src/dataflow/operators/vec/flow_controlled.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub fn iterator_source<
7575
DI: IntoIterator<Item=D>,
7676
I: IntoIterator<Item=(T, DI)>,
7777
F: FnMut(&T)->Option<IteratorSourceInput<T, D, DI, I>>+'static>(
78-
scope: &Scope<'scope, T>,
78+
scope: Scope<'scope, T>,
7979
name: &str,
8080
mut input_f: F,
8181
probe: Handle<T>,

0 commit comments

Comments
 (0)