zcloud.util module

This module provides utilities for loading and validating JSON schemas.

Classes

MetaSchemaHandler

SCUManifestSchemaHandler

Functions

load_meta_schema()

load_raw_schema()

Notes

In the future, the module will migrate from using importlib.resources to using a schema validation library.

class zcloud.util.BeamConfigSchemaHandler[source]

Bases: ETLConfigSchemaHandler

Handle schemas for Beam configs.

Note: This class is deprecated and has been renamed to ETLConfigSchemaHandler. It is kept for backwards compatibility but may be removed in a future version.
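A common way to keep a renamed class importable is a thin subclass that warns when it is instantiated. The sketch below is illustrative only: both classes are stand-ins defined here, and the DeprecationWarning is an assumption. zcloud may implement the alias differently, for example without emitting any warning.

```python
import warnings


class ETLConfigSchemaHandler:
    """Stand-in for zcloud.util.ETLConfigSchemaHandler (illustration only)."""


class BeamConfigSchemaHandler(ETLConfigSchemaHandler):
    """Deprecated alias kept so that old imports continue to work."""

    def __init__(self, *args, **kwargs):
        # Assumption: the alias warns on use; the real class may stay silent.
        warnings.warn(
            "BeamConfigSchemaHandler is deprecated; use ETLConfigSchemaHandler",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy_handler = BeamConfigSchemaHandler()

# Old code still gets an object that passes isinstance checks for the new name.
is_etl_handler = isinstance(legacy_handler, ETLConfigSchemaHandler)
```

Because the old name is a subclass of the new one, code written against ETLConfigSchemaHandler accepts instances created through either name.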

class zcloud.util.ETLConfigSchemaHandler[source]

Bases: MetaSchemaHandler

Handle schemas for ETL configs.

This class was previously named BeamConfigSchemaHandler but has been renamed to reflect the broader ETL functionality rather than specific Apache Beam functionality.

transforms

The name of the transforms field in the schema

Type:

str

data

The name of the data field in the schema

Type:

str

loads

The name of the loads field in the schema

Type:

str

transform_type

The name of the transform type field in transform items

Type:

str

loader_type

The name of the loader type field in load items

Type:

str

input_data

The name of the input data field

Type:

str

transform_parameters

The name of the transform parameters field

Type:

str

loader_parameters

The name of the loader parameters field

Type:

str

transform_name

The name of the transform name field

Type:

str
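The attributes above let calling code look up config fields through the handler rather than through string literals. The following is a minimal sketch of that idea, not zcloud code: ETLConfigFields is a hypothetical stand-in, its attribute values are assumed, and the layout of the config dictionary (in particular where the input data field lives) is a guess made for illustration.

```python
class ETLConfigFields:
    """Hypothetical stand-in for ETLConfigSchemaHandler's field-name attributes."""

    # Assumed values; the real strings come from the packaged schema.
    transforms = "transforms"
    transform_type = "transform_type"
    transform_name = "transform_name"
    input_data = "input_data"
    transform_parameters = "transform_parameters"


def describe_transforms(config, fields):
    """Return (name, type, input) for each transform item in an ETL config."""
    return [
        (
            item[fields.transform_name],
            item[fields.transform_type],
            item[fields.input_data],
        )
        for item in config.get(fields.transforms, [])
    ]


fields = ETLConfigFields()
config = {
    fields.transforms: [
        {
            fields.transform_name: "to_bcif",
            fields.transform_type: "map",
            fields.input_data: "raw_pdb",
            fields.transform_parameters: {},
        }
    ]
}
summary = describe_transforms(config, fields)
```

If a field is renamed in the schema, only the attribute value changes; describe_transforms keeps working without edits.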

class zcloud.util.MetaSchemaHandler(name)[source]

Bases: object

validate(data)[source]

class zcloud.util.SCUConfigSchemaHandler[source]

Bases: MetaSchemaHandler

class zcloud.util.SCUManifestSchemaHandler[source]

Bases: MetaSchemaHandler

API sugar that avoids dangling string references to schema fields in dependent code. Schema changes are therefore less likely to break orphaned code, as long as the zcloud installed in its environment is up to date.

get_raw_upload_bq_schema_list()[source]

Get a list of dicts compatible with the Apache Beam BigQuery TableFieldSchema API.

Returns:

The BigQuery schema as a dictionary.

Return type:

dict

class zcloud.util.SCURecordSchemaHandler[source]

Bases: MetaSchemaHandler

create_scu_record(entity_id=None, gcs_uri=None, upload_uuid=None, structure_type=None, correlation_id=None, upload_timestamp=None, manifest_uri=None, compute_type=None, **kwargs)[source]

Create an SCU record with explicitly named parameters and optional kwargs.

This method provides explicit parameters for IDE autocomplete while also supporting kwargs for backward compatibility and flexibility.

kwargs are not implemented yet.

Parameters:
  • entity_id (str, optional) – The entity identifier

  • gcs_uri (str, optional) – The Google Cloud Storage URI (same as gcloud_uri)

  • upload_uuid (str, optional) – The upload UUID

  • structure_type (str, optional) – The structure type

  • correlation_id (str, optional) – The correlation identifier

  • upload_timestamp (str, optional) – The upload timestamp

  • manifest_uri (str, optional) – The manifest URI

  • compute_type (str, optional) – The compute type

  • **kwargs (dict, optional) – Additional keyword arguments. Not implemented yet.

Returns:

A dictionary containing the SCU record fields.

Return type:

dict

Examples

>>> handler = SCURecordSchemaHandler()
>>> # Using explicit parameters:
>>> record = handler.create_scu_record(entity_id="123", upload_uuid="abc")
>>>
>>> # Using enum values as kwargs:
>>> field_names = handler.get_field_names()
>>> record = handler.create_scu_record(**{field_names["entity_id"]: "123"})

get_bq_schema()[source]

Get the BigQuery schema for the SCU record.

Note

This is not a list of bigquery.SchemaField objects. It is a list of dictionaries, each containing the field name, field type, and mode. To submit this schema to BigQuery, convert it to a list of bigquery.SchemaField objects first.

Example:

>>> schema = handler.get_bq_schema()
>>> schema_fields = [bigquery.SchemaField(**field) for field in schema]
>>> table = bigquery.Table(table_id, schema=schema_fields)
>>> table = client.create_table(table)

Returns:

A list of dictionaries, each containing the field name, field type, and mode.

Return type:

List[Dict[str, str]]

get_field_names()[source]

zcloud.util.available_defaults()[source]

Get a dictionary of default configuration values.

Returns:

A dictionary containing default configuration values for GCP project and bucket.

Return type:

Dict[str, str]

zcloud.util.convert_pdb_to_bcif(pdb_text)[source]

Convert PDB format text to Binary CIF format.

Note: This function was migrated from the deprecated beam module.

Parameters:

pdb_text (str) – The PDB format text to convert.

Returns:

The Binary CIF format data.

Return type:

bytes

zcloud.util.create_dataset_if_not_exists(project_id, dataset_id)[source]

Create a BigQuery dataset if it doesn’t already exist.

Parameters:
  • project_id (str) – The Google Cloud project ID.

  • dataset_id (str) – The BigQuery dataset ID.

Return type:

None

zcloud.util.extract_rfd_run_files(manifest, scu_mh=None)[source]

Extract RFD run files from a manifest.

Note: This function was migrated from the deprecated beam module.

Parameters:
  • manifest (Dict[str, Any]) – The manifest containing RFD run files.

  • scu_mh (SCUManifestSchemaHandler, optional) – The SCU manifest schema handler. If None, one will be created.

Returns:

A list of RFD run file dictionaries.

Return type:

List[Dict[str, Any]]

zcloud.util.filter_file_dict_by_ext(file_dict, extension)[source]

Filter a file dictionary by file extension.

Note: This function was migrated from the deprecated beam module.

Parameters:
  • file_dict (Dict[str, Any]) – The file dictionary to filter.

  • extension (str) – The extension to filter by.

Returns:

True if the file has the specified extension, False otherwise.

Return type:

bool
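Since the return value is a bool, the function can act as a predicate when selecting files from a list. The sketch below uses a hypothetical stand-in, has_extension, and assumes that the file path is stored under a 'name' key and that matching is a simple suffix check; neither detail is confirmed by this page.

```python
def has_extension(file_dict, extension, name_key="name"):
    """Hypothetical predicate: True if the file's name ends with `extension`."""
    return str(file_dict.get(name_key, "")).endswith(extension)


files = [{"name": "run1/model.pdb"}, {"name": "run1/scores.json"}]

# Keep only the PDB files.
pdb_files = [f for f in files if has_extension(f, ".pdb")]
```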

zcloud.util.get_beam_bq_table_from_compute_type(compute_type, meta, scumh=None)[source]

Get a BigQuery table name from a compute type and metadata.

Note: This function is deprecated and has been renamed to get_bq_table_id_from_compute_type. It is kept for backwards compatibility but may be removed in a future version.

Parameters:
  • compute_type (str) – The compute type.

  • meta (Dict[str, Any]) – The metadata.

  • scumh (SCUManifestSchemaHandler, optional) – The SCU manifest schema handler. If None, one will be created.

Returns:

The BigQuery table name.

Return type:

str

zcloud.util.get_beam_config_schema_handler()[source]

Get a BeamConfigSchemaHandler instance.

Note: This function is deprecated and has been renamed to get_etl_config_schema_handler. It is kept for backwards compatibility but may be removed in a future version.

Returns:

An instance of the BeamConfigSchemaHandler class (which inherits from ETLConfigSchemaHandler).

Return type:

BeamConfigSchemaHandler

zcloud.util.get_bq_table_id_from_compute_type(compute_type, meta, scumh=None)[source]

Get a BigQuery table name from a compute type and metadata.

Parameters:
  • compute_type (str) – The compute type.

  • meta (Dict[str, Any]) – The metadata.

  • scumh (SCUManifestSchemaHandler, optional) – The SCU manifest schema handler. If None, one will be created.

Returns:

The BigQuery table name.

Return type:

str

zcloud.util.get_etl_config_schema_handler()[source]

Get an ETLConfigSchemaHandler instance.

Returns:

An instance of the ETLConfigSchemaHandler class.

Return type:

ETLConfigSchemaHandler

zcloud.util.get_meta_schema_handler(name)[source]

Get a MetaSchemaHandler instance for a given schema name.

Parameters:

name (str) – The name of the schema to get a handler for.

Returns:

The handler for the schema. May be a derived class of MetaSchemaHandler.

Return type:

MetaSchemaHandler

Raises:

NotImplementedError – This function is not yet implemented.

zcloud.util.get_scu_config_handler()[source]

zcloud.util.get_scu_manifest_handler()[source]

zcloud.util.list_from_element_files(manifest_dict, *element_strings, scumh=None)[source]

Generate a list of file dictionaries from the given manifest dictionary and element strings.

Parameters:
  • manifest_dict (dict) – The manifest dictionary containing metadata and element files.

  • *element_strings (str) – Variable length argument list of element strings to filter the files.

  • scumh (SCUManifestSchemaHandler, optional) – An instance of SCUManifestSchemaHandler. If None, a new instance is created using get_scu_manifest_handler(). In general, use the default; specify an alternative handler only if something is going wrong or you need an older or newer schema.

Returns:

A list of dictionaries, each containing the file path and upload UUID.

Return type:

list of dict

Notes

  • The function assumes that manifest_dict contains keys defined in scumh.

  • The scumh parameter is used to access specific keys within the manifest_dict.
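The behaviour described above can be approximated with plain dictionary access. This is an illustrative re-implementation, not the zcloud source: the key names ('meta', 'upload_uuid', 'element_files', 'name', 'files') are hard-coded from this function's documented example, whereas the real function reads them from scumh.

```python
def list_from_element_files_sketch(manifest_dict, *element_strings):
    """Illustrative re-implementation; key names are hard-coded assumptions."""
    upload_uuid = manifest_dict["meta"]["upload_uuid"]
    results = []
    for element in manifest_dict["element_files"]:
        # Keep only the elements whose name was requested.
        if element["name"] not in element_strings:
            continue
        for file_entry in element["files"]:
            results.append(
                {
                    "name": f"{upload_uuid}/{file_entry['name']}",
                    "upload_uuid": upload_uuid,
                }
            )
    return results


manifest = {
    "meta": {"upload_uuid": "1234"},
    "element_files": [
        {"name": "path1", "files": [{"name": "file1"}, {"name": "file2"}]},
        {"name": "path2", "files": [{"name": "file3"}, {"name": "file4"}]},
    ],
}
selected = list_from_element_files_sketch(manifest, "path1")
```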

Examples

>>> manifest_dict = {
...     'meta': {'upload_uuid': '1234'},
...     'element_files': [
...         {'name': 'path1', 'files': [{'name': 'file1'}, {'name': 'file2'}]},
...         {'name': 'path2', 'files': [{'name': 'file3'}, {'name': 'file4'}]}
...     ]
... }
>>> list_from_element_files(manifest_dict, 'path1')
[{'name': '1234/file1', 'upload_uuid': '1234'}, {'name': '1234/file2', 'upload_uuid': '1234'}]

zcloud.util.list_templates()[source]

List available templates in all supported resource subdirectories.

Returns:

A dictionary where keys are resource types (e.g. ‘meta_schema’, ‘raw_scu_schema’, etc.) and values are lists of template names (file names without extension).

Return type:

Dict[str, List[str]]

zcloud.util.list_templates_by_type(template_type)[source]

List templates under a specific resource subdirectory.

Parameters:

template_type (str) – The name of the subdirectory (one of the supported types, e.g. AVRO_SCHEMA).

Returns:

A list of template names (file names without extension).

Return type:

List[str]

Raises:

ValueError – If the specified template_type folder is not found.
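The listing and error behaviour described here can be sketched with pathlib. The function below is a hypothetical stand-in that works on an ordinary directory passed in as resource_root; how zcloud actually locates its resource subdirectories is not shown on this page.

```python
import tempfile
from pathlib import Path


def list_templates_by_type_sketch(resource_root, template_type):
    """List file stems under resource_root/template_type (illustration only)."""
    folder = Path(resource_root) / template_type
    if not folder.is_dir():
        raise ValueError(f"Template type folder not found: {template_type}")
    return sorted(p.stem for p in folder.iterdir() if p.is_file())


with tempfile.TemporaryDirectory() as root:
    # Build a throwaway resource tree with two templates.
    schema_dir = Path(root) / "meta_schema"
    schema_dir.mkdir()
    (schema_dir / "scu_manifest.json").write_text("{}")
    (schema_dir / "etl_config.json").write_text("{}")

    names = list_templates_by_type_sketch(root, "meta_schema")
    try:
        list_templates_by_type_sketch(root, "missing_type")
        missing_raises = False
    except ValueError:
        missing_raises = True
```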

zcloud.util.load_avro_schema(name)[source]

Load an Avro schema from a JSON file in the package.

Parameters:

name (str) – The name of the schema file to load.

Returns:

The loaded schema as a dictionary.

Return type:

Dict[str, Any]

zcloud.util.load_meta_schema(name)[source]

Load a meta schema from a JSON file in the package.

Parameters:

name (str) – The name of the schema file to load.

Returns:

The loaded schema as a dictionary.

Return type:

Dict[str, Any]

zcloud.util.load_raw_schema(name)[source]

Load a raw schema from a JSON file in the package.

Parameters:

name (str) – The name of the schema file to load.

Returns:

The loaded schema as a dictionary.

Return type:

Dict[str, Any]
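The module notes mention importlib.resources, so a loader of this kind might look roughly like the sketch below. Everything specific is an assumption: load_schema_sketch is a hypothetical helper, the subdirectory name is taken from the resource types mentioned under list_templates, and it is a guess that name is given without the .json extension. The demo builds a throwaway package so the example runs without zcloud installed.

```python
import importlib
import importlib.resources
import json
import sys
import tempfile
from pathlib import Path


def load_schema_sketch(package, subdir, name):
    """Load <package>/<subdir>/<name>.json via importlib.resources."""
    resource = importlib.resources.files(package) / subdir / f"{name}.json"
    return json.loads(resource.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    # Create a minimal package containing one schema file.
    pkg_dir = Path(tmp) / "demo_schema_pkg"
    (pkg_dir / "raw_scu_schema").mkdir(parents=True)
    (pkg_dir / "__init__.py").write_text("")
    (pkg_dir / "raw_scu_schema" / "example.json").write_text('{"type": "object"}')

    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        schema = load_schema_sketch("demo_schema_pkg", "raw_scu_schema", "example")
    finally:
        sys.path.remove(tmp)
```

Reading through importlib.resources rather than building file paths by hand keeps the loader working when the package is installed in a non-standard location.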

zcloud.util.load_scu_file_schema(name)[source]

Load the schema for an SCU output file.

This is a utility intended to abstract the indexing of all the sorts of chaotic files that our SCUs might generate. SCUs are only indexed at the file level, and those files sometimes have structured data, sometimes have unstructured data, and sometimes have a mix of both.

The schemas indexed here are intended to be used to index, validate, and process that data in a more idiomatic way.

Any schema validation should be done in the calling function. The reliability of a schema loaded here depends on the care taken by whoever patched the SCU in.

Note

This function currently just loads a file from package resources. In the future, it will fetch the schema from a schema registry.

Parameters:

name (str) – The name of the schema file to load.

Returns:

The loaded schema as a dictionary.

Return type:

Dict[str, Any]