"""
This module provides utilities for loading and validating JSON schemas.
Classes
-------
:class:`MetaSchemaHandler`
:class:`SCUManifestSchemaHandler`
Functions
---------
:func:`load_meta_schema`
:func:`load_raw_schema`
Notes
-----
In the future, this module is expected to migrate from bundling schemas via importlib.resources to a dedicated schema validation library.
"""
from importlib import resources
import io
import json
import logging
import os
from typing import Type, Optional, Any, Dict, List
from google.cloud import bigquery
from jsonschema import validate
from zcloud.scu_enums import RawSCURecord
from zcloud.service_enums import ProjectId
logger = logging.getLogger(__name__)
# TODO make these globals in a resource file or something like that
RESOURCES = "zcloud.resources"
AVRO_SCHEMA = "avro_schema"
META_SCHEMA = "meta_schema"
RAW_SCU_SCHEMA = "raw_scu_schema"
SCU_FILE_SCHEMA = "scu_file_schema"
DEFAULT_ETL_SCHEMA_NAME = "etl_pipeline_default"
DEFAULT_BEAM_SCHEMA_NAME = "beam_pipeline_default" # Deprecated, use DEFAULT_ETL_SCHEMA_NAME instead
BUCKET = "docking-bay-1"
def list_templates() -> Dict[str, List[str]]:
"""
List available templates in all supported resource subdirectories.
Returns
-------
Dict[str, List[str]]
A dictionary where keys are resource types (e.g. 'meta_schema', 'raw_scu_schema', etc.)
and values are lists of template names (file names without extension).
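Examples
--------
A minimal usage sketch; the keys and template names returned depend on which
schema files ship with the installed package:
>>> templates = list_templates()
>>> for resource_type, names in templates.items():
...     print(resource_type, names)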
"""
template_dirs = {
"scu_file_schema": SCU_FILE_SCHEMA,
"meta_schema": META_SCHEMA,
"raw_scu_schema": RAW_SCU_SCHEMA,
"avro_schema": AVRO_SCHEMA,
}
templates = {}
for key, subdir in template_dirs.items():
subdir_path = resources.files(RESOURCES) / subdir
if not subdir_path.is_dir():
continue
# List files and remove extensions. (Note: for avro schemas the extension is .avsc,
# and for the others it is .json.)
templates[key] = [
os.path.splitext(item.name)[0]
for item in subdir_path.iterdir()
if item.is_file()
]
return templates
def list_templates_by_type(template_type: str) -> List[str]:
"""
List templates under a specific resource subdirectory.
Parameters
----------
template_type : str
The name of the subdirectory (one of the supported types, e.g. AVRO_SCHEMA).
Returns
-------
List[str]
A list of template names (file names without extension).
Raises
------
ValueError
If the specified template_type folder is not found.
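Examples
--------
A minimal usage sketch, using the module-level constant for the Avro subdirectory:
>>> names = list_templates_by_type(AVRO_SCHEMA)
>>> isinstance(names, list)
True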
"""
subdir_path = resources.files(RESOURCES) / template_type
if not subdir_path.is_dir():
raise ValueError(f"Template type '{template_type}' not found.")
return [
os.path.splitext(item.name)[0]
for item in subdir_path.iterdir()
if item.is_file() and not item.name.endswith(".py")
]
def load_scu_file_schema(name: str) -> Dict[str, Any]:
"""
Load the schema for an SCU output file
This utility abstracts the indexing of the wide variety of files that our SCUs may generate.
SCUs are indexed only at the file level, and those files may contain structured data, unstructured data, or a mix of both.
The schemas stored here are intended to be used to index, validate, and process that data in a more idiomatic way.
Any schema validation should be done by the calling function; the reliability of a schema here depends on the diligence of the person who patched the SCU in.
.. note::
This function currently just loads a file from package resources. In the future, it will fetch the schema from a schema registry.
Parameters
----------
name : str
The name of the schema file to load.
Returns
-------
Dict[str, Any]
The loaded schema as a dictionary.
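Examples
--------
A minimal usage sketch; 'example_output' is a hypothetical schema name that
would need to exist under the scu_file_schema package resources:
>>> schema = load_scu_file_schema("example_output")
>>> isinstance(schema, dict)
True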
"""
schema_path = resources.files(RESOURCES) / SCU_FILE_SCHEMA / f"{name}.json"
with schema_path.open("r") as f:
return json.load(f)
def load_raw_schema(name: str) -> Dict[str, Any]:
"""
Load a raw schema from a JSON file in the package.
Parameters
----------
name : str
The name of the schema file to load.
Returns
-------
Dict[str, Any]
The loaded schema as a dictionary.
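Examples
--------
A minimal usage sketch; 'example_raw' is a hypothetical schema name. The loaded
schema is validated against the generic 'scu_raw' meta schema before it is
returned, and a warning is logged if its 'name' field does not match the file name:
>>> schema = load_raw_schema("example_raw")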
"""
meta_schema_handler = MetaSchemaHandler(
"scu_raw"
) # get the generic SCU raw output schema to validate against
schema_path = resources.files(RESOURCES) / RAW_SCU_SCHEMA / f"{name}.json"
with schema_path.open("r") as f:
schema = json.load(f)
if schema["name"] != name:
logger.warning(
f"Schema name {schema['name']} does not match expected name {name}"
)
meta_schema_handler.validate(schema)
return schema
def load_avro_schema(name: str) -> Dict[str, Any]:
"""
Load an Avro schema from a JSON file in the package.
Parameters
----------
name : str
The name of the schema file to load.
Returns
-------
Dict[str, Any]
The loaded schema as a dictionary.
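Examples
--------
A minimal usage sketch; 'example_record' is a hypothetical .avsc file name.
Avro record schemas are JSON documents, typically with 'type', 'name', and 'fields' keys:
>>> schema = load_avro_schema("example_record")
>>> schema.get("type")  # typically 'record'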
"""
schema_path = resources.files(RESOURCES) / AVRO_SCHEMA / f"{name}.avsc"
with schema_path.open("r") as f:
return json.load(f)
class SCUManifestSchemaHandler(MetaSchemaHandler):
"""
API sugar that avoids dangling string references to schema fields scattered through code dependencies.
Referencing fields via this handler means that schema changes are less likely to break orphaned code, as long as the zcloud installed in the environment is up to date.
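Examples
--------
A minimal sketch of using handler attributes instead of string literals; the
manifest contents shown are illustrative:
>>> scumh = SCUManifestSchemaHandler()
>>> manifest = {"meta": {"upload_uuid": "1234"}}
>>> manifest[scumh.meta][scumh.upload_uuid]
'1234'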
"""
def __init__(self):
super().__init__("scu_manifest")
self.project = "project"
self.subproject = "subproject"
self.experiment = "experiment"
self.authors = "authors"
self.compute_type = "compute_type"
self.meta = "meta" # distinct from x-zip-metadata, this is upload metadata
self.upload_uuid = "upload_uuid"
self.upload_timestamp = "upload_timestamp"
self.element_files = "element_files"
self.files = "files"
self.elements = "elements"
self.file_path = "name"
self.gcloud_uri = "gcloud_uri"
self.correlation_id = "correlation_id"
# check that the meta fields we just listed (project, subproject, experiment, and authors) are in the x-zip-metadata field
# This is really more of an integration test / sanity check
if self.project not in self.meta_keys:
raise ValueError(
f"Expected field {self.project} not found in x-zip-metadata"
)
if self.subproject not in self.meta_keys:
raise ValueError(
f"Expected field {self.subproject} not found in x-zip-metadata"
)
if self.experiment not in self.meta_keys:
raise ValueError(
f"Expected field {self.experiment} not found in x-zip-metadata"
)
if self.authors not in self.meta_keys:
raise ValueError(
f"Expected field {self.authors} not found in x-zip-metadata"
)
# check keys related to upload metadata
upload_meta = self.properties.get(self.meta).get("properties").keys()
if self.compute_type not in upload_meta:
raise ValueError(
f"Expected field {self.compute_type} not found in properties"
)
if self.upload_uuid not in upload_meta:
raise ValueError(
f"Expected field {self.upload_uuid} not found in properties"
)
if self.upload_timestamp not in upload_meta:
raise ValueError(f"Expected field {self.upload_timestamp} not found in properties")
# check keys related to the upload contents
if self.element_files not in self.properties:
raise ValueError(
f"Expected field {self.element_files} not found in properties"
)
if (
self.files
not in self.properties.get(self.element_files)
.get("items")
.get("properties")
.keys()
):
raise ValueError(
f"Expected field {self.files} not found in properties of the element_files"
)
if self.elements not in self.properties.keys():
raise ValueError(f"Expected field {self.elements} not found in properties")
# check keys related to the files
if self.file_path not in self.definitions.get("file").get("properties").keys():
raise ValueError(
f"Expected field {self.file_path} not found in properties for the 'file' definition"
)
def get_raw_upload_bq_schema_list(self):
"""
Get a list of dicts compatible with the Apache Beam BigQuery TableFieldSchema API.
Returns
-------
List[Dict[str, str]]
A list of dictionaries, each containing the field name, field type, and mode.
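Examples
--------
A minimal sketch of converting the dicts for use with the google-cloud-bigquery
client (the dict keys match the bigquery.SchemaField constructor arguments):
>>> handler = SCUManifestSchemaHandler()
>>> fields = [bigquery.SchemaField(**f) for f in handler.get_raw_upload_bq_schema_list()]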
"""
bq_schema = [
{"name": self.project, "field_type": "STRING", "mode": "NULLABLE"},
{"name": self.subproject, "field_type": "STRING", "mode": "NULLABLE"},
{"name": self.experiment, "field_type": "STRING", "mode": "NULLABLE"},
{"name": self.authors, "field_type": "STRING", "mode": "REPEATED"},
{"name": self.compute_type, "field_type": "STRING", "mode": "REQUIRED"},
{"name": self.upload_uuid, "field_type": "STRING", "mode": "REQUIRED"},
{
"name": self.upload_timestamp,
"field_type": "TIMESTAMP",
"mode": "REQUIRED",
},
{"name": self.gcloud_uri, "field_type": "STRING", "mode": "REQUIRED"},
]
return bq_schema
class SCURecordSchemaHandler(MetaSchemaHandler):
def __init__(self):
super().__init__("scu_record")
self.entity_id = RawSCURecord.ENTITY_ID.value
self.gcloud_uri = RawSCURecord.GCS_URI.value
self.upload_uuid = RawSCURecord.UPLOAD_UUID.value
self.structure_type = RawSCURecord.STRUCTURE_TYPE.value
self.correlation_id = RawSCURecord.CORRELATION_ID.value
self.upload_timestamp = RawSCURecord.UPLOAD_TIMESTAMP.value
self.manifest_uri = RawSCURecord.MANIFEST_URI.value
self.compute_type = RawSCURecord.COMPUTE_TYPE.value
self.scu_fields = [
self.entity_id,
self.gcloud_uri,
self.upload_uuid,
self.structure_type,
self.correlation_id,
self.upload_timestamp,
self.manifest_uri,
self.compute_type,
]
self.required_fields = [
self.entity_id,
self.gcloud_uri,
self.upload_uuid,
self.compute_type,
]
def get_field_names(self):
"""Return the list of SCU record field names defined by the schema enum."""
return self.scu_fields
def create_scu_record(
self,
entity_id=None,
gcs_uri=None,
upload_uuid=None,
structure_type=None,
correlation_id=None,
upload_timestamp=None,
manifest_uri=None,
compute_type=None,
**kwargs,
):
"""
Create an SCU record with explicitly named parameters and optional kwargs.
This method provides explicit parameters for IDE autocomplete, and accepts
**kwargs for backward compatibility and future flexibility.
Additional keyword arguments are currently ignored (kwargs handling is not implemented yet).
Parameters
----------
entity_id : str, optional
The entity identifier
gcs_uri : str, optional
The Google Cloud Storage URI (same as gcloud_uri)
upload_uuid : str, optional
The upload UUID
structure_type : str, optional
The structure type
correlation_id : str, optional
The correlation identifier
upload_timestamp : str, optional
The upload timestamp
manifest_uri : str, optional
The manifest URI
compute_type : str, optional
The compute type
**kwargs : dict, optional
Additional keyword arguments. Not implemented yet.
Returns
-------
dict
A dictionary containing the SCU record fields.
Examples
--------
>>> handler = SCURecordSchemaHandler()
>>> # Using explicit parameters:
>>> record = handler.create_scu_record(entity_id="123", upload_uuid="abc")
>>>
>>> # Field names in the resulting record come from the schema enum:
>>> field_names = handler.get_field_names()
>>> set(record) == set(field_names)
True
"""
# Build the record from the explicit parameters; any extra **kwargs are currently ignored
result = {
self.entity_id: entity_id,
self.gcloud_uri: gcs_uri,
self.upload_uuid: upload_uuid,
self.structure_type: structure_type,
self.correlation_id: correlation_id,
self.upload_timestamp: upload_timestamp,
self.manifest_uri: manifest_uri,
self.compute_type: compute_type,
}
return result
def get_bq_schema(self) -> List[Dict[str, str]]:
"""
Get the BigQuery schema for the SCU record.
.. note::
This is not a list of bigquery.SchemaField objects; it is a list of dictionaries, each containing the field name, field type, and mode.
To submit the schema to BigQuery, convert it to a list of bigquery.SchemaField objects first.
Example:
>>> schema = handler.get_bq_schema()
>>> schema_fields = [bigquery.SchemaField(\*\*field) for field in schema] #*maintainer note*: stop removing this escape
>>> table = bigquery.Table(table_id, schema=schema_fields)
>>> table = client.create_table(table)
Returns
-------
List[Dict[str, str]]
A list of dictionaries, each containing the field name, field type, and mode.
"""
return [
{
"name": field,
"field_type": (
"STRING" if field != self.upload_timestamp else "TIMESTAMP"
),
"mode": "REQUIRED" if field in self.required_fields else "NULLABLE",
}
for field in self.scu_fields
]
class ETLConfigSchemaHandler(MetaSchemaHandler):
"""
Handle schemas for ETL configs.
This class was previously named BeamConfigSchemaHandler but has been renamed
to reflect the broader ETL functionality rather than specific Apache Beam functionality.
Attributes
----------
transforms : str
The name of the transforms field in the schema
data : str
The name of the data field in the schema
loads : str
The name of the loads field in the schema
transform_type : str
The name of the transform type field in transform items
loader_type : str
The name of the loader type field in load items
input_data : str
The name of the input data field
transform_parameters : str
The name of the transform parameters field
loader_parameters : str
The name of the loader parameters field
transform_name : str
The name of the transform name field
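Examples
--------
A minimal sketch of reading fields from an ETL config dict through the handler,
so that string literals are not scattered through calling code; the config shown
is illustrative and not guaranteed to validate against the full schema:
>>> handler = ETLConfigSchemaHandler()
>>> config = {"transforms": [{"name": "t1", "transform_type": "noop", "input_data": "d1", "parameters": {}}]}
>>> [t[handler.transform_name] for t in config[handler.transforms]]
['t1']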
"""
def __init__(self):
super().__init__(DEFAULT_ETL_SCHEMA_NAME)
self.transforms = "transforms"
self.data = "data"
self.loads = "loads"
# Check that the required fields are in the schema properties
if self.transforms not in self.properties:
raise ValueError(
f"Expected field {self.transforms} not found in properties"
)
if self.data not in self.properties:
raise ValueError(f"Expected field {self.data} not found in properties")
if self.loads not in self.properties:
raise ValueError(f"Expected field {self.loads} not found in properties")
# Check that the required fields are in the schema required list
required_fields = self.schema.get("required", [])
if self.transforms not in required_fields:
raise ValueError(
f"Expected field {self.transforms} not found in required fields"
)
if self.data not in required_fields:
raise ValueError(f"Expected field {self.data} not found in required fields")
if self.loads not in required_fields:
raise ValueError(
f"Expected field {self.loads} not found in required fields"
)
self.transform_type = "transform_type"
self.loader_type = "loader_type"
self.input_data = "input_data"
self.transform_parameters = "parameters"
self.loader_parameters = "parameters"
self.transform_name = "name"
# Validate that the important sub-properties exist in the schema
transform_properties = (
self.properties.get(self.transforms).get("items").get("properties")
)
if self.transform_type not in transform_properties:
raise ValueError(
f"Expected field {self.transform_type} not found in properties of transforms"
)
if self.input_data not in transform_properties:
raise ValueError(
f"Expected field {self.input_data} not found in properties of transforms"
)
if self.transform_parameters not in transform_properties:
raise ValueError(
f"Expected field {self.transform_parameters} not found in properties of transforms"
)
if self.transform_name not in transform_properties:
raise ValueError(
f"Expected field {self.transform_name} not found in properties of transforms"
)
self.data_pipeline_tags = "pipeline_tags"
self.data_type = "type"
# data_properties = self.properties.get(self.data).get("items").get("oneOf")
# if not any(self.data_parameters in item.get("properties", {}) for item in data_properties):
# raise ValueError(f"Expected field {self.data_parameters} not found in properties of data items")
load_properties = self.properties.get(self.loads).get("items").get("properties")
if self.loader_type not in load_properties:
raise ValueError(
f"Expected field {self.loader_type} not found in properties of loads"
)
if self.input_data not in load_properties:
raise ValueError(
f"Expected field {self.input_data} not found in properties of loads"
)
if self.loader_parameters not in load_properties:
raise ValueError(
f"Expected field {self.loader_parameters} not found in properties of loads"
)
self.data_name = "name"
self.data_metadata = "metadata"
self.data_tags = "tags"
if self.data_tags not in self.definitions.get("DataBlockMetadata").get(
"properties"
):
raise ValueError(
f"Expected field {self.data_tags} not found in properties of data block metadata definition"
)
self.data_extractor = "extractor"
self.extractor_type = "extractor_type"
self.extractor_parameters = "parameters"
class SCUConfigSchemaHandler(MetaSchemaHandler):
def __init__(self):
# super().__init__("scu_config") # maintainer note: this is deprecated and no scu_config schema exists
self.project = "project"
self.subproject = "subproject"
self.experiment = "experiment"
self.authors = "authors"
self.bucket = "bucket"
self.gcp_project = "gcp_project"
required_fields = [
self.project,
self.subproject,
self.experiment,
self.authors,
self.bucket,
self.gcp_project,
]
# With the scu_config schema removed, the base class __init__ is not called, so the
# schema-backed attributes (properties, required) may not exist. Only run the sanity
# checks when a schema has actually been loaded.
if getattr(self, "properties", None) is None:
return
# Check that the required fields are in the schema properties
for field in required_fields:
if field not in self.properties:
raise ValueError(f"Expected field {field} not found in properties")
# Check that the required fields are in the schema required list
for field in required_fields:
if field not in self.required:
raise ValueError(f"Expected field {field} not found in required fields")
def get_beam_config_schema_handler():
"""
Get a BeamConfigSchemaHandler instance.
Note: This function is deprecated and has been renamed to get_etl_config_schema_handler.
It is kept for backwards compatibility but may be removed in a future version.
Returns
-------
BeamConfigSchemaHandler
An instance of the BeamConfigSchemaHandler class (which inherits from ETLConfigSchemaHandler).
"""
logging.getLogger(__name__).warning(
"get_beam_config_schema_handler is deprecated and has been renamed to get_etl_config_schema_handler."
)
return BeamConfigSchemaHandler()
def get_scu_manifest_handler():
return SCUManifestSchemaHandler()
def get_scu_config_handler():
return SCUConfigSchemaHandler()
def get_etl_config_schema_handler():
"""
Get an ETLConfigSchemaHandler instance.
Returns
-------
ETLConfigSchemaHandler
An instance of the ETLConfigSchemaHandler class.
"""
return ETLConfigSchemaHandler()
def list_from_element_files(
manifest_dict: Dict[str, Any], *element_strings: str, scumh: Optional[SCUManifestSchemaHandler] = None
) -> List[Dict[str, str]]:
"""
Generate a list of file dictionaries from the given manifest dictionary and element strings.
Parameters
----------
manifest_dict : dict
The manifest dictionary containing metadata and element files.
*element_strings : str
Variable length argument list of element strings to filter the files.
scumh : SCUManifestSchemaHandler, optional
An instance of SCUManifestSchemaHandler. If None, a new instance is created using `get_scu_manifest_handler()`.
Generally, you should use the default, you would only specify an alternative handler if something is going wrong or you need an old/new schema
Returns
-------
list of dict
A list of dictionaries, each containing the file path and upload UUID.
Notes
-----
- The function assumes that `manifest_dict` contains keys defined in `scumh`.
- The `scumh` parameter is used to access specific keys within the `manifest_dict`.
Examples
--------
>>> manifest_dict = {
... 'meta': {'upload_uuid': '1234'},
... 'element_files': [
... {'name': 'path1', 'files': [{'name': 'file1'}, {'name': 'file2'}]},
... {'name': 'path2', 'files': [{'name': 'file3'}, {'name': 'file4'}]}
... ]
... }
>>> list_from_element_files(manifest_dict, 'path1')
[{'name': '1234/file1', 'upload_uuid': '1234'}, {'name': '1234/file2', 'upload_uuid': '1234'}]
"""
if scumh is None:
scumh = get_scu_manifest_handler()
uuid = manifest_dict[scumh.meta][scumh.upload_uuid]
file_dicts = []
for element in manifest_dict[scumh.element_files]:
if element[scumh.file_path] in element_strings:
file_dicts.extend(
[
{
scumh.file_path: f"{os.path.join(uuid,f[scumh.file_path])}",
scumh.upload_uuid: uuid,
}
for f in element[scumh.files]
]
)
return file_dicts
# TODO: factory class with registered schema handlers, should be able to get a handler for a given schema name, instead of just named default funcs like get_scu_manifest_handler
def create_dataset_if_not_exists(project_id: str, dataset_id: str) -> None:
"""
Create a BigQuery dataset if it doesn't already exist.
Parameters
----------
project_id : str
The Google Cloud project ID.
dataset_id : str
The BigQuery dataset ID.
Returns
-------
None
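Examples
--------
A minimal usage sketch; the project and dataset names are illustrative, and the
call requires BigQuery credentials with permission to create datasets:
>>> create_dataset_if_not_exists("my-gcp-project", "example_dataset")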
"""
client = bigquery.Client(project=project_id)
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
try:
client.get_dataset(dataset_ref)
logger.info(f"Dataset {dataset_id} already exists in {project_id}")
except Exception:
logger.info(f"Dataset {dataset_id} does not exist in {project_id}, creating it now.")
try:
dataset = bigquery.Dataset(dataset_ref)
client.create_dataset(dataset)
logger.info(f"Dataset {dataset_id} created successfully in {project_id}")
except Exception as create_error:
logger.error(f"Failed to create dataset {dataset_id}: {create_error}")
raise
def get_bq_table_id_from_compute_type(
compute_type: str, meta: Dict[str, Any], scumh: Optional[SCUManifestSchemaHandler] = None
) -> str:
"""
Get a BigQuery table name from a compute type and metadata.
Parameters
----------
compute_type : str
The compute type.
meta : Dict[str, Any]
The metadata.
scumh : SCUManifestSchemaHandler, optional
The SCU manifest schema handler. If None, one will be created.
Returns
-------
str
The BigQuery table name.
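Examples
--------
A minimal sketch of the returned table spec format; the project ID, compute type,
and upload UUID shown are illustrative, and the call also creates the dataset
(named after the compute type) as a side effect:
>>> meta = {"upload_uuid": "1234"}
>>> get_bq_table_id_from_compute_type("folding", meta)
'my-gcp-project:folding.1234'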
"""
scumh = scumh or get_scu_manifest_handler()
project_id = ProjectId.current()
if project_id:
create_dataset_if_not_exists(project_id, compute_type)
return f"{project_id}:{compute_type}.{meta[scumh.upload_uuid]}"
else:
raise ValueError("Unable to determine project ID for BigQuery table")
def get_beam_bq_table_from_compute_type(
compute_type: str, meta: Dict[str, Any], scumh: Optional[SCUManifestSchemaHandler] = None
) -> str:
"""
Get a BigQuery table name from a compute type and metadata.
Note: This function is deprecated and has been renamed to get_bq_table_id_from_compute_type.
It is kept for backwards compatibility but may be removed in a future version.
Parameters
----------
compute_type : str
The compute type.
meta : Dict[str, Any]
The metadata.
scumh : SCUManifestSchemaHandler, optional
The SCU manifest schema handler. If None, one will be created.
Returns
-------
str
The BigQuery table name.
"""
logging.getLogger(__name__).warning(
"get_beam_bq_table_from_compute_type is deprecated and has been renamed to get_bq_table_id_from_compute_type."
)
return get_bq_table_id_from_compute_type(compute_type, meta, scumh)
def available_defaults() -> Dict[str, str]:
"""
Get a dictionary of default configuration values.
Returns
-------
Dict[str, str]
A dictionary containing default configuration values for GCP project and bucket.
"""
return {
"gcp_project": ProjectId.current(),
"bucket": BUCKET,
}
# Functions migrated from the deprecated beam module
def convert_pdb_to_bcif(pdb_text: str) -> bytes:
"""
Convert PDB format text to Binary CIF format.
Note: This function was migrated from the deprecated beam module.
Parameters
----------
pdb_text : str
The PDB format text to convert.
Returns
-------
bytes
The Binary CIF format data.
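Examples
--------
A minimal usage sketch; 'structure.pdb' is a hypothetical input file and the
biotite package must be installed:
>>> with open("structure.pdb") as fh:
...     bcif_bytes = convert_pdb_to_bcif(fh.read())
>>> with open("structure.bcif", "wb") as out:
...     out.write(bcif_bytes)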
"""
try:
from biotite.structure.io.pdb import PDBFile
import biotite.structure.io.pdbx
except ImportError:
logging.getLogger(__name__).error("biotite package is required for convert_pdb_to_bcif function")
raise
pdb_file = io.StringIO(pdb_text)
pdb_file_obj = PDBFile.read(pdb_file)
bcif_file_obj = biotite.structure.io.pdbx.BinaryCIFFile()
biotite.structure.io.pdbx.set_structure(bcif_file_obj, pdb_file_obj.get_structure())
bcif_file = io.BytesIO()
bcif_file_obj.write(bcif_file)
serialized_bcif = bcif_file.getvalue()
bcif_file.close()
return serialized_bcif
def filter_file_dict_by_ext(file_dict: Dict[str, Any], extension: str) -> bool:
"""
Filter a file dictionary by file extension.
Note: This function was migrated from the deprecated beam module.
Parameters
----------
file_dict : Dict[str, Any]
The file dictionary to filter.
extension : str
The extension to filter by.
Returns
-------
bool
True if the file has the specified extension, False otherwise.
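Examples
--------
A couple of quick examples:
>>> filter_file_dict_by_ext({"name": "outputs/model_0.pdb"}, ".pdb")
True
>>> filter_file_dict_by_ext({"name": "outputs/model_0.pdb"}, ".cif")
False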
"""
return file_dict["name"].endswith(extension)
class BeamConfigSchemaHandler(ETLConfigSchemaHandler):
"""
Handle schemas for Beam configs.
Note: This class is deprecated and has been renamed to ETLConfigSchemaHandler.
It is kept for backwards compatibility but may be removed in a future version.
"""
def __init__(self):
logging.getLogger(__name__).warning(
"BeamConfigSchemaHandler is deprecated and has been renamed to ETLConfigSchemaHandler."
)
super().__init__()