11import uuid
22from datetime import datetime
3+ import logging
34
45from pynumaflow .shared .asynciter import NonBlockingIterator
56from pynumaflow .sourcer import (
1213 get_default_partitions ,
1314 Sourcer ,
1415 SourceAsyncServer ,
16+ NackRequest ,
1517)
1618
19+ logging .basicConfig (
20+ level = logging .INFO ,
21+ format = "%(asctime)s %(levelname)-8s %(message)s" ,
22+ datefmt = "%Y-%m-%d %H:%M:%S" ,
23+ )
24+ logger = logging .getLogger (__name__ )
25+
1726
1827class AsyncSource (Sourcer ):
1928 """
2029 AsyncSource is a class for User Defined Source implementation.
2130 """
2231
2332 def __init__ (self ):
24- """
25- to_ack_set: Set to maintain a track of the offsets yet to be acknowledged
26- read_idx : the offset idx till where the messages have been read
27- """
28- self . to_ack_set = set ()
29- self .read_idx = 0
33+ # The offset idx till where the messages have been read
34+ self . read_idx : int = 0
35+ # Set to maintain a track of the offsets yet to be acknowledged
36+ self . to_ack_set : set [ int ] = set ()
37+ # Set to maintain a track of the offsets that have been negatively acknowledged
38+ self .nacked : set [ int ] = set ()
3039
3140 async def read_handler (self , datum : ReadRequest , output : NonBlockingIterator ):
3241 """
@@ -38,25 +47,42 @@ async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
3847 return
3948
4049 for x in range (datum .num_records ):
50+ # If there are any nacked offsets, re-deliver them
51+ if self .nacked :
52+ idx = self .nacked .pop ()
53+ else :
54+ idx = self .read_idx
55+ self .read_idx += 1
4156 headers = {"x-txn-id" : str (uuid .uuid4 ())}
4257 await output .put (
4358 Message (
4459 payload = str (self .read_idx ).encode (),
45- offset = Offset .offset_with_default_partition_id (str (self . read_idx ).encode ()),
60+ offset = Offset .offset_with_default_partition_id (str (idx ).encode ()),
4661 event_time = datetime .now (),
4762 headers = headers ,
4863 )
4964 )
50- self .to_ack_set .add (str (self .read_idx ))
51- self .read_idx += 1
65+ self .to_ack_set .add (idx )
5266
5367 async def ack_handler (self , ack_request : AckRequest ):
5468 """
5569 The ack handler is used acknowledge the offsets that have been read, and remove them
5670 from the to_ack_set
5771 """
5872 for req in ack_request .offsets :
59- self .to_ack_set .remove (str (req .offset , "utf-8" ))
73+ offset = int (req .offset )
74+ self .to_ack_set .remove (offset )
75+
76+ async def nack_handler (self , ack_request : NackRequest ):
77+ """
78+ Add the offsets that have been negatively acknowledged to the nacked set
79+ """
80+
81+ for req in ack_request .offsets :
82+ offset = int (req .offset )
83+ self .to_ack_set .remove (offset )
84+ self .nacked .add (offset )
85+ logger .info ("Negatively acknowledged offsets: %s" , self .nacked )
6086
6187 async def pending_handler (self ) -> PendingResponse :
6288 """
@@ -74,4 +100,5 @@ async def partitions_handler(self) -> PartitionsResponse:
74100if __name__ == "__main__" :
75101 ud_source = AsyncSource ()
76102 grpc_server = SourceAsyncServer (ud_source )
103+ logger .info ("Starting grpc server" )
77104 grpc_server .start ()
0 commit comments