-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathexample.py
More file actions
40 lines (34 loc) · 1.25 KB
/
example.py
File metadata and controls
40 lines (34 loc) · 1.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
from pynumaflow.mapper import Messages, Message, Datum, Mapper, AsyncMapMultiprocServer
from pynumaflow._constants import _LOGGER
class FlatMap(Mapper):
"""
This class needs to be of type Mapper class to be used
as a handler for the MapServer class.
Example of a mapper that calculates if a number is prime.
"""
async def handler(self, keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
messages = Messages()
messages.append(Message(val, keys=keys))
_LOGGER.info(f"MY PID {os.getpid()}")
return messages
if __name__ == "__main__":
"""
Example of starting a multiprocessing map vertex.
"""
# To set the env server_count value set the env variable
# NUM_CPU_MULTIPROC="N"
server_count = int(os.getenv("NUM_CPU_MULTIPROC", "2"))
server_type = os.getenv("SERVER_KIND", "tcp")
use_tcp = False
if server_type == "tcp":
use_tcp = True
elif server_type == "uds":
use_tcp = False
_class = FlatMap()
# Server count is the number of server processes to start
grpc_server = AsyncMapMultiprocServer(_class, server_count=server_count, use_tcp=use_tcp)
grpc_server.start()