1- import avro . schema
1+ import json
22import requests
33
4- from avro .errors import AvroException
5- from avro .io import BinaryDecoder , BinaryEncoder , DatumReader , DatumWriter
4+ from fastavro import schemaless_writer , schemaless_reader , parse_schema
65from io import BytesIO
76from nypl_py_utils .functions .log_helper import create_log
87from requests .adapters import HTTPAdapter , Retry
@@ -23,7 +22,7 @@ def __init__(self, platform_schema_url):
2322 self .session = requests .Session ()
2423 self .session .mount ("https://" ,
2524 HTTPAdapter (max_retries = retry_policy ))
26- self .schema = avro . schema . parse (
25+ self .schema = parse_schema (
2726 self .get_json_schema (platform_schema_url ))
2827
2928 def get_json_schema (self , platform_schema_url ):
@@ -52,7 +51,7 @@ def get_json_schema(self, platform_schema_url):
5251
5352 try :
5453 json_response = response .json ()
55- return json_response ["data" ]["schema" ]
54+ return json . loads ( json_response ["data" ]["schema" ])
5655 except (JSONDecodeError , KeyError ) as e :
5756 self .logger .error (
5857 "Retrieved schema is malformed: {errorType} {errorMessage}"
@@ -70,26 +69,28 @@ class AvroEncoder(AvroClient):
7069 Platform API endpoint from which to fetch the schema in JSON format.
7170 """
7271
73- def encode_record (self , record ):
72+ def encode_record (self , record , silent = False ):
7473 """
7574 Encodes a single JSON record using the given Avro schema.
7675
7776 Returns the encoded record as a byte string.
7877 """
79- self .logger .debug (
80- "Encoding record using {schema} schema" .format (
81- schema = self .schema .name )
82- )
83- datum_writer = DatumWriter (self .schema )
78+ if not silent :
79+ self .logger .info (
80+ "Encoding record using {schema} schema" .format (
81+ schema = self .schema ['name' ]
82+ )
83+ )
8484 with BytesIO () as output_stream :
85- encoder = BinaryEncoder (output_stream )
8685 try :
87- datum_writer .write (record , encoder )
86+ schemaless_writer (output_stream , self .schema , record ,
87+ strict_allow_default = True )
8888 return output_stream .getvalue ()
89- except AvroException as e :
89+ except Exception as e :
9090 self .logger .error ("Failed to encode record: {}" .format (e ))
9191 raise AvroClientError (
92- "Failed to encode record: {}" .format (e )) from None
92+ "Failed to encode record: {}" .format (e )
93+ ) from None
9394
9495 def encode_batch (self , record_list ):
9596 """
@@ -99,25 +100,11 @@ def encode_batch(self, record_list):
99100 """
100101 self .logger .info (
101102 "Encoding ({num_rec}) records using {schema} schema" .format (
102- num_rec = len (record_list ), schema = self .schema . name
103+ num_rec = len (record_list ), schema = self .schema [ ' name' ]
103104 )
104105 )
105- encoded_records = []
106- datum_writer = DatumWriter (self .schema )
107- with BytesIO () as output_stream :
108- encoder = BinaryEncoder (output_stream )
109- for record in record_list :
110- try :
111- datum_writer .write (record , encoder )
112- encoded_records .append (output_stream .getvalue ())
113- output_stream .seek (0 )
114- output_stream .truncate (0 )
115- except AvroException as e :
116- self .logger .error ("Failed to encode record: {}" .format (e ))
117- raise AvroClientError (
118- "Failed to encode record: {}" .format (e )
119- ) from None
120- return encoded_records
106+ return [self .encode_record (record , silent = True )
107+ for record in record_list ]
121108
122109
123110class AvroDecoder (AvroClient ):
@@ -126,23 +113,22 @@ class AvroDecoder(AvroClient):
126113 Platform API endpoint from which to fetch the schema in JSON format.
127114 """
128115
129- def decode_record (self , record ):
116+ def decode_record (self , record , silent = False ):
130117 """
131118 Decodes a single record represented using the given Avro
132119 schema. Input must be a bytes-like object.
133120
134121 Returns a dictionary where each key is a field in the schema.
135122 """
136- self .logger .debug (
137- "Decoding {rec} using {schema} schema" .format (
138- rec = record , schema = self .schema .name
123+ if not silent :
124+ self .logger .info (
125+ "Decoding record using {schema} schema" .format (
126+ schema = self .schema ['name' ]
127+ )
139128 )
140- )
141- datum_reader = DatumReader (self .schema )
142129 with BytesIO (record ) as input_stream :
143- decoder = BinaryDecoder (input_stream )
144130 try :
145- return datum_reader . read ( decoder )
131+ return schemaless_reader ( input_stream , self . schema )
146132 except Exception as e :
147133 self .logger .error ("Failed to decode record: {}" .format (e ))
148134 raise AvroClientError (
@@ -157,14 +143,11 @@ def decode_batch(self, record_list):
157143 """
158144 self .logger .info (
159145 "Decoding ({num_rec}) records using {schema} schema" .format (
160- num_rec = len (record_list ), schema = self .schema . name
146+ num_rec = len (record_list ), schema = self .schema [ ' name' ]
161147 )
162148 )
163- decoded_records = []
164- for record in record_list :
165- decoded_record = self .decode_record (record )
166- decoded_records .append (decoded_record )
167- return decoded_records
149+ return [self .decode_record (record , silent = True )
150+ for record in record_list ]
168151
169152
170153class AvroClientError (Exception ):
0 commit comments