# Wrappers for fetching the appropriate Avro schema for commonly used zip design operations
"""
This module provides utilities for loading, validating, and writing Avro schemas and records, specifically tailored for zip design operations.
.. warning::
The schemas are currently fetched from package resources. This is acceptable for a small company with good documentation, but it is recommended to move to a schema registry in the future.
Classes
-------
:class:`AvroDataHandler`
"""
import logging
from typing import BinaryIO, Optional

import fastavro
import numpy as np

from zcloud import load_avro_schema
logger = logging.getLogger(__name__)
# TODO: the schemas are, for now, fetched from package resources. This is ok for a small company with good docs, but we should move to a schema registry in the future
class AvroDataHandler:
"""
A handler for Avro data operations, including loading schemas, validating records, and writing to Avro files.
Parameters
----------
schema_name : str
The name of the schema to load.
output_file : str, optional
The file to write records to. Defaults to "records.avro".
ignore_metadata : bool, optional
If True, skip validation of **kwargs against the schema's metadata hint fields. Defaults to False.
**kwargs : dict
Additional metadata required by the schema.
Presence or absence of required metadata is checked against hint fields in the schema.
Attributes
----------
schema : dict
The loaded Avro schema.
zip_metadata : dict or None
Metadata specific to zip design operations. This metadata includes a hint field not supported by the Avro spec, which is used to indicate required metadata when writing the Avro file.
required_metadata_keys : list or None
List of required metadata keys.
optional_metadata_keys : list or None
List of optional metadata keys.
meta : dict or None
Metadata dict passed to fastavro when writing; None when ignore_metadata is True.
records : list
List of records to be written to the Avro file.
output_file : str
The file to write records to.
schema_keys : set
The field names of the schema. Update this set when evolving the schema rather than modifying schema["fields"] directly.
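Examples
--------
A minimal sketch of the intended workflow; the schema name "oligo_design", the metadata key "run_id", and the record fields are hypothetical placeholders, not names shipped with this package:

>>> handler = AvroDataHandler(
...     "oligo_design", output_file="designs.avro", run_id="run-001"
... )  # doctest: +SKIP
>>> handler._add_record(sequence="ACGT", score=0.9)  # doctest: +SKIP
>>> handler.write_to_avro()  # doctest: +SKIP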
"""
def __init__(self, schema_name, output_file=None, ignore_metadata=False, **kwargs):
self.schema = self._load_schema(schema_name)
self.zip_metadata = self.schema.get("zipbio_in_silico_metadata")
self.required_metadata_keys = None
self.optional_metadata_keys = None
self.meta = None
if not ignore_metadata:
    if self.zip_metadata is not None:
        self.required_metadata_keys = self.zip_metadata.get("required_metadata")
        self.optional_metadata_keys = self.zip_metadata.get("optional_metadata")
    self.meta = self._get_required_metadata(**kwargs)
self.records = []
self.output_file = "records.avro" if output_file is None else output_file
self._base_keys = {field["name"] for field in self.schema["fields"]}
self.schema_keys = set(self._base_keys)  # Use schema_keys to evolve the schema; _base_keys should reflect the contents of the .avsc file at read time
def _load_schema(self, schema_name):
"""
Load an Avro schema from a .avsc file located in the resources/avro_schema directory.
Parameters
----------
schema_name : str
The name of the schema to load.
Returns
-------
dict
The loaded Avro schema.
"""
return load_avro_schema(schema_name)
def _get_required_metadata(self, **kwargs):
"""
Ensure **kwargs includes the required metadata, and build the meta dict for fastavro.
Parameters
----------
**kwargs : dict
Additional metadata required by the schema.
Returns
-------
dict
Metadata for fastavro.
Raises
------
ValueError
If required metadata is missing.
"""
meta = {}
if self.required_metadata_keys is not None:
    required_key_set = set(self.required_metadata_keys)
    provided_required_keys = required_key_set.intersection(kwargs)
    # Raise an error if any required keys are missing
    if required_key_set != provided_required_keys:
        logger.debug(
            f"Missing required metadata: {required_key_set - provided_required_keys}"
        )
        raise ValueError("Missing required metadata")
    meta.update({key: kwargs[key] for key in provided_required_keys})
if self.optional_metadata_keys is not None:
    optional_keys = set(self.optional_metadata_keys).intersection(kwargs)
    meta.update({key: kwargs[key] for key in optional_keys})
return meta
def _validate_record(self, record):
"""
Validate a record against the loaded schema. Falsy values of nullable fields are coerced to None in place.
Parameters
----------
record : dict
The record to validate.
Raises
------
ValueError
If the record does not conform to the schema.
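Examples
--------
A sketch assuming a hypothetical nullable field "score" in the loaded schema:

>>> record = {"score": ""}
>>> handler._validate_record(record)  # doctest: +SKIP
>>> record["score"] is None  # doctest: +SKIP
True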
"""
schema_fields = {field["name"]: field for field in self.schema["fields"]}
for field_name, field_value in record.items():
if field_name not in schema_fields:
raise ValueError(f"Field '{field_name}' not found in schema.")
field_schema = schema_fields[field_name]
if self._is_nullable(field_schema) and self._is_falsy(field_value):
record[field_name] = None
elif not self._is_nullable(field_schema) and self._is_falsy(field_value):
raise ValueError(
f"Field '{field_name}' is not nullable but received a falsy value."
)
def _is_nullable(self, field_schema):
"""
Check if a field is nullable.
Parameters
----------
field_schema : dict
The schema of the field.
Returns
-------
bool
True if the field is nullable, False otherwise.
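Examples
--------
A union type containing "null" counts as nullable; the field names here are hypothetical:

>>> handler._is_nullable({"name": "score", "type": ["null", "double"]})  # doctest: +SKIP
True
>>> handler._is_nullable({"name": "id", "type": "string"})  # doctest: +SKIP
False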
"""
return isinstance(field_schema["type"], list) and "null" in field_schema["type"]
def _is_falsy(self, value):
"""
Check if a value is considered falsy and should be treated as a null.
Parameters
----------
value : any
The value to check.
Returns
-------
bool
True if the value is falsy, False otherwise.
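Examples
--------
Any float NaN is treated as falsy, while zero is not:

>>> handler._is_falsy(float("nan"))  # doctest: +SKIP
True
>>> handler._is_falsy(0)  # doctest: +SKIP
False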
"""
if value is None or value is False:
    return True
# An identity check against np.nan misses NaNs produced elsewhere (e.g. float("nan")), so test numerically
if isinstance(value, float) and np.isnan(value):
    return True
return value == ""
def write_to_avro(self, output_file: Optional[BinaryIO] = None):
"""
Write the collected records to an Avro file.
Parameters
----------
output_file : BinaryIO, optional
The file to write records to. If None, writes to self.output_file.
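Examples
--------
A sketch of writing to an in-memory buffer instead of disk, assuming records have already been added:

>>> import io
>>> buf = io.BytesIO()
>>> handler.write_to_avro(output_file=buf)  # doctest: +SKIP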
"""
if output_file is None:
with open(self.output_file, "wb") as out:
fastavro.writer(out, self.schema, self.records, metadata=self.meta)
else:
fastavro.writer(output_file, self.schema, self.records, metadata=self.meta)
def _add_record(self, **kwargs):
"""
Validate and append a record.
Parameters
----------
**kwargs : dict
The record to add.
Raises
------
ValueError
If the record does not conform to the schema.
"""
if self.validate_record(kwargs):
self.records.append(kwargs)
else:
raise ValueError("Record does not conform to schema")
def validate_record(self, record):
"""
Simple validation against the schema, checking that every field in the record is defined in the schema.
Parameters
----------
record : dict
The record to validate.
Returns
-------
bool
True if the record is valid, False otherwise.
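Examples
--------
Field names here are hypothetical; a field absent from the schema fails validation:

>>> handler.validate_record({"sequence": "ACGT"})  # doctest: +SKIP
True
>>> handler.validate_record({"not_in_schema": 1})  # doctest: +SKIP
False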
"""
fields = {field["name"] for field in self.schema["fields"]}
record_fields = set(record.keys())
if not record_fields.issubset(fields):
return False
# Further type checking can be implemented here if needed
return True