zcloud.util module
This module provides utilities for loading and validating JSON schemas.
Classes
MetaSchemaHandler
SCUManifestSchemaHandler
Functions
load_meta_schema()
load_raw_schema()
Notes
In the future, the module will migrate from using importlib.resources to using a schema validation library.
- class zcloud.util.BeamConfigSchemaHandler[source]
Bases:
ETLConfigSchemaHandler
Handle schemas for Beam configs.
Note: This class is deprecated and has been renamed to ETLConfigSchemaHandler. It is kept for backwards compatibility but may be removed in a future version.
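A backwards-compatible alias of this kind is often written as a thin subclass. The following is a minimal sketch of that pattern, not the zcloud source: both class bodies are stand-ins, and the DeprecationWarning is an assumption, since the note above does not say whether the real class emits one.

```python
import warnings


class ETLConfigSchemaHandler:
    """Stand-in for the real handler; the body here is hypothetical."""

    def __init__(self):
        self.transforms = "transforms"  # assumed field name, for illustration


class BeamConfigSchemaHandler(ETLConfigSchemaHandler):
    """Deprecated alias kept so that old imports keep working."""

    def __init__(self):
        # Assumption: warn on construction; the real class may stay silent.
        warnings.warn(
            "BeamConfigSchemaHandler is deprecated; use ETLConfigSchemaHandler",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__()


# Record warnings so the deprecation message can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy = BeamConfigSchemaHandler()

print(isinstance(legacy, ETLConfigSchemaHandler))
print(caught[0].category.__name__)
```

Because the alias inherits from the new class, code that checks isinstance against ETLConfigSchemaHandler keeps working with old call sites.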
- class zcloud.util.ETLConfigSchemaHandler[source]
Bases:
MetaSchemaHandler
Handle schemas for ETL configs.
This class was previously named BeamConfigSchemaHandler but has been renamed to reflect the broader ETL functionality rather than specific Apache Beam functionality.
- transforms
The name of the transforms field in the schema
- Type:
str
- data
The name of the data field in the schema
- Type:
str
- loads
The name of the loads field in the schema
- Type:
str
- transform_type
The name of the transform type field in transform items
- Type:
str
- loader_type
The name of the loader type field in load items
- Type:
str
- input_data
The name of the input data field
- Type:
str
- transform_parameters
The name of the transform parameters field
- Type:
str
- loader_parameters
The name of the loader parameters field
- Type:
str
- transform_name
The name of the transform name field
- Type:
str
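The attributes above let calling code look up config keys by attribute instead of by string literal. A rough sketch of the idea follows, using a hypothetical stand-in class; the string values assigned here are guesses for illustration, and the real values come from the schema shipped with zcloud.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldNames:
    """Hypothetical stand-in exposing a few of the attributes listed above."""

    transforms: str = "transforms"          # assumed value
    transform_type: str = "transform_type"  # assumed value
    transform_name: str = "transform_name"  # assumed value


def transform_types(config, names):
    """Collect the type of every transform item without hard-coded keys."""
    return [item[names.transform_type] for item in config[names.transforms]]


names = FieldNames()
config = {
    names.transforms: [
        {names.transform_name: "clean", names.transform_type: "filter"},
        {names.transform_name: "shape", names.transform_type: "map"},
    ]
}
print(transform_types(config, names))
```

If a schema revision renames a field, only the handler needs to change; code written against the attributes stays the same.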
- class zcloud.util.SCUConfigSchemaHandler[source]
Bases:
MetaSchemaHandler
- class zcloud.util.SCUManifestSchemaHandler[source]
Bases:
MetaSchemaHandler
API sugar to avoid dangling string references to schema fields in dependent code.
Accessing field names through the handler means that schema changes are less likely to break orphaned code, as long as the zcloud version installed in its environment is up to date.
- get_raw_upload_bq_schema_list()[source]
Get a list of dicts compatible with the Apache Beam BigQuery TableFieldSchema API.
- Returns:
The BigQuery schema as a dictionary.
- Return type:
dict
- class zcloud.util.SCURecordSchemaHandler[source]
Bases:
MetaSchemaHandler
- create_scu_record(entity_id=None, gcs_uri=None, upload_uuid=None, structure_type=None, correlation_id=None, upload_timestamp=None, manifest_uri=None, compute_type=None, **kwargs)[source]
Create an SCU record with explicitly named parameters and optional kwargs.
This method provides explicit parameters for IDE autocomplete while also supporting kwargs for backward compatibility and flexibility.
kwargs are not implemented yet.
- Parameters:
entity_id (str, optional) – The entity identifier
gcs_uri (str, optional) – The Google Cloud Storage URI (same as gcloud_uri)
upload_uuid (str, optional) – The upload UUID
structure_type (str, optional) – The structure type
correlation_id (str, optional) – The correlation identifier
upload_timestamp (str, optional) – The upload timestamp
manifest_uri (str, optional) – The manifest URI
compute_type (str, optional) – The compute type
**kwargs (dict, optional) – Additional keyword arguments. Not implemented yet.
- Returns:
A dictionary containing the SCU record fields.
- Return type:
dict
Examples
>>> handler = SCURecordSchemaHandler()
>>> # Using explicit parameters:
>>> record = handler.create_scu_record(entity_id="123", upload_uuid="abc")
>>>
>>> # Using enum values as kwargs:
>>> field_names = handler.get_field_names()
>>> record = handler.create_scu_record(**{field_names["entity_id"]: "123"})
- get_bq_schema()[source]
Get the BigQuery schema for the SCU record.
Note
This is not a list of bigquery.SchemaField objects; it is a list of dictionaries, each containing the field name, field type, and mode. To submit this schema to BigQuery, first convert it to a list of bigquery.SchemaField objects.
Example:
>>> from google.cloud import bigquery
>>> schema = handler.get_bq_schema()
>>> schema_fields = [bigquery.SchemaField(**field) for field in schema]
>>> table = bigquery.Table(table_id, schema=schema_fields)
>>> table = client.create_table(table)
- Returns:
A list of dictionaries, each containing the field name, field type, and mode.
- Return type:
List[Dict[str, str]]
- get_field_names()[source]
- zcloud.util.available_defaults()[source]
Get a dictionary of default configuration values.
- Returns:
A dictionary containing default configuration values for GCP project and bucket.
- Return type:
Dict[str, str]
- zcloud.util.convert_pdb_to_bcif(pdb_text)[source]
Convert PDB format text to Binary CIF format.
Note: This function was migrated from the deprecated beam module.
- Parameters:
pdb_text (str) – The PDB format text to convert.
- Returns:
The Binary CIF format data.
- Return type:
bytes
- zcloud.util.create_dataset_if_not_exists(project_id, dataset_id)[source]
Create a BigQuery dataset if it doesn’t already exist.
- Parameters:
project_id (str) – The Google Cloud project ID.
dataset_id (str) – The BigQuery dataset ID.
- Return type:
None
- zcloud.util.extract_rfd_run_files(manifest, scu_mh=None)[source]
Extract RFD run files from a manifest.
Note: This function was migrated from the deprecated beam module.
- Parameters:
manifest (Dict[str, Any]) – The manifest containing RFD run files.
scu_mh (SCUManifestSchemaHandler, optional) – The SCU manifest schema handler. If None, one will be created.
- Returns:
A list of RFD run file dictionaries.
- Return type:
List[Dict[str, Any]]
- zcloud.util.filter_file_dict_by_ext(file_dict, extension)[source]
Filter a file dictionary by file extension.
Note: This function was migrated from the deprecated beam module.
- Parameters:
file_dict (Dict[str, Any]) – The file dictionary to filter.
extension (str) – The extension to filter by.
- Returns:
True if the file has the specified extension, False otherwise.
- Return type:
bool
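Because the function returns a bool, it can serve as a predicate when filtering a list of file dictionaries. The sketch below is a hypothetical re-implementation, assuming each file dictionary stores its path under a 'name' key (as in the list_from_element_files example); the real function may read a different key or treat case and leading dots differently.

```python
from functools import partial


def has_extension(file_dict, extension, name_key="name"):
    """Return True if the file path ends with the given extension.

    Hypothetical stand-in for filter_file_dict_by_ext; name_key is an assumption.
    """
    # Accept both "pdb" and ".pdb", and compare case-insensitively.
    suffix = extension if extension.startswith(".") else "." + extension
    return file_dict[name_key].lower().endswith(suffix.lower())


files = [
    {"name": "1234/model_0.pdb", "upload_uuid": "1234"},
    {"name": "1234/run.log", "upload_uuid": "1234"},
    {"name": "1234/model_1.PDB", "upload_uuid": "1234"},
]

# Bind the extension so the predicate takes a single file dictionary.
pdb_files = list(filter(partial(has_extension, extension="pdb"), files))
print([f["name"] for f in pdb_files])
```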
- zcloud.util.get_beam_bq_table_from_compute_type(compute_type, meta, scumh=None)[source]
Get a BigQuery table name from a compute type and metadata.
Note: This function is deprecated and has been renamed to get_bq_table_id_from_compute_type. It is kept for backwards compatibility but may be removed in a future version.
- Parameters:
compute_type (str) – The compute type.
meta (Dict[str, Any]) – The metadata.
scumh (SCUManifestSchemaHandler, optional) – The SCU manifest schema handler. If None, one will be created.
- Returns:
The BigQuery table name.
- Return type:
str
- zcloud.util.get_beam_config_schema_handler()[source]
Get a BeamConfigSchemaHandler instance.
Note: This function is deprecated and has been renamed to get_etl_config_schema_handler. It is kept for backwards compatibility but may be removed in a future version.
- Returns:
An instance of the BeamConfigSchemaHandler class (which inherits from ETLConfigSchemaHandler).
- Return type:
BeamConfigSchemaHandler
- zcloud.util.get_bq_table_id_from_compute_type(compute_type, meta, scumh=None)[source]
Get a BigQuery table name from a compute type and metadata.
- Parameters:
compute_type (str) – The compute type.
meta (Dict[str, Any]) – The metadata.
scumh (SCUManifestSchemaHandler, optional) – The SCU manifest schema handler. If None, one will be created.
- Returns:
The BigQuery table name.
- Return type:
str
- zcloud.util.get_etl_config_schema_handler()[source]
Get an ETLConfigSchemaHandler instance.
- Returns:
An instance of the ETLConfigSchemaHandler class.
- Return type:
ETLConfigSchemaHandler
- zcloud.util.get_meta_schema_handler(name)[source]
Get a MetaSchemaHandler instance for a given schema name.
- Parameters:
name (str) – The name of the schema to get a handler for.
- Returns:
The handler for the schema. May be a derived class of MetaSchemaHandler.
- Return type:
MetaSchemaHandler
- Raises:
NotImplementedError – This function is not yet implemented.
- zcloud.util.get_scu_config_handler()[source]
- zcloud.util.get_scu_manifest_handler()[source]
- zcloud.util.list_from_element_files(manifest_dict, *element_strings, scumh=None)[source]
Generate a list of file dictionaries from the given manifest dictionary and element strings.
- Parameters:
manifest_dict (dict) – The manifest dictionary containing metadata and element files.
*element_strings (str) – Variable length argument list of element strings to filter the files.
scumh (SCUManifestSchemaHandler, optional) – An instance of SCUManifestSchemaHandler. If None, a new instance is created using get_scu_manifest_handler(). In general, use the default; specify an alternative handler only if something is going wrong or you need an older or newer schema.
- Returns:
A list of dictionaries, each containing the file path and upload UUID.
- Return type:
list of dict
Notes
The function assumes that manifest_dict contains keys defined in scumh.
The scumh parameter is used to access specific keys within the manifest_dict.
Examples
>>> manifest_dict = {
...     'meta': {'upload_uuid': '1234'},
...     'element_files': [
...         {'name': 'path1', 'files': [{'name': 'file1'}, {'name': 'file2'}]},
...         {'name': 'path2', 'files': [{'name': 'file3'}, {'name': 'file4'}]}
...     ]
... }
>>> list_from_element_files(manifest_dict, 'path1')
[{'name': '1234/file1', 'upload_uuid': '1234'}, {'name': '1234/file2', 'upload_uuid': '1234'}]
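The behavior shown in the example can be approximated with a short stand-alone function. This is a sketch that reproduces the documented output, not the zcloud implementation: the key names are hard-coded here, whereas the real function reads them from scumh, and exact matching of element names is an assumption.

```python
def list_from_element_files_sketch(manifest_dict, *element_strings):
    """Hypothetical re-implementation with hard-coded manifest keys."""
    upload_uuid = manifest_dict["meta"]["upload_uuid"]
    selected = []
    for element in manifest_dict["element_files"]:
        # Assumption: an element is selected when its name matches exactly.
        if element["name"] not in element_strings:
            continue
        for file_entry in element["files"]:
            selected.append(
                {
                    # File paths are prefixed with the upload UUID.
                    "name": f"{upload_uuid}/{file_entry['name']}",
                    "upload_uuid": upload_uuid,
                }
            )
    return selected


manifest_dict = {
    "meta": {"upload_uuid": "1234"},
    "element_files": [
        {"name": "path1", "files": [{"name": "file1"}, {"name": "file2"}]},
        {"name": "path2", "files": [{"name": "file3"}, {"name": "file4"}]},
    ],
}
result = list_from_element_files_sketch(manifest_dict, "path1")
print(result)
```

How the real function behaves when no element strings are passed is not documented here; this sketch returns an empty list in that case.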
- zcloud.util.list_templates()[source]
List available templates in all supported resource subdirectories.
- Returns:
A dictionary where keys are resource types (e.g. ‘meta_schema’, ‘raw_scu_schema’, etc.) and values are lists of template names (file names without extension).
- Return type:
Dict[str, List[str]]
- zcloud.util.list_templates_by_type(template_type)[source]
List templates under a specific resource subdirectory.
- Parameters:
template_type (str) – The name of the subdirectory (one of the supported types, e.g. AVRO_SCHEMA).
- Returns:
A list of template names (file names without extension).
- Return type:
List[str]
- Raises:
ValueError – If the specified template_type folder is not found.
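The listing and the ValueError described above can be sketched with pathlib. This is a hypothetical stand-in that works on an ordinary directory; the real function resolves package resources, and the folder and file names used below are made up for the demonstration.

```python
import tempfile
from pathlib import Path


def list_templates_in(resource_root, template_type):
    """List file names (without extension) under resource_root/template_type.

    Hypothetical stand-in for list_templates_by_type.
    """
    folder = Path(resource_root) / template_type
    if not folder.is_dir():
        raise ValueError(f"template type folder not found: {template_type}")
    return sorted(p.stem for p in folder.iterdir() if p.is_file())


with tempfile.TemporaryDirectory() as root:
    # Build a throwaway resource tree with two made-up templates.
    schema_dir = Path(root) / "meta_schema"
    schema_dir.mkdir()
    (schema_dir / "scu_manifest.json").write_text("{}")
    (schema_dir / "scu_record.json").write_text("{}")

    found = list_templates_in(root, "meta_schema")
    try:
        list_templates_in(root, "no_such_type")
    except ValueError as exc:
        error_message = str(exc)

print(found)
print(error_message)
```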
- zcloud.util.load_avro_schema(name)[source]
Load an Avro schema from a JSON file in the package.
- Parameters:
name (str) – The name of the schema file to load.
- Returns:
The loaded schema as a dictionary.
- Return type:
Dict[str, Any]
- zcloud.util.load_meta_schema(name)[source]
Load a meta schema from a JSON file in the package.
- Parameters:
name (str) – The name of the schema file to load.
- Returns:
The loaded schema as a dictionary.
- Return type:
Dict[str, Any]
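Loading a JSON schema from package resources might look roughly like the following. This is a sketch assuming Python 3.9 or later and a layout of one subdirectory per schema type; the package name, subdirectory, and file name are invented so the example can run on its own, and none of them are taken from zcloud.

```python
import importlib.resources
import json
import sys
import tempfile
from pathlib import Path


def load_packaged_json(package, subdir, name):
    """Load <package>/<subdir>/<name>.json through importlib.resources."""
    resource = importlib.resources.files(package) / subdir / f"{name}.json"
    with resource.open("r", encoding="utf-8") as handle:
        return json.load(handle)


# Build a throwaway package so the sketch is self-contained.
with tempfile.TemporaryDirectory() as root:
    pkg_dir = Path(root) / "demo_schemas"
    (pkg_dir / "meta_schema").mkdir(parents=True)
    (pkg_dir / "__init__.py").write_text("")
    (pkg_dir / "meta_schema" / "example.json").write_text(
        json.dumps({"type": "object", "required": ["upload_uuid"]})
    )
    sys.path.insert(0, root)
    try:
        importlib.invalidate_caches()
        schema = load_packaged_json("demo_schemas", "meta_schema", "example")
    finally:
        # Undo the path and module changes made for the demonstration.
        sys.path.remove(root)
        sys.modules.pop("demo_schemas", None)

print(schema["required"])
```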
- zcloud.util.load_raw_schema(name)[source]
Load a raw schema from a JSON file in the package.
- Parameters:
name (str) – The name of the schema file to load.
- Returns:
The loaded schema as a dictionary.
- Return type:
Dict[str, Any]
- zcloud.util.load_scu_file_schema(name)[source]
Load the schema for an SCU output file.
This utility is intended to abstract the indexing of the many kinds of chaotic files that our SCUs might generate. SCUs are only indexed at the file level, and those files sometimes contain structured data, sometimes unstructured data, and sometimes a mix of both.
The schemas indexed here are intended to be used to index, validate, and process that data in a more idiomatic way.
Any schema validation should be done in the calling function, and the reliability of a schema depends on the care taken by whoever integrated the SCU.
Note
This function currently just loads a file from package resources. In the future, it will fetch the schema from a schema registry.
- Parameters:
name (str) – The name of the schema file to load.
- Returns:
The loaded schema as a dictionary.
- Return type:
Dict[str, Any]