Source code for zcloud.util

"""
This module provides utilities for loading and validating JSON schemas.


Classes
-------

:class:`MetaSchemaHandler`

:class:`SCUManifestSchemaHandler`

Functions
---------

:func:`load_meta_schema`

:func:`load_raw_schema`

Notes
-----

In the future, the module will migrate from using importlib.resources to using a schema validation library.
"""

from importlib import resources
import json
import logging
import os
from typing import Type, Optional, Any, Dict, List

from google.cloud import bigquery
from jsonschema import validate

from zcloud.scu_enums import RawSCURecord
from zcloud.service_enums import ProjectId

# Module-level logger shared by the helpers in this file.
logger = logging.getLogger(__name__)


# TODO make these globals in a resource file or something like that
# Package passed to ``importlib.resources.files`` to locate bundled schemas.
RESOURCES = "zcloud.resources"
# Sub-directories of RESOURCES, one per schema family. Avro schemas are read
# as ``<name>.avsc``; the other families are read as ``<name>.json``.
AVRO_SCHEMA = "avro_schema"
META_SCHEMA = "meta_schema"
RAW_SCU_SCHEMA = "raw_scu_schema"
SCU_FILE_SCHEMA = "scu_file_schema"
# Meta schema name loaded by ETLConfigSchemaHandler.
DEFAULT_ETL_SCHEMA_NAME = "etl_pipeline_default"
DEFAULT_BEAM_SCHEMA_NAME = "beam_pipeline_default"  # Deprecated, use DEFAULT_ETL_SCHEMA_NAME instead


# Value reported under the "bucket" key by available_defaults().
BUCKET = "docking-bay-1"


def list_templates() -> Dict[str, List[str]]:
    """
    List available templates in all supported resource subdirectories.

    Python source files (for example a package ``__init__.py``) are skipped so
    the result agrees with :func:`list_templates_by_type`.

    Returns
    -------
    Dict[str, List[str]]
        A dictionary where keys are resource types (e.g. 'meta_schema',
        'raw_scu_schema', etc.) and values are lists of template names
        (file names without extension). Resource types whose directory does
        not exist are omitted from the result.
    """
    template_dirs = {
        "scu_file_schema": SCU_FILE_SCHEMA,
        "meta_schema": META_SCHEMA,
        "raw_scu_schema": RAW_SCU_SCHEMA,
        "avro_schema": AVRO_SCHEMA,
    }
    # Resolve the resource package once instead of once per subdirectory.
    package_root = resources.files(RESOURCES)

    templates: Dict[str, List[str]] = {}
    for key, subdir in template_dirs.items():
        subdir_path = package_root / subdir
        if not subdir_path.is_dir():
            continue
        # Strip extensions (.avsc for avro schemas, .json for the others) and
        # ignore Python files, which are package plumbing rather than templates.
        templates[key] = [
            os.path.splitext(item.name)[0]
            for item in subdir_path.iterdir()
            if item.is_file() and not item.name.endswith(".py")
        ]
    return templates
def list_templates_by_type(template_type: str) -> List[str]:
    """
    List templates under a specific resource subdirectory.

    Parameters
    ----------
    template_type : str
        The name of the subdirectory (one of the supported types, e.g. AVRO_SCHEMA).

    Returns
    -------
    List[str]
        Template names, i.e. the file names found in the subdirectory with
        their extension removed. Files ending in ``.py`` are not listed.

    Raises
    ------
    ValueError
        If the specified template_type folder is not found.
    """
    type_dir = resources.files(RESOURCES) / template_type
    if not type_dir.is_dir():
        raise ValueError(f"Template type '{template_type}' not found.")

    names = []
    for entry in type_dir.iterdir():
        # Only regular, non-Python files count as templates.
        if not entry.is_file() or entry.name.endswith(".py"):
            continue
        stem, _extension = os.path.splitext(entry.name)
        names.append(stem)
    return names
def load_scu_file_schema(name: str) -> Dict[str, Any]:
    """
    Load the schema for an SCU output file.

    This is a utility intended to abstract the indexing of all the sorts of
    chaotic files that our SCUs might generate. SCUs are only indexed at the
    file level, and those files sometimes have structured data, sometimes have
    unstructured data, and sometimes have a mix of both. The schemas indexed
    here are intended to be used to index, validate, and process that data in
    a more idiomatic way.

    Any schema validation should be done in the calling function, and the
    reliability of the schema here is conditional on the responsibility of the
    person who patched the SCU in.

    .. note::
        This function currently just loads a file from package resources.
        In the future, it will fetch the schema from a schema registry.

    Parameters
    ----------
    name : str
        The name of the schema file to load (without the ``.json`` extension).

    Returns
    -------
    Dict[str, Any]
        The loaded schema as a dictionary.
    """
    schema_path = resources.files(RESOURCES) / SCU_FILE_SCHEMA / f"{name}.json"
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    with schema_path.open("r", encoding="utf-8") as f:
        return json.load(f)
def load_meta_schema(name: str) -> Dict[str, Any]:
    """
    Load a meta schema from a JSON file in the package.

    Parameters
    ----------
    name : str
        The name of the schema file to load (without the ``.json`` extension).

    Returns
    -------
    Dict[str, Any]
        The loaded schema as a dictionary.
    """
    schema_path = resources.files(RESOURCES) / META_SCHEMA / f"{name}.json"
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    with schema_path.open("r", encoding="utf-8") as f:
        return json.load(f)
def load_raw_schema(name: str) -> Dict[str, Any]:
    """
    Load a raw SCU schema from a JSON file in the package and validate it.

    The loaded document is validated against the generic ``scu_raw`` meta
    schema. A mismatch between the schema's own ``name`` entry and the
    requested ``name`` is only logged as a warning; it does not abort loading.

    Parameters
    ----------
    name : str
        The name of the schema file to load (without the ``.json`` extension).

    Returns
    -------
    Dict[str, Any]
        The loaded, validated schema as a dictionary.

    Raises
    ------
    jsonschema.ValidationError
        If the schema does not conform to the ``scu_raw`` meta schema.
    """
    meta_schema_handler = MetaSchemaHandler(
        "scu_raw"
    )  # get the generic SCU raw output schema to validate against
    schema_path = resources.files(RESOURCES) / RAW_SCU_SCHEMA / f"{name}.json"
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    with schema_path.open("r", encoding="utf-8") as f:
        schema = json.load(f)

    # Read the declared name defensively: a document without a "name" entry
    # should be reported by the meta-schema validation below, not by a bare
    # KeyError raised before validation gets a chance to run.
    declared_name = schema.get("name") if isinstance(schema, dict) else None
    if declared_name != name:
        logger.warning(
            f"Schema name {declared_name} does not match expected name {name}"
        )
    meta_schema_handler.validate(schema)
    return schema
def load_avro_schema(name: str) -> Dict[str, Any]:
    """
    Load an Avro schema from an ``.avsc`` file in the package.

    The file is parsed as JSON.

    Parameters
    ----------
    name : str
        The name of the schema file to load (without the ``.avsc`` extension).

    Returns
    -------
    Dict[str, Any]
        The loaded schema as a dictionary.
    """
    schema_path = resources.files(RESOURCES) / AVRO_SCHEMA / f"{name}.avsc"
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    with schema_path.open("r", encoding="utf-8") as f:
        return json.load(f)
class MetaSchemaHandler:
    """
    Thin wrapper around a packaged meta schema.

    The schema is loaded by name through :func:`load_meta_schema`, and a few of
    its top-level entries are exposed as attributes.

    Parameters
    ----------
    name : str
        Name of the meta schema to load.

    Raises
    ------
    KeyError
        If the schema lacks one of the ``type``, ``properties``, ``required``
        or ``title`` entries.
    """

    def __init__(self, name: str):
        loaded = load_meta_schema(name)
        self.schema = loaded
        logger.debug(f"Loaded schema: {self.schema}")

        # NOTE(review): this stores the literal key name, not the schema's
        # description text -- confirm that is intended.
        self.description = "description"

        # Mandatory top-level entries; a missing one raises KeyError.
        self.type = loaded["type"]
        self.properties = loaded["properties"]
        self.required = loaded["required"]
        self.title = loaded["title"]

        # Optional entries fall back to empty containers.
        self.definitions = loaded.get("definitions", {})
        self.meta_keys = set(loaded.get("x-zip-metadata", {}))

    def validate(self, data: dict):
        """Validate ``data`` against the loaded schema using ``jsonschema.validate``."""
        validate(data, self.schema)
class SCUManifestSchemaHandler(MetaSchemaHandler):
    """
    API sugar to avoid dangling string references to schema fields through code.

    Depending on this handler instead of raw strings means that schema changes
    are less likely to break orphaned code, as long as the zcloud in its
    environment is up to date.

    On construction the ``scu_manifest`` meta schema is loaded and checked for
    every field name exposed here; a missing field raises ``ValueError``.
    """

    def __init__(self):
        super().__init__("scu_manifest")

        # Names of the x-zip-metadata fields.
        self.project = "project"
        self.subproject = "subproject"
        self.experiment = "experiment"
        self.authors = "authors"
        # Names of the upload metadata fields.
        self.compute_type = "compute_type"
        self.meta = "meta"  # distinct from x-zip-metadata, this is upload metadata
        self.upload_uuid = "upload_uuid"
        self.upload_timestamp = "upload_timestamp"
        # Names of the fields describing the upload contents.
        self.element_files = "element_files"
        self.files = "files"
        self.elements = "elements"
        self.file_path = "name"
        self.gcloud_uri = "gcloud_uri"
        self.correlation_id = "correlation_id"

        # Everything below is really more of an integration test / sanity
        # check: each field named above has to be present in the schema.
        for zip_field in (
            self.project,
            self.subproject,
            self.experiment,
            self.authors,
        ):
            if zip_field not in self.meta_keys:
                raise ValueError(
                    f"Expected field {zip_field} not found in x-zip-metadata"
                )

        # Keys related to upload metadata.
        upload_meta = self.properties.get("meta").get("properties").keys()
        for upload_field in (
            self.compute_type,
            self.upload_uuid,
            self.upload_timestamp,
        ):
            if upload_field not in upload_meta:
                raise ValueError(
                    f"Expected field {upload_field} not found in properties"
                )

        # Keys related to the upload contents.
        if self.element_files not in self.properties:
            raise ValueError(
                f"Expected field {self.element_files} not found in properties"
            )
        element_file_properties = (
            self.properties.get(self.element_files).get("items").get("properties")
        )
        if self.files not in element_file_properties.keys():
            raise ValueError(
                f"Expected field {self.files} not found in properties of the element_files"
            )
        if self.elements not in self.properties.keys():
            raise ValueError(f"Expected field {self.elements} not found in properties")

        # Keys related to the files.
        file_properties = self.definitions.get("file").get("properties")
        if self.file_path not in file_properties.keys():
            raise ValueError(
                f"Expected field {self.file_path} not found in properties for the 'file' definition"
            )

    def get_raw_upload_bq_schema_list(self):
        """
        Get a list of dicts compatible with the apache beam bigquery TableFieldSchema API.

        Returns
        -------
        list of dict
            One dictionary per column, each with the keys ``name``,
            ``field_type`` and ``mode``.
        """
        columns = (
            (self.project, "STRING", "NULLABLE"),
            (self.subproject, "STRING", "NULLABLE"),
            (self.experiment, "STRING", "NULLABLE"),
            (self.authors, "STRING", "REPEATED"),
            (self.compute_type, "STRING", "REQUIRED"),
            (self.upload_uuid, "STRING", "REQUIRED"),
            (self.upload_timestamp, "TIMESTAMP", "REQUIRED"),
            (self.gcloud_uri, "STRING", "REQUIRED"),
        )
        return [
            {"name": column, "field_type": field_type, "mode": mode}
            for column, field_type, mode in columns
        ]
class SCURecordSchemaHandler(MetaSchemaHandler):
    """
    Handler for the ``scu_record`` meta schema.

    Field names are taken from the :class:`RawSCURecord` enum so that code
    building or reading SCU records does not hard-code string keys.
    """

    def __init__(self):
        super().__init__("scu_record")
        self.entity_id = RawSCURecord.ENTITY_ID.value
        self.gcloud_uri = RawSCURecord.GCS_URI.value
        self.upload_uuid = RawSCURecord.UPLOAD_UUID.value
        self.structure_type = RawSCURecord.STRUCTURE_TYPE.value
        self.correlation_id = RawSCURecord.CORRELATION_ID.value
        self.upload_timestamp = RawSCURecord.UPLOAD_TIMESTAMP.value
        self.manifest_uri = RawSCURecord.MANIFEST_URI.value
        self.compute_type = RawSCURecord.COMPUTE_TYPE.value
        # Every field of a record, in the order used for the BigQuery schema.
        self.scu_fields = [
            self.entity_id,
            self.gcloud_uri,
            self.upload_uuid,
            self.structure_type,
            self.correlation_id,
            self.upload_timestamp,
            self.manifest_uri,
            self.compute_type,
        ]
        # Fields emitted with mode REQUIRED by get_bq_schema().
        self.required_fields = [
            self.entity_id,
            self.gcloud_uri,
            self.upload_uuid,
            self.compute_type,
        ]

    def get_field_names(self):
        """
        Return the list of SCU record field names.

        Returns
        -------
        list
            The handler's own ``scu_fields`` list (not a copy).
        """
        return self.scu_fields

    def create_scu_record(
        self,
        entity_id=None,
        gcs_uri=None,
        upload_uuid=None,
        structure_type=None,
        correlation_id=None,
        upload_timestamp=None,
        manifest_uri=None,
        compute_type=None,
        **kwargs,
    ):
        """
        Create an SCU record with explicitly named parameters.

        The explicit parameters exist for IDE autocomplete. ``**kwargs`` is
        accepted for backward compatibility, but it is not implemented yet:
        any extra keyword argument is silently ignored.

        Parameters
        ----------
        entity_id : str, optional
            The entity identifier
        gcs_uri : str, optional
            The Google Cloud Storage URI (same as gcloud_uri)
        upload_uuid : str, optional
            The upload UUID
        structure_type : str, optional
            The structure type
        correlation_id : str, optional
            The correlation identifier
        upload_timestamp : str, optional
            The upload timestamp
        manifest_uri : str, optional
            The manifest URI
        compute_type : str, optional
            The compute type
        **kwargs : dict, optional
            Additional keyword arguments. Not implemented yet; ignored.

        Returns
        -------
        dict
            A dictionary keyed by the handler's field names. Parameters that
            were not supplied are present with the value ``None``.

        Examples
        --------
        >>> handler = SCURecordSchemaHandler()
        >>> record = handler.create_scu_record(entity_id="123", upload_uuid="abc")
        >>> record[handler.entity_id]
        '123'
        >>> record[handler.structure_type] is None
        True
        """
        return {
            self.entity_id: entity_id,
            self.gcloud_uri: gcs_uri,
            self.upload_uuid: upload_uuid,
            self.structure_type: structure_type,
            self.correlation_id: correlation_id,
            self.upload_timestamp: upload_timestamp,
            self.manifest_uri: manifest_uri,
            self.compute_type: compute_type,
        }

    def get_bq_schema(self) -> List[Dict[str, str]]:
        # The docstring is a raw string so the Sphinx escape ``\*\*`` below is
        # not an invalid Python escape sequence.
        r"""
        Get the BigQuery schema for the SCU record.

        .. note::
            This is not a list of bigquery.SchemaField objects, it is a list of
            dictionaries, each containing the field name, field type, and mode.
            If you want to use this schema to submit a schema to bigquery, you
            will need to convert it to a list of bigquery.SchemaField objects.

        Example:
            >>> schema = handler.get_bq_schema()
            >>> schema_fields = [bigquery.SchemaField(\*\*field) for field in schema] #*maintainer note*: stop removing this escape
            >>> table = bigquery.Table(table_id, schema=schema_fields)
            >>> table.create(client=client)

        Returns
        -------
        List[Dict[str, str]]
            A list of dictionaries, each containing the field name, field type, and mode.
        """
        return [
            {
                "name": field,
                # Only the upload timestamp is a TIMESTAMP column.
                "field_type": (
                    "STRING" if field != self.upload_timestamp else "TIMESTAMP"
                ),
                "mode": "REQUIRED" if field in self.required_fields else "NULLABLE",
            }
            for field in self.scu_fields
        ]
class ETLConfigSchemaHandler(MetaSchemaHandler):
    """
    Handle schemas for ETL configs.

    This class was previously named BeamConfigSchemaHandler but has been
    renamed to reflect the broader ETL functionality rather than specific
    Apache Beam functionality.

    On construction the default ETL meta schema is loaded and checked for the
    field names exposed here; a missing field raises ``ValueError``.

    Attributes
    ----------
    transforms : str
        The name of the transforms field in the schema
    data : str
        The name of the data field in the schema
    loads : str
        The name of the loads field in the schema
    transform_type : str
        The name of the transform type field in transform items
    loader_type : str
        The name of the loader type field in load items
    input_data : str
        The name of the input data field
    transform_parameters : str
        The name of the transform parameters field
    loader_parameters : str
        The name of the loader parameters field
    transform_name : str
        The name of the transform name field
    """

    def __init__(self):
        super().__init__(DEFAULT_ETL_SCHEMA_NAME)

        # Top-level sections of an ETL config.
        self.transforms = "transforms"
        self.data = "data"
        self.loads = "loads"
        top_level_fields = (self.transforms, self.data, self.loads)

        # Each section must be declared in the schema properties...
        for section in top_level_fields:
            if section not in self.properties:
                raise ValueError(f"Expected field {section} not found in properties")

        # ...and must also be listed as required.
        required_fields = self.schema.get("required", [])
        for section in top_level_fields:
            if section not in required_fields:
                raise ValueError(
                    f"Expected field {section} not found in required fields"
                )

        # Field names used inside transform and load items.
        self.transform_type = "transform_type"
        self.loader_type = "loader_type"
        self.input_data = "input_data"
        self.transform_parameters = "parameters"
        self.loader_parameters = "parameters"
        self.transform_name = "name"

        # Validate that the important sub-properties exist in the schema.
        transform_properties = (
            self.properties.get(self.transforms).get("items").get("properties")
        )
        for transform_field in (
            self.transform_type,
            self.input_data,
            self.transform_parameters,
            self.transform_name,
        ):
            if transform_field not in transform_properties:
                raise ValueError(
                    f"Expected field {transform_field} not found in properties of transforms"
                )

        # NOTE: the sub-properties of `data` items are not validated here.
        self.data_pipeline_tags = "pipeline_tags"
        self.data_type = "type"

        load_properties = self.properties.get(self.loads).get("items").get("properties")
        for load_field in (
            self.loader_type,
            self.input_data,
            self.loader_parameters,
        ):
            if load_field not in load_properties:
                raise ValueError(
                    f"Expected field {load_field} not found in properties of loads"
                )

        self.data_name = "name"
        self.data_metadata = "metadata"
        self.data_tags = "tags"
        data_block_properties = self.definitions.get("DataBlockMetadata").get(
            "properties"
        )
        if self.data_tags not in data_block_properties:
            raise ValueError(
                f"Expected field {self.data_tags} not found in properties of data block metadata definition"
            )

        # Field names for extractor configuration (not validated here).
        self.data_extractor = "extractor"
        self.extractor_type = "extractor_type"
        self.extractor_parameters = "parameters"
class SCUConfigSchemaHandler(MetaSchemaHandler):
    """
    Deprecated handler for SCU config schemas.

    No ``scu_config`` meta schema exists any more, so the parent initialiser
    is intentionally not called and no schema is loaded. Constructing this
    class therefore fails; it now does so with an explanatory
    ``AttributeError`` instead of a bare missing-attribute error.

    Raises
    ------
    AttributeError
        Always, as long as no schema has been loaded onto the instance.
    ValueError
        If a schema is present but lacks one of the expected fields.
    """

    def __init__(self):
        # super().__init__("scu_config") # maintainer note: this is deprecated and no scu_config schema exists
        self.project = "project"
        self.subproject = "subproject"
        self.experiment = "experiment"
        self.authors = "authors"
        self.bucket = "bucket"
        self.gcp_project = "gcp_project"

        # Without the parent initialiser there is no schema on the instance.
        # Keep the exception type callers already see, but say why.
        properties = getattr(self, "properties", None)
        required = getattr(self, "required", None)
        if properties is None or required is None:
            raise AttributeError(
                "SCUConfigSchemaHandler is deprecated: no 'scu_config' meta "
                "schema exists, so 'properties' and 'required' are unavailable"
            )

        required_fields = [
            self.project,
            self.subproject,
            self.experiment,
            self.authors,
            self.bucket,
            self.gcp_project,
        ]
        # Check that the required fields are in the schema properties
        for field in required_fields:
            if field not in properties:
                raise ValueError(f"Expected field {field} not found in properties")
        # Check that the required fields are in the schema required list
        for field in required_fields:
            if field not in required:
                raise ValueError(f"Expected field {field} not found in required fields")
def get_beam_config_schema_handler():
    """
    Get a BeamConfigSchemaHandler instance.

    Note: This function is deprecated and has been renamed to
    get_etl_config_schema_handler. It is kept for backwards compatibility but
    may be removed in a future version. A deprecation warning is logged on
    every call.

    Returns
    -------
    BeamConfigSchemaHandler
        An instance of the BeamConfigSchemaHandler class (which inherits from
        ETLConfigSchemaHandler).
    """
    deprecation_logger = logging.getLogger(__name__)
    deprecation_logger.warning(
        "get_beam_config_schema_handler is deprecated and has been renamed to get_etl_config_schema_handler."
    )
    handler = BeamConfigSchemaHandler()
    return handler
def get_scu_manifest_handler():
    """
    Get an SCUManifestSchemaHandler instance.

    Returns
    -------
    SCUManifestSchemaHandler
        A newly constructed handler.
    """
    handler = SCUManifestSchemaHandler()
    return handler
def get_scu_config_handler():
    """
    Get an SCUConfigSchemaHandler instance.

    Returns
    -------
    SCUConfigSchemaHandler
        A newly constructed handler.
    """
    handler = SCUConfigSchemaHandler()
    return handler
def get_etl_config_schema_handler():
    """
    Get an ETLConfigSchemaHandler instance.

    Returns
    -------
    ETLConfigSchemaHandler
        A newly constructed instance of the ETLConfigSchemaHandler class.
    """
    handler = ETLConfigSchemaHandler()
    return handler
def list_from_element_files(
    manifest_dict: Dict[str, Any],
    *element_strings: str,
    scumh: Optional["SCUManifestSchemaHandler"] = None,
) -> List[Dict[str, str]]:
    """
    Generate a list of file dictionaries from the given manifest dictionary and element strings.

    Parameters
    ----------
    manifest_dict : dict
        The manifest dictionary containing metadata and element files.
    *element_strings : str
        Names of the elements whose files should be returned. Elements whose
        name is not listed are skipped; with no names the result is empty.
    scumh : SCUManifestSchemaHandler, optional
        An instance of SCUManifestSchemaHandler. If None, a new instance is
        created using `get_scu_manifest_handler()`. Generally, you should use
        the default; you would only specify an alternative handler if
        something is going wrong or you need an old/new schema.

    Returns
    -------
    list of dict
        A list of dictionaries, each containing the file path (prefixed with
        the upload UUID) and the upload UUID.

    Notes
    -----
    - The function assumes that `manifest_dict` contains keys defined in `scumh`.
    - The `scumh` parameter is used to access specific keys within the `manifest_dict`.
    - Paths are always joined with ``/`` regardless of the host operating
      system.

    Examples
    --------
    >>> manifest_dict = {
    ...     'meta': {'upload_uuid': '1234'},
    ...     'element_files': [
    ...         {'name': 'path1', 'files': [{'name': 'file1'}, {'name': 'file2'}]},
    ...         {'name': 'path2', 'files': [{'name': 'file3'}, {'name': 'file4'}]}
    ...     ]
    ... }
    >>> list_from_element_files(manifest_dict, 'path1')
    [{'name': '1234/file1', 'upload_uuid': '1234'}, {'name': '1234/file2', 'upload_uuid': '1234'}]
    """
    # posixpath (rather than os.path) keeps the separator "/" on every
    # platform, so the documented output also holds on Windows.
    import posixpath

    if scumh is None:
        scumh = get_scu_manifest_handler()

    uuid = manifest_dict[scumh.meta][scumh.upload_uuid]
    file_dicts = []
    for element in manifest_dict[scumh.element_files]:
        if element[scumh.file_path] not in element_strings:
            continue
        file_dicts.extend(
            {
                scumh.file_path: posixpath.join(uuid, f[scumh.file_path]),
                scumh.upload_uuid: uuid,
            }
            for f in element[scumh.files]
        )
    return file_dicts
# TODO: add a factory class with registered schema handlers, so that a handler can be looked up by schema name instead of relying only on named default functions like get_scu_manifest_handler
def get_meta_schema_handler(name: str) -> MetaSchemaHandler:
    """
    Get a MetaSchemaHandler instance for a given schema name.

    This is a placeholder: the lookup is not implemented and every call
    raises.

    Parameters
    ----------
    name : str
        The name of the schema to get a handler for.

    Returns
    -------
    MetaSchemaHandler
        The handler for the schema. May be a derived class of MetaSchemaHandler.

    Raises
    ------
    NotImplementedError
        Always; this function is not yet implemented.
    """
    message = "This function is not yet implemented"
    raise NotImplementedError(message)
def create_dataset_if_not_exists(project_id: str, dataset_id: str) -> None:
    """
    Create a BigQuery dataset if it doesn't already exist.

    Only a "not found" response from the lookup triggers creation. Any other
    lookup failure (for example a permission or network error) propagates to
    the caller instead of being mistaken for a missing dataset.

    Parameters
    ----------
    project_id : str
        The Google Cloud project ID.
    dataset_id : str
        The BigQuery dataset ID.

    Returns
    -------
    None

    Raises
    ------
    Exception
        Whatever the BigQuery client raises when the lookup fails for a reason
        other than "not found", or when the creation fails (creation failures
        are logged before being re-raised).
    """
    # Imported locally so the module's top-level imports stay unchanged.
    from google.cloud.exceptions import NotFound

    client = bigquery.Client(project=project_id)
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)

    try:
        client.get_dataset(dataset_ref)
    except NotFound:
        logger.info(f"Dataset {dataset_id} does not exist in {project_id}, creating it now.")
    else:
        logger.info(f"Dataset {dataset_id} already exists in {project_id}")
        return

    try:
        dataset = bigquery.Dataset(dataset_ref)
        # exists_ok tolerates another writer creating the dataset between the
        # lookup above and this call.
        client.create_dataset(dataset, exists_ok=True)
    except Exception as create_error:
        logger.error(f"Failed to create dataset {dataset_id}: {create_error}")
        raise
    logger.info(f"Dataset {dataset_id} created successfully in {project_id}")
def get_bq_table_id_from_compute_type(
    compute_type: str,
    meta: Dict[str, Any],
    scumh: Optional[SCUManifestSchemaHandler] = None,
) -> str:
    """
    Get a BigQuery table name from a compute type and metadata.

    The dataset is named after ``compute_type`` and is created in the current
    project when it does not exist yet. The table is named after the upload
    UUID found in ``meta``.

    Parameters
    ----------
    compute_type : str
        The compute type.
    meta : Dict[str, Any]
        The metadata. Must contain the handler's ``upload_uuid`` key.
    scumh : SCUManifestSchemaHandler, optional
        The SCU manifest schema handler. If None, one will be created.

    Returns
    -------
    str
        The BigQuery table name, formatted as ``<project>:<dataset>.<table>``.

    Raises
    ------
    ValueError
        If the current project ID cannot be determined.
    """
    handler = scumh or get_scu_manifest_handler()
    current_project = ProjectId.current()

    # Guard clause: without a project there is nothing to build.
    if not current_project:
        raise ValueError("Unable to determine project ID for BigQuery table")

    create_dataset_if_not_exists(current_project, compute_type)
    table_name = meta[handler.upload_uuid]
    return f"{current_project}:{compute_type}.{table_name}"
def get_beam_bq_table_from_compute_type(
    compute_type: str,
    meta: Dict[str, Any],
    scumh: Optional[SCUManifestSchemaHandler] = None,
) -> str:
    """
    Get a BigQuery table name from a compute type and metadata.

    Note: This function is deprecated and has been renamed to
    get_bq_table_id_from_compute_type. It is kept for backwards compatibility
    but may be removed in a future version. A deprecation warning is logged on
    every call before delegating to the new function.

    Parameters
    ----------
    compute_type : str
        The compute type.
    meta : Dict[str, Any]
        The metadata.
    scumh : SCUManifestSchemaHandler, optional
        The SCU manifest schema handler. If None, one will be created.

    Returns
    -------
    str
        The BigQuery table name.
    """
    deprecation_logger = logging.getLogger(__name__)
    deprecation_logger.warning(
        "get_beam_bq_table_from_compute_type is deprecated and has been renamed to get_bq_table_id_from_compute_type."
    )
    table_id = get_bq_table_id_from_compute_type(compute_type, meta, scumh)
    return table_id
def available_defaults() -> Dict[str, str]:
    """
    Get a dictionary of default configuration values.

    Returns
    -------
    Dict[str, str]
        A dictionary with two entries: ``gcp_project`` holds whatever
        ``ProjectId.current()`` returns, and ``bucket`` holds the module-level
        ``BUCKET`` constant.
    """
    defaults = {}
    defaults["gcp_project"] = ProjectId.current()
    defaults["bucket"] = BUCKET
    return defaults
# Functions migrated from the deprecated beam module import io import os from typing import Any
def convert_pdb_to_bcif(pdb_text: str) -> bytes:
    """
    Convert PDB format text to Binary CIF format.

    Note: This function was migrated from the deprecated beam module.

    The ``biotite`` package is imported lazily; when it is missing, an error
    is logged and the ``ImportError`` is re-raised.

    Parameters
    ----------
    pdb_text : str
        The PDB format text to convert.

    Returns
    -------
    bytes
        The Binary CIF format data.

    Raises
    ------
    ImportError
        If ``biotite`` is not installed.
    """
    try:
        from biotite.structure.io.pdb import PDBFile
        import biotite.structure.io.pdbx
    except ImportError:
        logging.getLogger(__name__).error("biotite package is required for convert_pdb_to_bcif function")
        raise

    pdbx = biotite.structure.io.pdbx

    # Parse the PDB text and pull out its structure.
    structure = PDBFile.read(io.StringIO(pdb_text)).get_structure()

    # Put the structure into a fresh BinaryCIF container.
    bcif = pdbx.BinaryCIFFile()
    pdbx.set_structure(bcif, structure)

    # Serialise into an in-memory buffer and return the raw bytes.
    with io.BytesIO() as buffer:
        bcif.write(buffer)
        return buffer.getvalue()
def filter_file_dict_by_ext(file_dict: Dict[str, Any], extension: str) -> bool:
    """
    Tell whether a file dictionary's name ends with a given extension.

    Note: This function was migrated from the deprecated beam module.

    Parameters
    ----------
    file_dict : Dict[str, Any]
        The file dictionary to test. Its ``"name"`` entry is inspected.
    extension : str
        The extension to filter by. It is compared as a plain suffix, so
        include the leading dot if one is wanted.

    Returns
    -------
    bool
        True if the file has the specified extension, False otherwise.
    """
    file_name = file_dict["name"]
    has_extension = file_name.endswith(extension)
    return has_extension
def extract_rfd_run_files(manifest: Dict[str, Any], scu_mh=None) -> List[Dict[str, Any]]:
    """
    Extract RFD run files from a manifest.

    Note: This function was migrated from the deprecated beam module.

    Only elements named ``"rfd_output"`` are considered. For each of their
    files, the run number is taken from the file name: the text after the last
    underscore, up to the first following dot (``"design_12.pdb"`` gives
    ``"12"``).

    Parameters
    ----------
    manifest : Dict[str, Any]
        The manifest containing RFD run files.
    scu_mh : SCUManifestSchemaHandler, optional
        The SCU manifest schema handler. If None, one will be created.

    Returns
    -------
    List[Dict[str, Any]]
        A list of RFD run file dictionaries with the keys ``name`` (file name
        prefixed with the upload UUID, always joined with ``/``),
        ``upload_uuid`` and ``rfd_num``.
    """
    # posixpath (rather than os.path) keeps the separator "/" on every
    # platform instead of following the host operating system.
    import posixpath

    if scu_mh is None:
        scu_mh = get_scu_manifest_handler()

    uuid = manifest[scu_mh.meta][scu_mh.upload_uuid]
    file_dicts = []
    for element in manifest[scu_mh.element_files]:
        if element["name"] != "rfd_output":
            continue
        file_dicts.extend(
            {
                "name": posixpath.join(uuid, f["name"]),
                "upload_uuid": uuid,
                "rfd_num": f["name"].split("_")[-1].split(".")[0],
            }
            for f in element["files"]
        )
    return file_dicts
class BeamConfigSchemaHandler(ETLConfigSchemaHandler):
    """
    Handle schemas for Beam configs.

    Note: This class is deprecated and has been renamed to
    ETLConfigSchemaHandler. It is kept for backwards compatibility but may be
    removed in a future version. Instantiating it logs a deprecation warning
    and then behaves like ETLConfigSchemaHandler.
    """

    def __init__(self):
        deprecation_logger = logging.getLogger(__name__)
        deprecation_logger.warning(
            "BeamConfigSchemaHandler is deprecated and has been renamed to ETLConfigSchemaHandler."
        )
        super().__init__()