-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathsideinput_example.py
More file actions
145 lines (114 loc) · 4.6 KB
/
sideinput_example.py
File metadata and controls
145 lines (114 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
Side Input Example for pynumaflow-lite.
This module contains both a SideInput retriever and a Mapper that reads from side inputs.
The mode is controlled by the MAPPER environment variable:
- If MAPPER is set to "true", runs as a Mapper that reads side input files
- Otherwise, runs as a SideInput retriever that broadcasts values
"""
import asyncio
import os
import signal
import threading
from threading import Thread
import datetime
from pynumaflow_lite import sideinputer, mapper
from watchfiles import watch
class ExampleSideInput(sideinputer.SideInput):
    """
    Side input retriever that answers every request by broadcasting a
    string containing the current local time.
    """

    def __init__(self):
        # How many times retrieve_handler() has been invoked.
        self.counter = 0

    async def retrieve_handler(self) -> sideinputer.Response:
        """
        Produce the side input value for one request.

        The payload is "an example: <datetime.now()>" encoded as UTF-8 and is
        returned as a broadcast response.
        """
        stamp = str(datetime.datetime.now())
        payload = "an example: " + stamp
        self.counter += 1
        # broadcast_message() signals that this value should be broadcast.
        encoded = payload.encode("utf-8")
        return sideinputer.Response.broadcast_message(encoded)
class SideInputHandler(mapper.Mapper):
    """
    A Mapper that reads from side input files and includes the value in its output.

    The current side input value is cached in ``data_value``. It is seeded by
    ``init_data_value()`` and refreshed by ``file_watcher()``, which is run on
    a background thread while ``handler()`` serves requests on the event loop.
    Every read and write of the cached value happens under ``data_value_lock``.
    """

    # Cached side input value (default until a file has been read) and the
    # lock guarding it. update_data_from_file() rebinds data_value on the
    # instance; the lock is a single class-level object.
    data_value = "no_value"
    data_value_lock = threading.Lock()
    # Name of the side input file inside sideinputer.DIR_PATH that we watch.
    watched_file = "myticker"

    async def handler(self, keys: list[str], datum: mapper.Datum) -> mapper.Messages:
        """Return one message whose payload is the current side input value.

        ``keys`` and ``datum`` are not used: every input yields the cached value.
        """
        # The lock is only ever held for an attribute read/write, so taking a
        # threading.Lock here does not meaningfully block the event loop.
        with self.data_value_lock:
            current_value = self.data_value
        messages = mapper.Messages()
        messages.append(mapper.Message(current_value.encode("utf-8")))
        return messages

    def file_watcher(self) -> None:
        """
        This function is used to watch the side input directory for changes.

        For each change batch yielded by ``watch()``, reload the value when the
        changed file is the watched one. Runs for as long as ``watch()`` keeps
        yielding, so it is meant to be run on a background thread.
        """
        path = sideinputer.DIR_PATH
        for changes in watch(path):
            for _change_type, file_path in changes:
                # Match the file name exactly; endswith() would also fire for
                # unrelated files such as "old_myticker".
                if os.path.basename(file_path) == self.watched_file:
                    self.update_data_from_file(file_path)

    def init_data_value(self) -> None:
        """Read the SIDE INPUT FILE for initial value before starting the server."""
        path = os.path.join(sideinputer.DIR_PATH, self.watched_file)
        print(f"Initializing side input from: {path}")
        self.update_data_from_file(path)

    def update_data_from_file(self, path: str) -> None:
        """Load the stripped contents of ``path`` into ``data_value``.

        On a read or decode error the previous value is kept and the error is
        printed. The file is read outside the lock, so a slow read never
        stalls ``handler()``; only the swap of the cached value is locked.
        """
        try:
            # Side input values are UTF-8 end to end; don't rely on the locale.
            with open(path, encoding="utf-8") as file:
                value = file.read().strip()
        except (OSError, UnicodeError) as e:
            print(f"Error reading file: {e}")
            return
        with self.data_value_lock:
            self.data_value = value
        print(f"Data value variable set to: {value}")
# Optional: put Python's default handlers back in place before asyncio.run()
# starts, so asyncio can handle these signals cleanly.
signal.signal(signal.SIGINT, signal.default_int_handler)
if hasattr(signal, "SIGTERM") and hasattr(signal, "SIG_DFL"):
    # Skip quietly where the signal module lacks these names.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
async def start_sideinput():
    """Start the SideInput retriever server.

    SIGINT and SIGTERM are routed to ``server.stop()`` through the running
    event loop. The shutdown message is printed only when ``server.start()``
    returns normally.
    """
    server = sideinputer.SideInputAsyncServer()
    retriever = ExampleSideInput()

    running_loop = asyncio.get_running_loop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        running_loop.add_signal_handler(signum, lambda: server.stop())

    try:
        await server.start(retriever)
    except asyncio.CancelledError:
        # NOTE(review): cancellation is swallowed (not re-raised), so the
        # coroutine returns normally after stopping the server; confirm intended.
        server.stop()
    else:
        print("SideInput server shutting down gracefully...")
async def start_mapper():
    """Start the Mapper server that reads from side inputs.

    The cached side input value is seeded synchronously. The file watcher is
    then launched on a daemon thread before the server begins serving.
    SIGINT and SIGTERM are routed to ``server.stop()`` through the running
    event loop.
    """
    server = mapper.MapAsyncServer()
    side_input_mapper = SideInputHandler()

    # Read the side input file once up front, before any request is served.
    side_input_mapper.init_data_value()
    # Daemon thread, so the watcher never keeps the process alive on its own.
    Thread(target=side_input_mapper.file_watcher, daemon=True).start()

    running_loop = asyncio.get_running_loop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        running_loop.add_signal_handler(signum, lambda: server.stop())

    try:
        await server.start(side_input_mapper)
    except asyncio.CancelledError:
        # NOTE(review): cancellation is swallowed (not re-raised), so the
        # coroutine returns normally after stopping the server; confirm intended.
        server.stop()
    else:
        print("Mapper server shutting down gracefully...")
if __name__ == "__main__":
    # MAPPER=true (case-insensitive) selects the mapper role; any other value,
    # including an unset variable, selects the side input retriever role.
    run_as_mapper = os.environ.get("MAPPER", "").lower() == "true"
    if run_as_mapper:
        banner, entrypoint = "Starting as Mapper (reading side inputs)...", start_mapper
    else:
        banner, entrypoint = "Starting as SideInput retriever...", start_sideinput
    print(banner)
    asyncio.run(entrypoint())