-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathsimple_source.py
More file actions
130 lines (105 loc) · 3.93 KB
/
simple_source.py
File metadata and controls
130 lines (105 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import asyncio
import logging
import signal
from datetime import datetime, timezone
from collections.abc import AsyncIterator
from pynumaflow_lite import sourcer
from pynumaflow_lite._source_dtypes import Sourcer
# Configure logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
class SimpleSource(Sourcer):
"""
Simple source that generates messages with incrementing numbers.
This is a class-based user-defined source implementation.
"""
def __init__(self):
self.counter = 0
self.partition_idx = 0
async def read_handler(
self, datum: sourcer.ReadRequest
) -> AsyncIterator[sourcer.Message]:
"""
The simple source generates messages with incrementing numbers.
"""
_LOGGER.info(
f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}"
)
# Generate the requested number of messages
for i in range(datum.num_records):
# Create message payload
payload = f"message-{self.counter}".encode("utf-8")
# Create offset
offset = sourcer.Offset(
offset=str(self.counter).encode("utf-8"),
partition_id=self.partition_idx,
)
# Create message
message = sourcer.Message(
payload=payload,
offset=offset,
event_time=datetime.now(timezone.utc),
keys=["key1"],
headers={"source": "simple"},
)
_LOGGER.info(f"Generated message: {self.counter}")
self.counter += 1
yield message
# Small delay to simulate real source
await asyncio.sleep(0.1)
async def ack_handler(self, request: sourcer.AckRequest) -> None:
"""
The simple source acknowledges the offsets.
"""
_LOGGER.info(f"Acknowledging {len(request.offsets)} offsets")
for offset in request.offsets:
_LOGGER.debug(
f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
)
async def nack_handler(self, request: sourcer.NackRequest) -> None:
"""
The simple source negatively acknowledges the offsets.
"""
_LOGGER.info(f"Negatively acknowledging {len(request.offsets)} offsets")
for offset in request.offsets:
_LOGGER.warning(
f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
)
async def pending_handler(self) -> sourcer.PendingResponse:
"""
The simple source always returns zero to indicate there is no pending record.
"""
return sourcer.PendingResponse(count=0)
async def partitions_handler(self) -> sourcer.PartitionsResponse:
"""
The simple source always returns default partitions.
"""
return sourcer.PartitionsResponse(partitions=[self.partition_idx])
# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
pass
async def start():
server = sourcer.SourceAsyncServer()
# Create an instance of the source handler
handler = SimpleSource()
# Register loop-level signal handlers to request graceful shutdown
loop = asyncio.get_running_loop()
try:
loop.add_signal_handler(signal.SIGINT, lambda: server.stop())
loop.add_signal_handler(signal.SIGTERM, lambda: server.stop())
except (NotImplementedError, RuntimeError):
pass
try:
await server.start(handler)
print("Shutting down gracefully...")
except asyncio.CancelledError:
try:
server.stop()
except Exception:
pass
return
if __name__ == "__main__":
asyncio.run(start())