Skip to content

Commit a47ecbd

Browse files
committed
Fix Avro bytes serialization
1 parent 1082e97 commit a47ecbd

File tree

4 files changed

+92
-22
lines changed

4 files changed

+92
-22
lines changed

src/confluent_kafka/schema_registry/_async/avro.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -348,9 +348,16 @@ def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
348348
parsed_schema = self._parsed_schema
349349

350350
with _ContextStringIO() as fo:
351-
# write the record to the rest of the buffer
352-
schemaless_writer(fo, parsed_schema, value)
353-
buffer = fo.getvalue()
351+
# Check if it's a simple bytes type
352+
is_bytes = (parsed_schema == "bytes" or
353+
(isinstance(parsed_schema, dict) and parsed_schema.get("type") == "bytes"))
354+
if is_bytes:
355+
# For simple bytes type, write value directly
356+
buffer = value if isinstance(value, bytes) else value.encode()
357+
else:
358+
# write the record to the rest of the buffer
359+
schemaless_writer(fo, parsed_schema, value)
360+
buffer = fo.getvalue()
354361

355362
if latest_schema is not None:
356363
buffer = self._execute_rules_with_phase(
@@ -585,17 +592,29 @@ async def __deserialize(
585592
reader_schema_raw = writer_schema_raw
586593
reader_schema = writer_schema
587594

595+
# Check if it's a simple bytes type
596+
is_bytes = (writer_schema == "bytes" or
597+
(isinstance(writer_schema, dict) and writer_schema.get("type") == "bytes"))
598+
588599
if migrations:
589-
obj_dict = schemaless_reader(payload,
590-
writer_schema,
591-
None,
592-
self._return_record_name)
600+
if is_bytes:
601+
# For simple bytes type, read payload directly
602+
obj_dict = payload.read()
603+
else:
604+
obj_dict = schemaless_reader(payload,
605+
writer_schema,
606+
None,
607+
self._return_record_name)
593608
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
594609
else:
595-
obj_dict = schemaless_reader(payload,
596-
writer_schema,
597-
reader_schema,
598-
self._return_record_name)
610+
if is_bytes:
611+
# For simple bytes type, read payload directly
612+
obj_dict = payload.read()
613+
else:
614+
obj_dict = schemaless_reader(payload,
615+
writer_schema,
616+
reader_schema,
617+
self._return_record_name)
599618

600619
def field_transformer(rule_ctx, field_transform, message): return ( # noqa: E731
601620
transform(rule_ctx, reader_schema, message, field_transform))

src/confluent_kafka/schema_registry/_sync/avro.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -348,9 +348,16 @@ def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
348348
parsed_schema = self._parsed_schema
349349

350350
with _ContextStringIO() as fo:
351-
# write the record to the rest of the buffer
352-
schemaless_writer(fo, parsed_schema, value)
353-
buffer = fo.getvalue()
351+
# Check if it's a simple bytes type
352+
is_bytes = (parsed_schema == "bytes" or
353+
(isinstance(parsed_schema, dict) and parsed_schema.get("type") == "bytes"))
354+
if is_bytes:
355+
# For simple bytes type, write value directly
356+
buffer = value if isinstance(value, bytes) else value.encode()
357+
else:
358+
# write the record to the rest of the buffer
359+
schemaless_writer(fo, parsed_schema, value)
360+
buffer = fo.getvalue()
354361

355362
if latest_schema is not None:
356363
buffer = self._execute_rules_with_phase(
@@ -585,17 +592,29 @@ def __deserialize(
585592
reader_schema_raw = writer_schema_raw
586593
reader_schema = writer_schema
587594

595+
# Check if it's a simple bytes type
596+
is_bytes = (writer_schema == "bytes" or
597+
(isinstance(writer_schema, dict) and writer_schema.get("type") == "bytes"))
598+
588599
if migrations:
589-
obj_dict = schemaless_reader(payload,
590-
writer_schema,
591-
None,
592-
self._return_record_name)
600+
if is_bytes:
601+
# For simple bytes type, read payload directly
602+
obj_dict = payload.read()
603+
else:
604+
obj_dict = schemaless_reader(payload,
605+
writer_schema,
606+
None,
607+
self._return_record_name)
593608
obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
594609
else:
595-
obj_dict = schemaless_reader(payload,
596-
writer_schema,
597-
reader_schema,
598-
self._return_record_name)
610+
if is_bytes:
611+
# For simple bytes type, read payload directly
612+
obj_dict = payload.read()
613+
else:
614+
obj_dict = schemaless_reader(payload,
615+
writer_schema,
616+
reader_schema,
617+
self._return_record_name)
599618

600619
def field_transformer(rule_ctx, field_transform, message): return ( # noqa: E731
601620
transform(rule_ctx, reader_schema, message, field_transform))

tests/schema_registry/_async/test_avro_serdes.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,22 @@ async def test_avro_serialize_use_schema_id():
198198
assert obj == obj2
199199

200200

201+
async def test_avro_serialize_bytes():
202+
conf = {'url': _BASE_URL}
203+
client = AsyncSchemaRegistryClient.new_client(conf)
204+
ser_conf = {'auto.register.schemas': True}
205+
obj = b'\x02\x03\x04'
206+
schema = 'bytes'
207+
ser = await AsyncAvroSerializer(client, schema_str=json.dumps(schema), conf=ser_conf)
208+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
209+
obj_bytes = await ser(obj, ser_ctx)
210+
assert b'\x00\x00\x00\x00\x01\x02\x03\x04' == obj_bytes
211+
212+
deser = await AsyncAvroDeserializer(client)
213+
obj2 = await deser(obj_bytes, ser_ctx)
214+
assert obj == obj2
215+
216+
201217
async def test_avro_serialize_nested():
202218
conf = {'url': _BASE_URL}
203219
client = AsyncSchemaRegistryClient.new_client(conf)

tests/schema_registry/_sync/test_avro_serdes.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,22 @@ def test_avro_serialize_use_schema_id():
198198
assert obj == obj2
199199

200200

201+
def test_avro_serialize_bytes():
202+
conf = {'url': _BASE_URL}
203+
client = SchemaRegistryClient.new_client(conf)
204+
ser_conf = {'auto.register.schemas': True}
205+
obj = b'\x02\x03\x04'
206+
schema = 'bytes'
207+
ser = AvroSerializer(client, schema_str=json.dumps(schema), conf=ser_conf)
208+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
209+
obj_bytes = ser(obj, ser_ctx)
210+
assert b'\x00\x00\x00\x00\x01\x02\x03\x04' == obj_bytes
211+
212+
deser = AvroDeserializer(client)
213+
obj2 = deser(obj_bytes, ser_ctx)
214+
assert obj == obj2
215+
216+
201217
def test_avro_serialize_nested():
202218
conf = {'url': _BASE_URL}
203219
client = SchemaRegistryClient.new_client(conf)

0 commit comments

Comments
 (0)