Source code for zcloud.avro

# Wrappers for fetching the appropriate Avro schemas for commonly used zip design operations
"""

This module provides utilities for loading, validating, and writing Avro schemas and records, specifically tailored for zip design operations.

.. warning::
    The schemas are currently fetched from package resources. This is acceptable for a small company with good documentation, but it is recommended to move to a schema registry in the future.


Classes
-------

    :class:`AvroDataHandler`


"""

import importlib.resources as resources
import io
import json
import logging
from typing import BinaryIO, Optional

import fastavro
import numpy as np

from zcloud import load_avro_schema


logger = logging.getLogger(__name__)

# TODO: the schemas are, for now, fetched from package resources. This is OK for a
# small company with good docs, but we should move to a schema registry in the future.
# HACK


class AvroDataHandler:
    """
    A handler for Avro data operations, including loading schemas, validating
    records, and writing to Avro files.

    Parameters
    ----------
    schema_name : str
        The name of the schema to load.
    output_file : str, optional
        The file to write records to. Defaults to "records.avro".
    ignore_metadata : bool, optional
        If True, skip the required/optional metadata checks. Defaults to False.
    **kwargs : dict
        Additional metadata required by the schema. Presence or absence of
        required metadata is checked against hint fields in the schema.

    Attributes
    ----------
    schema : dict
        The loaded Avro schema.
    zip_metadata : dict or None
        Metadata specific to zip design operations. This metadata includes a
        hint field not supported by the Avro spec, which is used to indicate
        required metadata when writing the Avro file.
    required_metadata_keys : list or None
        List of required metadata keys.
    optional_metadata_keys : list or None
        List of optional metadata keys.
    meta : dict
        Metadata for fastavro.
    records : list
        List of records to be written to the Avro file.
    output_file : str
        The file to write records to.
    schema_keys : set
        The keys of the schema. Update this if evolving the schema. Avoid
        modifying schema["fields"] directly.
    """

    def __init__(self, schema_name, output_file=None, ignore_metadata=False, **kwargs):
        self.schema = self._load_schema(schema_name)
        self.zip_metadata = self.schema.get("zipbio_in_silico_metadata")
        self.meta = None
        # if not ignore_metadata:
        #     if self.zip_metadata is not None:
        #         self.required_metadata_keys = self.zip_metadata.get("required_metadata")
        #         self.optional_metadata_keys = self.zip_metadata.get("optional_metadata")
        #     else:
        #         self.required_metadata_keys = None
        #         self.optional_metadata_keys = None
        #     self.meta = self._get_required_metadata(**kwargs)
        self.records = []
        self.output_file = "records.avro" if output_file is None else output_file
        self._base_keys = {field["name"] for field in self.schema["fields"]}
        # Use schema_keys to evolve the schema; _base_keys should reflect the
        # contents of the .avsc file at read time.
        self.schema_keys = set(self._base_keys)

    def _load_schema(self, schema_name):
        """
        Load an Avro schema from a .avsc file located in the
        resources/avro_schema directory.

        Parameters
        ----------
        schema_name : str
            The name of the schema to load.

        Returns
        -------
        dict
            The loaded Avro schema.
        """
        return load_avro_schema(schema_name)

    # def _get_required_metadata(self, **kwargs):
    #     """
    #     Ensure **kwargs includes the required metadata, and build the meta
    #     dict for fastavro.
    #
    #     Parameters
    #     ----------
    #     **kwargs : dict
    #         Additional metadata required by the schema.
    #
    #     Returns
    #     -------
    #     dict
    #         Metadata for fastavro.
    #
    #     Raises
    #     ------
    #     ValueError
    #         If required metadata is missing.
    #     """
    #     meta = {}
    #     if self.required_metadata_keys is not None:
    #         required_key_set = set(self.required_metadata_keys)
    #         required_metadata_keys = required_key_set.intersection(kwargs)
    #         # Raise an error if we're missing required keys
    #         if required_key_set != required_metadata_keys:
    #             logger.debug(
    #                 f"Missing required metadata: {required_key_set - required_metadata_keys}"
    #             )
    #             raise ValueError("Missing required metadata")
    #         meta = {**meta, **{key: kwargs[key] for key in required_metadata_keys}}
    #     if self.optional_metadata_keys is not None:
    #         optional_keys = set(self.optional_metadata_keys).intersection(kwargs)
    #         meta = {**meta, **{key: kwargs[key] for key in optional_keys}}
    #     return meta

    def _validate_record(self, record):
        """
        Validate a record against the loaded schema, coercing falsy values of
        nullable fields to None.

        Parameters
        ----------
        record : dict
            The record to validate.

        Raises
        ------
        ValueError
            If the record does not conform to the schema.
        """
        schema_fields = {field["name"]: field for field in self.schema["fields"]}
        for field_name, field_value in record.items():
            if field_name not in schema_fields:
                raise ValueError(f"Field '{field_name}' not found in schema.")
            field_schema = schema_fields[field_name]
            if self._is_nullable(field_schema) and self._is_falsy(field_value):
                record[field_name] = None
            elif not self._is_nullable(field_schema) and self._is_falsy(field_value):
                raise ValueError(
                    f"Field '{field_name}' is not nullable but received a falsy value."
                )

    def _is_nullable(self, field_schema):
        """
        Check if a field is nullable.

        Parameters
        ----------
        field_schema : dict
            The schema of the field.

        Returns
        -------
        bool
            True if the field is nullable, False otherwise.
        """
        return (
            "null" in field_schema["type"]
            if isinstance(field_schema["type"], list)
            else False
        )

    def _is_falsy(self, value):
        """
        Check if a value is considered falsy and should be treated as a null.

        Parameters
        ----------
        value : any
            The value to check.

        Returns
        -------
        bool
            True if the value is falsy, False otherwise.
        """
        # NB: `value is np.nan` only matches the np.nan singleton, so computed
        # NaNs would slip through; check with np.isnan, guarded by a float
        # check (np.float64 subclasses float).
        return (
            value is None
            or value is False
            or (isinstance(value, float) and np.isnan(value))
            or value == ""
        )
    def write_to_avro(self, output_file: Optional[BinaryIO] = None):
        """
        Write the collected records to an Avro file.

        Parameters
        ----------
        output_file : BinaryIO, optional
            The file to write records to. If None, writes to self.output_file.
        """
        if output_file is None:
            with open(self.output_file, "wb") as out:
                fastavro.writer(out, self.schema, self.records, metadata=self.meta)
        else:
            fastavro.writer(output_file, self.schema, self.records, metadata=self.meta)
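    # Usage sketch (illustrative; "zip_design" is a hypothetical schema name):
    # because write_to_avro() accepts any binary file object, records can also
    # be written to an in-memory buffer, e.g. before uploading to object storage:
    #
    #     buf = io.BytesIO()
    #     handler = AvroDataHandler("zip_design", ignore_metadata=True)
    #     handler.write_to_avro(buf)
    #     payload = buf.getvalue()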
    def _add_record(self, **kwargs):
        """
        Validate and append a record.

        Parameters
        ----------
        **kwargs : dict
            The record to add.

        Raises
        ------
        ValueError
            If the record does not conform to the schema.
        """
        if self.validate_record(kwargs):
            self.records.append(kwargs)
        else:
            raise ValueError("Record does not conform to schema")
    def validate_record(self, record):
        """
        Simple validation against the schema, checking that every field in the
        record exists in the schema.

        Parameters
        ----------
        record : dict
            The record to validate.

        Returns
        -------
        bool
            True if the record is valid, False otherwise.
        """
        fields = {field["name"] for field in self.schema["fields"]}
        record_fields = set(record.keys())
        if not record_fields.issubset(fields):
            return False
        # Further type checking can be implemented here if needed
        return True
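
# Minimal end-to-end sketch (illustrative only; "zip_design" and the field
# names are hypothetical and must match an .avsc file in package resources;
# _add_record is private and shown here only to illustrate the flow):
#
#     handler = AvroDataHandler("zip_design", output_file="designs.avro",
#                               ignore_metadata=True)
#     handler._add_record(design_id="d-001", score=0.87)
#     handler.write_to_avro()
#
# Reading the file back with fastavro:
#
#     with open("designs.avro", "rb") as f:
#         for record in fastavro.reader(f):
#             print(record)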