Skip to content

Commit 3e01da2

Browse files
authored
chore: Datum keys as property (#239)
Signed-off-by: Takashi Menjo <takashi.menjo+github@gmail.com>
1 parent 4982414 commit 3e01da2

15 files changed

Lines changed: 23 additions & 17 deletions

File tree

pynumaflow/accumulator/_dtypes.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def __init__(
7676
self._headers = headers or {}
7777
self._id = id_
7878

79+
@property
7980
def keys(self) -> list[str]:
8081
"""Returns the keys of the event.
8182
@@ -488,7 +489,7 @@ def from_datum(cls, datum: Datum):
488489
"""
489490
return cls(
490491
value=datum.value,
491-
keys=datum.keys(),
492+
keys=datum.keys,
492493
watermark=datum.watermark,
493494
event_time=datum.event_time,
494495
headers=datum.headers,

pynumaflow/accumulator/servicer/task_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ async def close_task(self, req):
106106
4. Remove the task from the tracker
107107
"""
108108
d = req.payload
109-
keys = d.keys()
109+
keys = d.keys
110110
unified_key = build_unique_key_name(keys)
111111
curr_task = self.tasks.get(unified_key, None)
112112

@@ -128,7 +128,7 @@ async def create_task(self, req):
128128
it creates a new task or appends the request to the existing task.
129129
"""
130130
d = req.payload
131-
keys = d.keys()
131+
keys = d.keys
132132
unified_key = build_unique_key_name(keys)
133133
curr_task = self.tasks.get(unified_key, None)
134134

@@ -179,7 +179,7 @@ async def send_datum_to_task(self, req):
179179
If the task does not exist, create it.
180180
"""
181181
d = req.payload
182-
keys = d.keys()
182+
keys = d.keys
183183
unified_key = build_unique_key_name(keys)
184184
result = self.tasks.get(unified_key, None)
185185
if not result:

pynumaflow/batchmapper/_dtypes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def __init__(
9999
self._watermark = watermark
100100
self._headers = headers or {}
101101

102+
@property
102103
def keys(self) -> list[str]:
103104
"""Returns the keys of the event"""
104105
return self._keys

pynumaflow/mapstreamer/_dtypes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def __init__(
151151
self._watermark = watermark
152152
self._headers = headers or {}
153153

154+
@property
154155
def keys(self) -> list[str]:
155156
"""Returns the keys of the event"""
156157
return self._keys

pynumaflow/reducer/_dtypes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ def __init__(
167167
self._watermark = watermark
168168
self._headers = headers or {}
169169

170+
@property
170171
def keys(self) -> list[str]:
171172
"""Returns the keys of the event"""
172173
return self._keys

pynumaflow/reducer/servicer/task_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ async def create_task(self, req):
108108
raise UDFError("reduce create operation error: invalid number of windows")
109109

110110
d = req.payload
111-
keys = d.keys()
111+
keys = d.keys
112112
unified_key = build_unique_key_name(keys, req.windows[0])
113113
result = self.tasks.get(unified_key, None)
114114

@@ -137,7 +137,7 @@ async def append_task(self, req):
137137
if len(req.windows) != 1:
138138
raise UDFError("reduce create operation error: invalid number of windows")
139139
d = req.payload
140-
keys = d.keys()
140+
keys = d.keys
141141
unified_key = build_unique_key_name(keys, req.windows[0])
142142
result = self.tasks.get(unified_key, None)
143143
if not result:

pynumaflow/reducestreamer/_dtypes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def __init__(
7373
self._watermark = watermark
7474
self._headers = headers or {}
7575

76+
@property
7677
def keys(self) -> list[str]:
7778
"""Returns the keys of the event"""
7879
return self._keys

pynumaflow/reducestreamer/servicer/task_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async def create_task(self, req):
110110
raise UDFError("reduce create operation error: invalid number of windows")
111111

112112
d = req.payload
113-
keys = d.keys()
113+
keys = d.keys
114114
unified_key = build_unique_key_name(keys, req.windows[0])
115115
curr_task = self.tasks.get(unified_key, None)
116116

@@ -161,7 +161,7 @@ async def send_datum_to_task(self, req):
161161
if len(req.windows) != 1:
162162
raise UDFError("reduce append operation error: invalid number of windows")
163163
d = req.payload
164-
keys = d.keys()
164+
keys = d.keys
165165
unified_key = build_unique_key_name(keys, req.windows[0])
166166
result = self.tasks.get(unified_key, None)
167167
if not result:

pynumaflow/sourcetransformer/_dtypes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ def __init__(
160160
self._watermark = watermark
161161
self._headers = headers or {}
162162

163+
@property
163164
def keys(self) -> list[str]:
164165
"""Returns the keys of the event"""
165166
return self._keys

tests/accumulator/test_async_accumulator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,15 @@ async def handler(self, datums: AsyncIterable[Datum], output: NonBlockingIterato
140140
async for datum in datums:
141141
self.counter += 1
142142
msg = f"counter:{self.counter}"
143-
await output.put(Message(str.encode(msg), keys=datum.keys(), tags=[]))
143+
await output.put(Message(str.encode(msg), keys=datum.keys, tags=[]))
144144

145145

146146
async def accumulator_handler_func(datums: AsyncIterable[Datum], output: NonBlockingIterator):
147147
counter = 0
148148
async for datum in datums:
149149
counter += 1
150150
msg = f"counter:{counter}"
151-
await output.put(Message(str.encode(msg), keys=datum.keys(), tags=[]))
151+
await output.put(Message(str.encode(msg), keys=datum.keys, tags=[]))
152152

153153

154154
def NewAsyncAccumulator():

0 commit comments

Comments
 (0)