Source code for zcloud.benchling_order

import copy
import logging
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin

import pandas as pd

from zcloud.data_enums import MetadataColumns, MetadataTables, ServiceKeys, MondayBoardFields, BenchlingConstants
from zcloud.network_requests import (
    get_from_cloud_run,
    post_to_cloud_run,
    post_to_cloud_run_result,
)
from zcloud.service_enums import APIEndpoint


logger = logging.getLogger(__name__)


class OrderUploaderColumns(Enum):
    """Enum for order uploader columns."""

    DESIGN_NAME = "design_name"
    SEQUENCE = "sequence"
    TAG_LOCATION = "tag_location"
    ENTITY_REGISTRY_ID = "entity_registry_id"


class OrderUploaderInputColumns(Enum):
    """Enum for order uploader input columns."""

    PROGRAM_ID = "program"
    TARGET_ID = "target_id"
    BINDING_SITE_ID = "binding_site"
    ITERATION_NUMBER = "iteration"
    FUSION_ID = "fusion_id"
    DESIGN_NAME = "design_name"
    SEQUENCE = "sequence"
    TAG_LOCATION = "tag_location"


class OrderUploaderIntermediateColumns(Enum):
    TARGET_ID_IMPLIED = "target_id_implied"
    SITE_NAME_IMPLIED = "site_name_implied"
    ITERATION_CODE_IMPLIED = "iteration_code_implied"
    DESIGN_NUMBER_IMPLIED = "design_number_implied"
    FUSION_ID_IMPLIED = "fusion_id_implied"


class PossibleProgramIDColumns(Enum):
    PROGRAM_ID = "program_id"
    BENCHLING_PROGRAM_ID = "benchling_program_id"
    DESIGN_PROGRAM_ID = "design_program_id"
    MONDAY_PROGRAM_ID = "monday_program_id"


class PossibleTargetIDColumns(Enum):
    BENCHLING_TARGET_ID = "benchling_target_id"
    DESIGN_TARGET_ID = "design_target_id"
    MONDAY_TARGET_ID = "monday_target_id"
    TARGET_ID = "target_id"


class PossibleBindingSiteColumns(Enum):
    BINDING_SITE_ID = "binding_site_id"
    BENCHLING_BINDING_SITE_ID = "benchling_binding_site_id"
    DESIGN_BINDING_SITE_ID = "design_binding_site_id"
    TARGET_ID = "target_id"


class PossibleFusionIDColumns(Enum):
    FUSION_ID = "fusion_id"
    BENCHLING_FUSION_ID = "benchling_fusion_id"
    DESIGN_FUSION_ID = "design_fusion_id"


class PossibleUserColumns(Enum):
    USER_ID = "user_id"
    USER_EMAIL = "user_email"
    USER_FULL_NAME = "user_full_name"
    CLUSTER_ID = "cluster_id"


class MetadataTableColumnDispatcher:
    table2columns = {
        MetadataTables.FUSION_TABLE.value: [
            col.value for col in PossibleFusionIDColumns
        ],
        MetadataTables.TARGET_TABLE.value: [
            col.value for col in PossibleTargetIDColumns
        ],
        MetadataTables.BINDING_SITE_TABLE.value: [
            col.value for col in PossibleBindingSiteColumns
        ],
        MetadataTables.PROGRAM_TABLE.value: [
            col.value for col in PossibleProgramIDColumns
        ],
        MetadataTables.USER_TABLE.value: [
            col.value for col in PossibleUserColumns
        ],
    }


class UnableToFindMetadataError(ValueError):
    pass


def oracle_data_is_empty(oracle_data: Dict[str, list[dict[str, str]]]) -> bool:
    """
    Check if the oracle data is empty.

    Parameters
    ----------
    oracle_data : Dict[str, list[dict[str, str]]]
        Dictionary containing oracle data with string keys and list of dictionaries as values.

    Returns
    -------
    bool
        True if the oracle data is empty (no values in any lists), False otherwise.
    """
    return not any(oracle_data.values())


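# Illustrative usage sketch (not part of the original module); the data below
# is hypothetical:
#
#     oracle_data_is_empty({"benchling_program_id": []})              # True
#     oracle_data_is_empty({"benchling_program_id": [{"id": "x"}]})   # False

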
def get_possible_columns_names_for_metadata_table(metadata_table_id: str) -> List[str]:
    """
    Get possible column names for a specific metadata table.

    Parameters
    ----------
    metadata_table_id : str
        The ID of the metadata table to get column names for.

    Returns
    -------
    List[str]
        List of possible column names for the specified metadata table.
    """
    return MetadataTableColumnDispatcher.table2columns[metadata_table_id]


def check_allowed_project_metadata(
    query_dict: Dict[str, str]
) -> Dict[str, List[Dict[str, str]]]:
    """
    Validate a metadata entity against BigQuery.

    Parameters
    ----------
    query_dict : Dict[str, str]
        Dictionary containing query parameters. For program_id, this is a dict mapping
        allowed column names to the program_id value. For target_id, this is a list of
        dicts with benchling_target_id, design_target_id, and monday_target_id. For
        binding_site_id, this includes binding_site_id and target_id.

    Returns
    -------
    Dict[str, List[Dict[str, str]]]
        Dictionary containing matching records from BigQuery.

    Raises
    ------
    ValueError
        If validation fails with a non-200 status code.
    """
    url = urljoin(
        APIEndpoint.METADATA_VALIDATOR_URL.value, "check-allowed-project-metadata"
    )
    # query = json.dumps(query_dict)
    response = get_from_cloud_run(url, query_dict)
    if response.status_code != 200:
        raise ValueError(
            f"Failed to validate {query_dict.keys()} '{query_dict.values()}': {response.text}"
        )
    results = response.json()
    return results


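# Illustrative sketch of the request shape (not part of the original module).
# The program-id value "PRG-001" is hypothetical; the keys follow the pattern
# used by the check_* helpers below: every allowed column name mapped to the
# queried value, plus the table-id key.
#
#     query = {
#         col: "PRG-001"
#         for col in get_possible_columns_names_for_metadata_table(
#             MetadataTables.PROGRAM_TABLE.value
#         )
#     }
#     query[ServiceKeys.TABLE_ID.value] = MetadataTables.PROGRAM_TABLE.value
#     matches = check_allowed_project_metadata(query)
#     # `matches` maps each queried column name to a list of matching records.

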
def check_scu_schema(
    query_dict: Dict[str, str],
    all_tables: Optional[Dict[str, List[Dict[str, str]]]] = None,
) -> Dict[str, List[str]]:
    """
    Check Scientific Compute Unit (SCU) schema against available tables.

    Parameters
    ----------
    query_dict : Dict[str, str]
        Dictionary containing query parameters with field information.
    all_tables : Optional[Dict[str, List[Dict[str, str]]]], optional
        Pre-loaded table data to avoid API calls. If None, an API call is made.

    Returns
    -------
    Dict[str, List[str]]
        Dictionary mapping table IDs to lists of found field names.

    Raises
    ------
    ValueError
        If SCU validation fails with a non-200 status code.
    """
    if all_tables:
        fields = set(query_dict.get(MetadataColumns.FIELD.value) or [])
        all_fields_by_table_id = {
            table_id: set(
                row.get(MetadataColumns.FIELD.value) for row in all_tables[table_id]
            )
            for table_id in all_tables.keys()
        }
        table_id_to_found_fields = {}  # table_id -> set of found fields
        for field_id_query in fields:
            for table_id in all_tables.keys():
                if field_id_query in all_fields_by_table_id[table_id]:
                    table_id_to_found_fields_set = table_id_to_found_fields.get(
                        table_id, set()
                    )
                    table_id_to_found_fields_set.add(field_id_query)
                    table_id_to_found_fields[table_id] = table_id_to_found_fields_set
                    break  # fields should not be in multiple tables
        # Convert sets to lists before JSON serialization
        return {
            table_id: list(fields)
            for table_id, fields in table_id_to_found_fields.items()
        }
    url = urljoin(APIEndpoint.SCU_VALIDATOR_URL.value, "check-scu-schema")
    response = post_to_cloud_run(url, query_dict)
    if response.status_code != 200:
        raise ValueError(
            f"Failed to validate {query_dict.keys()} '{query_dict.values()}': {response.text}"
        )
    return response.json()


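# Illustrative sketch of the local (pre-loaded) path (not part of the original
# module). The table IDs and field names below are hypothetical placeholders;
# real table IDs come from MetadataTables.
#
#     tables = {
#         "expression_table": [{MetadataColumns.FIELD.value: "yield_mg"}],
#         "assay_table": [{MetadataColumns.FIELD.value: "kd_nm"}],
#     }
#     query = {MetadataColumns.FIELD.value: ["yield_mg", "kd_nm"]}
#     check_scu_schema(query, all_tables=tables)
#     # -> {"expression_table": ["yield_mg"], "assay_table": ["kd_nm"]}

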
def get_all_scu_tables() -> Dict[str, List[Dict[str, str]]]:
    """
    Retrieve all Scientific Compute Unit (SCU) tables from the service.

    Returns
    -------
    Dict[str, List[Dict[str, str]]]
        Dictionary mapping table IDs to table data, or error information if the call fails.
    """
    url = urljoin(APIEndpoint.SCU_VALIDATOR_URL.value, "get-all-scu-tables")
    try:
        response = get_from_cloud_run(url, {})
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"Error calling SCU service: {e}")
        return {"error": [{"success": "False", "error": str(e)}]}


def call_benchling_service(payload: Dict) -> Dict:
    """
    Call the Benchling updater service with the provided payload.

    Parameters
    ----------
    payload : Dict
        Payload data to send to the Benchling service.

    Returns
    -------
    Dict
        Response from the Benchling service, including success status and any error messages.
    """
    url = urljoin(APIEndpoint.BENCHLING_SERVICE_URL.value, "upload-order")
    try:
        response = post_to_cloud_run(url, payload)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"Error calling Benchling service: {e}")
        return {"success": False, "error": str(e)}


def post_to_monday_service(payload: Dict, endpoint: str) -> Dict:
    """
    Call the Monday service with the provided payload.

    Parameters
    ----------
    payload : Dict
        Payload data to send to the Monday service.
    endpoint : str
        The specific endpoint to call on the Monday service.

    Returns
    -------
    Dict
        Response from the Monday service, including success status and any error messages.
    """
    url = urljoin(APIEndpoint.MONDAY_SERVICE_URL.value, endpoint)
    try:
        response = post_to_cloud_run(url, payload)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"Error calling Monday service: {e}")
        return {"success": False, "error": str(e)}


def ticket_details_from_monday_service(ticket_url: str) -> Dict:
    """
    Get ticket details from the Monday service.

    Parameters
    ----------
    ticket_url : str
        URL of the Monday ticket to retrieve details for.

    Returns
    -------
    Dict
        Dictionary containing ticket details including project, target, and iteration code.
    """
    fields = [
        MondayBoardFields.PROJECT.value,
        MondayBoardFields.TARGET.value,
        MondayBoardFields.ITERATION_CODE.value,
    ]
    payload = {
        ServiceKeys.MONDAY_TICKET_URL.value: ticket_url,
        ServiceKeys.MONDAY_BOARD_FIELDS.value: fields,
    }
    response_json = post_to_monday_service(
        payload, "get-program-ticket-details-from-url"
    )
    return response_json


def create_benchling_order_folder(program_id: str, target_id: str, iteration: str) -> Dict:
    """
    Create a new folder in Benchling for the order.

    Parameters
    ----------
    program_id : str
        The program ID for the order.
    target_id : str
        The target ID for the order.
    iteration : str
        The iteration number for the order.

    Returns
    -------
    Dict
        Response containing folder creation details including registry_folder_id
        and iteration_folder_id.
    """
    url = urljoin(
        APIEndpoint.BENCHLING_SERVICE_URL.value, "create-benchling-order-folder"
    )
    payload = {
        MetadataColumns.PROGRAM_ID.value: program_id,
        MetadataColumns.TARGET_ID.value: target_id,
        MetadataColumns.ITERATION_NUMBER.value: iteration,
        MetadataColumns.GENERATION_ID.value: BenchlingConstants.BENCHLING_GENERATION_FOLDER_NAME.value,
    }
    return post_to_cloud_run_result(url, payload)


def register_protein_entities(
    protein_registry_folder_id: str, small_table_data: List[Dict[str, str]]
) -> Dict:
    """
    Register protein entities in Benchling.

    Parameters
    ----------
    protein_registry_folder_id : str
        The folder ID in Benchling where proteins should be registered.
    small_table_data : List[Dict[str, str]]
        List of dictionaries containing protein data to register.

    Returns
    -------
    Dict
        Response containing registration details including aaSequences with entity registry IDs.
    """
    url = urljoin(APIEndpoint.BENCHLING_SERVICE_URL.value, "register-protein-entities")
    payload = {
        "protein_registry_folder_id": protein_registry_folder_id,
        ServiceKeys.ENTITY_REGISTRY_DATA.value: small_table_data,
    }
    return post_to_cloud_run_result(url, payload)


def publish_benchling_entry(benchling_entry_query_dict: Dict) -> Dict:
    """
    Create a new entry in Benchling.

    Parameters
    ----------
    benchling_entry_query_dict : Dict
        Dictionary containing entry data including sequence records, CSV data, entry name,
        GCS bucket path, author email, Monday ticket URL, and iteration folder ID.

    Returns
    -------
    Dict
        Response from the Benchling service indicating success or failure of entry creation.
    """
    url = urljoin(APIEndpoint.BENCHLING_SERVICE_URL.value, "publish-benchling-entry")
    return post_to_cloud_run_result(url, benchling_entry_query_dict)


def update_monday_ticket(ticket_link: str, benchling_url: str) -> Dict:
    """
    Update the Monday ticket with the Benchling URL.

    Parameters
    ----------
    ticket_link : str
        Link to the Monday ticket to update.
    benchling_url : str
        URL of the Benchling entry to link to the ticket.

    Returns
    -------
    Dict
        Response indicating success or failure of the ticket update.
    """
    url = urljoin(APIEndpoint.MONDAY_SERVICE_URL.value, "update-ticket")
    payload = {"ticket_link": ticket_link, "benchling_url": benchling_url}
    try:
        response = post_to_cloud_run(url, payload)
        response.raise_for_status()
        # Use the module logger rather than print for consistency with the rest of the module.
        logger.info(f"Successfully updated Monday ticket {ticket_link}")
        return response.json()
    except Exception as e:
        logger.error(f"Error updating Monday ticket: {e}")
        return {"success": False, "error": str(e)}


def split_design_names(design_name: str) -> pd.Series:
    """
    Parse a design name following the schema:
    {target_id}_{site name}_{iteration code}_{design number}.

    Parameters
    ----------
    design_name : str
        Design name to parse in the format
        target_id_site_name_iteration_code_fusion_combo_number.

    Returns
    -------
    pd.Series
        Series containing parsed components: target_id_implied, site_name_implied,
        iteration_code_implied, design_number_implied, fusion_id_implied.
    """
    (
        target_id_implied,
        site_name_implied,
        iteration_code_implied,
        fusion_combo_number_implied,
    ) = design_name.split("_")
    fusion_id_implied = fusion_combo_number_implied[:-3]
    design_number_implied = fusion_combo_number_implied[-3:]
    return pd.Series(
        [
            target_id_implied,
            site_name_implied,
            iteration_code_implied,
            design_number_implied,
            fusion_id_implied,
        ],
        index=[
            OrderUploaderIntermediateColumns.TARGET_ID_IMPLIED.value,
            OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value,
            OrderUploaderIntermediateColumns.ITERATION_CODE_IMPLIED.value,
            OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value,
            OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value,
        ],
    )


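# Illustrative sketch (hypothetical design name, not part of the original module):
# the last underscore-separated token is split into a fusion ID plus a 3-digit
# design number.
#
#     parts = split_design_names("TGT1_siteA_it01_FUSA007")
#     # parts["target_id_implied"]      == "TGT1"
#     # parts["site_name_implied"]      == "siteA"
#     # parts["iteration_code_implied"] == "it01"
#     # parts["fusion_id_implied"]      == "FUSA"
#     # parts["design_number_implied"]  == "007"

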
def pdapply_build_design_names_from_row(
    row: pd.Series,
    iteration: str,
    target_id: str,
    binding_site_id: Optional[str] = None,
    fusion_id: Optional[str] = None,
    override: bool = False,
) -> str:
    """
    Build a design name from a row, intended for pandas apply.

    Parameters
    ----------
    row : pd.Series
        Pandas Series representing a row of data.
    iteration : str
        Iteration code to use in the design name.
    target_id : str
        Target ID to use in the design name.
    binding_site_id : Optional[str], optional
        Binding site ID to use. If None, will try to infer from row data.
    fusion_id : Optional[str], optional
        Fusion ID to use. If None, will try to infer from row data.
    override : bool, optional
        If True, use provided parameters directly. If False, try to infer missing values.

    Returns
    -------
    str
        Generated design name in the format target_id_binding_site_iteration_fusion_id###.
    """

    def safe_row_index(row):
        try:
            return int(row.name)
        except Exception:
            raise ValueError(f"Row index {row.name} is not convertible to int.")

    if override:
        return (
            f"{target_id}_{binding_site_id}_{iteration}_{fusion_id}{safe_row_index(row):03d}"
        )
    else:
        binding_site_guess = row.get(
            OrderUploaderInputColumns.BINDING_SITE_ID.value,
            row.get(
                OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value,
                binding_site_id,
            ),
        )
        fusion_id_guess = row.get(
            OrderUploaderInputColumns.FUSION_ID.value,
            row.get(
                OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value, fusion_id
            ),
        )
        return f"{target_id}_{binding_site_guess}_{iteration}_{fusion_id_guess}{safe_row_index(row):03d}"


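# Illustrative sketch (hypothetical values, not part of the original module):
# the function is meant to be used with DataFrame.apply(axis=1), which forwards
# keyword arguments to it; the row index supplies the 3-digit design number.
#
#     df = pd.DataFrame(
#         {OrderUploaderInputColumns.SEQUENCE.value: ["MKT", "MAV"]},
#         index=pd.RangeIndex(1, 3),
#     )
#     names = df.apply(
#         pdapply_build_design_names_from_row,
#         axis=1,
#         iteration="it01",
#         target_id="TGT1",
#         binding_site_id="siteA",
#         fusion_id="FUSA",
#         override=True,
#     )
#     # names.tolist() == ["TGT1_siteA_it01_FUSA001", "TGT1_siteA_it01_FUSA002"]

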
def get_required_columns() -> List[str]:
    """
    Get the required columns for the order uploader.

    Returns
    -------
    List[str]
        List of column names that are required for order upload.
    """
    return [
        OrderUploaderInputColumns.SEQUENCE.value,
        OrderUploaderInputColumns.TAG_LOCATION.value,
    ]


def get_all_indexed_input_columns() -> List[str]:
    """
    Get all the columns that are indexed in the input CSV.

    Returns
    -------
    List[str]
        List of all column names that are part of the indexed input schema.
    """
    return [
        OrderUploaderInputColumns.DESIGN_NAME.value,
        OrderUploaderInputColumns.SEQUENCE.value,
        OrderUploaderInputColumns.TAG_LOCATION.value,
        OrderUploaderInputColumns.ITERATION_NUMBER.value,
        OrderUploaderInputColumns.FUSION_ID.value,
        OrderUploaderInputColumns.BINDING_SITE_ID.value,
        OrderUploaderInputColumns.TARGET_ID.value,
        OrderUploaderInputColumns.PROGRAM_ID.value,
    ]


def get_small_table_fields() -> List[str]:
    """
    Get the fields for the small table (core sequence information).

    Returns
    -------
    List[str]
        List of column names that constitute the small table fields.
    """
    return [
        OrderUploaderInputColumns.DESIGN_NAME.value,
        OrderUploaderInputColumns.SEQUENCE.value,
        OrderUploaderInputColumns.TAG_LOCATION.value,
    ]


def get_small_table_rename_dict() -> Dict[str, str]:
    """
    Get the mapping dictionary for renaming small table columns.

    Returns
    -------
    Dict[str, str]
        Dictionary mapping input column names to output column names.
    """
    return {
        OrderUploaderInputColumns.DESIGN_NAME.value: OrderUploaderColumns.DESIGN_NAME.value,
        OrderUploaderInputColumns.SEQUENCE.value: OrderUploaderColumns.SEQUENCE.value,
        OrderUploaderInputColumns.TAG_LOCATION.value: OrderUploaderColumns.TAG_LOCATION.value,
    }


def initialize_df_with_design_name(
    df_input: pd.DataFrame,
) -> pd.DataFrame:
    """
    Initialize DataFrame with design name parsing and validation.

    Parameters
    ----------
    df_input : pd.DataFrame
        Input DataFrame containing design names to parse.

    Returns
    -------
    pd.DataFrame
        DataFrame with additional columns for parsed design name components and adjusted index.

    Raises
    ------
    ValueError
        If design name numbers don't match the expected index pattern.
    """
    df = copy.deepcopy(df_input)
    df[
        [
            OrderUploaderIntermediateColumns.TARGET_ID_IMPLIED.value,
            OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value,
            OrderUploaderIntermediateColumns.ITERATION_CODE_IMPLIED.value,
            OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value,
            OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value,
        ]
    ] = df[OrderUploaderInputColumns.DESIGN_NAME.value].apply(split_design_names)
    df.sort_values(
        by=OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value,
        ascending=True,
        inplace=True,
    )
    df.index = pd.RangeIndex(1, len(df) + 1)
    if any(
        df.index.values
        - df[OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value]
        .apply(lambda x: int(x))
        .values
    ):  # check for off by n
        logger.warning(
            "There are manually supplied names whose numbers don't match the index. "
            "Attempting to adjust index by the smallest design number."
        )
        adjustment = (
            int(
                df[OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value].iloc[0]
            )
            - 1
        )
        df.index = pd.RangeIndex(1 + adjustment, len(df) + 1 + adjustment)
        if any(
            df.index.values
            - df[OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value]
            .apply(lambda x: int(x))
            .values
        ):
            raise ValueError(
                "There are manually supplied names whose numbers don't match the index."
            )
    return df


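# Illustrative sketch (hypothetical design names, not part of the original
# module): rows are sorted by the implied design number and the index is
# aligned to those numbers (offsetting by the smallest number if needed).
#
#     df = pd.DataFrame(
#         {OrderUploaderInputColumns.DESIGN_NAME.value: [
#             "TGT1_siteA_it01_FUSA002",
#             "TGT1_siteA_it01_FUSA001",
#         ]}
#     )
#     initialize_df_with_design_name(df).index.tolist()  # -> [1, 2]

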
def get_program_id_candidate(
    df: pd.DataFrame,
    program_id_specified: Optional[str],
    allow_cli_override: bool = False,
) -> str:
    """
    Get program ID candidate from DataFrame and CLI arguments.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame that may contain program ID information.
    program_id_specified : Optional[str]
        Program ID specified via CLI arguments.
    allow_cli_override : bool, optional
        Whether to allow CLI arguments to override DataFrame values, by default False.

    Returns
    -------
    str
        The resolved program ID to use.

    Raises
    ------
    ValueError
        If multiple conflicting program IDs are found and override is not allowed.
    """
    if OrderUploaderInputColumns.PROGRAM_ID.value in df.columns:
        program_id_col_ids = set(
            df[OrderUploaderInputColumns.PROGRAM_ID.value].values
        )
    else:
        program_id_col_ids = set()
    if program_id_specified and allow_cli_override:
        program_id_query = program_id_specified
    else:
        program_id_candidates = set()
        if program_id_specified:
            program_id_candidates.add(program_id_specified)
        if program_id_col_ids:
            program_id_candidates.update(program_id_col_ids)
        if len(program_id_candidates) > 1:
            raise ValueError(
                f"Multiple program IDs found from different sources: "
                f"CLI argument: {program_id_specified if program_id_specified else 'None'}, "
                f"CSV column: {program_id_col_ids if program_id_col_ids else 'None'}. "
                f"Use --allow-cli-override to use CLI value."
            )
        program_id_query = program_id_candidates.pop()
    return program_id_query


def check_program_id(
    program_id_query: str,
    eval_records: Optional[List[Dict[str, str]]] = None,
    try_to_find_monday_id: Optional[str] = None,
) -> Tuple[str, str, str]:
    """
    Validate program ID against metadata oracle.

    Parameters
    ----------
    program_id_query : str
        Program ID to validate.
    eval_records : Optional[List[Dict[str, str]]], optional
        Pre-loaded records to validate against instead of making API call, by default None.
    try_to_find_monday_id : Optional[str], optional
        Monday ID to try to match against, by default None.

    Returns
    -------
    Tuple[str, str, str]
        Tuple containing (program_id_benchling, program_id_design, program_id_monday).

    Raises
    ------
    ValueError
        If program ID cannot be found in the metadata.
    """
    allowed_program_id_column_names = get_possible_columns_names_for_metadata_table(
        MetadataTables.PROGRAM_TABLE.value
    )
    if not eval_records:
        query_dict_program_id = {
            col_value: program_id_query
            for col_value in allowed_program_id_column_names
        }
        query_dict_program_id[ServiceKeys.TABLE_ID.value] = (
            MetadataTables.PROGRAM_TABLE.value
        )
        results = check_allowed_project_metadata(query_dict_program_id)  # API call
    else:
        results = {allowed_program_id_column_names[0]: eval_records}  # local records mode
        query_dict_program_id = None
    program_id_benchling = None
    program_id_design = None
    program_id_monday = None
    for query_col in allowed_program_id_column_names:
        these_records = results.get(query_col, [])
        if these_records:
            for this_record in these_records:
                program_id_benchling = this_record.get(
                    PossibleProgramIDColumns.BENCHLING_PROGRAM_ID.value
                )
                program_id_design = this_record.get(
                    PossibleProgramIDColumns.DESIGN_PROGRAM_ID.value
                )
                program_id_monday = this_record.get(
                    PossibleProgramIDColumns.MONDAY_PROGRAM_ID.value
                )
                if try_to_find_monday_id:
                    if program_id_monday != try_to_find_monday_id:
                        logger.warning(
                            f"Program ID {program_id_monday} does not match the Monday ID {try_to_find_monday_id}"
                        )
                        logger.warning(f"Checking any additional hits for this program")
                        continue
                break
    if not (program_id_benchling and program_id_design):
        raise ValueError(
            "Unable to find program ID in BigQuery for the given query."
        )
    return program_id_benchling, program_id_design, program_id_monday or ""


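# Illustrative sketch (hypothetical records, not part of the original module):
# eval_records lets the check run against rows that were already fetched,
# bypassing the metadata service call.
#
#     records = [{
#         PossibleProgramIDColumns.BENCHLING_PROGRAM_ID.value: "bPRG1",
#         PossibleProgramIDColumns.DESIGN_PROGRAM_ID.value: "dPRG1",
#         PossibleProgramIDColumns.MONDAY_PROGRAM_ID.value: "mPRG1",
#     }]
#     check_program_id("PRG1", eval_records=records)
#     # -> ("bPRG1", "dPRG1", "mPRG1")

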
def get_target_id_candidate(
    df: pd.DataFrame,
    target_id_specified: Optional[str],
    allow_cli_override: bool = False,
) -> str:
    """
    Get target ID candidate from DataFrame and CLI arguments.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame that may contain target ID information.
    target_id_specified : Optional[str]
        Target ID specified via CLI arguments.
    allow_cli_override : bool, optional
        Whether to allow CLI arguments to override DataFrame values, by default False.

    Returns
    -------
    str
        The resolved target ID to use.

    Raises
    ------
    ValueError
        If multiple conflicting target IDs are found and override is not allowed.
    """
    name_implied_target_ids = (
        set(df[OrderUploaderIntermediateColumns.TARGET_ID_IMPLIED.value].values)
        if OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
        else set()
    )
    column_implied_target_ids = (
        set(df[OrderUploaderInputColumns.TARGET_ID.value].values)
        if OrderUploaderInputColumns.TARGET_ID.value in df.columns
        else set()
    )
    non_cli_candidate_target_ids = name_implied_target_ids.union(
        column_implied_target_ids
    )
    cli_target_id_set = {target_id_specified} if target_id_specified else set()
    final_query_target_ids = set()
    if allow_cli_override:
        final_query_target_ids = (
            cli_target_id_set if cli_target_id_set else non_cli_candidate_target_ids
        )
    elif cli_target_id_set ^ non_cli_candidate_target_ids:
        final_query_target_ids = non_cli_candidate_target_ids | cli_target_id_set
    else:
        raise ValueError(
            f"Unable to unambiguously determine target IDs from {OrderUploaderInputColumns.TARGET_ID.value} column and design names: {non_cli_candidate_target_ids} with CLI value {cli_target_id_set}. Use --allow-cli-override to force CLI value."
        )
    if len(final_query_target_ids) > 1:
        raise ValueError(
            f"Multiple target IDs found in {OrderUploaderInputColumns.TARGET_ID.value} column: {final_query_target_ids}. Use --allow-cli-override to force CLI value."
        )
    else:
        target_id = final_query_target_ids.pop()
    return target_id


def check_target_id(
    target_id_query: str,
    allowed_other_ids: Optional[List[str]] = None,
    eval_records: Optional[List[Dict[str, str]]] = None,
    try_to_find_monday_id: Optional[str] = None,
) -> Tuple[str, str, str, str, str]:
    """
    Validate target ID against metadata oracle.

    Parameters
    ----------
    target_id_query : str
        Target ID to validate.
    allowed_other_ids : Optional[List[str]], optional
        List of allowed program IDs for cross-validation, by default None.
    eval_records : Optional[List[Dict[str, str]]], optional
        Pre-loaded records to validate against instead of making API call, by default None.
    try_to_find_monday_id : Optional[str], optional
        Monday ID to try to match against, by default None.

    Returns
    -------
    Tuple[str, str, str, str, str]
        Tuple containing (matching_program_id, matching_target_id_benchling,
        matching_target_id_design, matching_target_id_monday, matching_target_id_internal).

    Raises
    ------
    UnableToFindMetadataError
        If target ID cannot be found in the metadata.
    """
    allowed_target_id_column_names = get_possible_columns_names_for_metadata_table(
        MetadataTables.TARGET_TABLE.value
    )
    if not eval_records:
        target_id_query_dict = {
            col_value: target_id_query for col_value in allowed_target_id_column_names
        }
        target_id_query_dict[ServiceKeys.TABLE_ID.value] = MetadataTables.TARGET_TABLE.value
        target_id_oracle_data = check_allowed_project_metadata(target_id_query_dict)  # API call
    else:
        target_id_oracle_data = {allowed_target_id_column_names[0]: eval_records}  # local records mode
        target_id_query_dict = None
    if oracle_data_is_empty(target_id_oracle_data):
        raise UnableToFindMetadataError(
            "Unable to find target ID in BigQuery for the given query."
        )
    matching_program_id = None
    matching_target_id_benchling = None
    matching_target_id_design = None
    matching_target_id_monday = None
    matching_target_id_internal = None
    allowed_other_set = set(allowed_other_ids) if allowed_other_ids else set()
    for query_col in allowed_target_id_column_names:
        these_records = target_id_oracle_data.get(query_col, [])
        if these_records:
            for this_record in these_records:
                matching_program_id = this_record.get(
                    PossibleProgramIDColumns.PROGRAM_ID.value
                )
                logger.debug(f"Checking record: {this_record}")
                if allowed_other_set and matching_program_id not in allowed_other_set:
                    logger.warning(
                        f"Target ID {target_id_query} is registered for program {matching_program_id}, which is not in the allowed other IDs {allowed_other_set}"
                    )
                    logger.warning(f"Checking any additional hits for this target")
                    continue
                matching_target_id_benchling = this_record.get(
                    PossibleTargetIDColumns.BENCHLING_TARGET_ID.value
                )
                matching_target_id_design = this_record.get(
                    PossibleTargetIDColumns.DESIGN_TARGET_ID.value
                )
                matching_target_id_monday = this_record.get(
                    PossibleTargetIDColumns.MONDAY_TARGET_ID.value
                )
                matching_target_id_internal = this_record.get(
                    PossibleTargetIDColumns.TARGET_ID.value
                )
                logger.debug(
                    f"Matching target ID: {matching_target_id_benchling}, {matching_target_id_design}, {matching_target_id_monday}"
                )
                if try_to_find_monday_id:
                    if matching_target_id_monday != try_to_find_monday_id:
                        logger.warning(
                            f"Target ID {matching_target_id_monday} does not match the Monday ID {try_to_find_monday_id}"
                        )
                        logger.warning(f"Checking any additional hits for this target")
                        continue
                break
    for val in [
        matching_program_id,
        matching_target_id_benchling,
        matching_target_id_design,
        matching_target_id_monday,
        matching_target_id_internal,
    ]:
        if val is None:
            raise UnableToFindMetadataError(
                "Target ID lookup did not return all required fields."
            )
    # All values are str by this point
    return (
        str(matching_program_id),
        str(matching_target_id_benchling),
        str(matching_target_id_design),
        str(matching_target_id_monday),
        str(matching_target_id_internal),
    )


def get_binding_site_candidates(
    df: pd.DataFrame,
    binding_site_id_specified: Optional[str],
    allow_cli_override: bool = False,
) -> Set[str]:
    """
    Get binding site ID candidates from DataFrame and CLI arguments.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame that may contain binding site ID information.
    binding_site_id_specified : Optional[str]
        Binding site ID specified via CLI arguments.
    allow_cli_override : bool, optional
        Whether to allow CLI arguments to override DataFrame values, by default False.

    Returns
    -------
    Set[str]
        Set of resolved binding site IDs to use.

    Raises
    ------
    ValueError
        If multiple conflicting binding site IDs are found and override is not allowed.
    """
    name_implied_binding_sites = (
        set(df[OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value].values)
        if OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
        else set()
    )
    column_implied_binding_sites = (
        set(df[OrderUploaderInputColumns.BINDING_SITE_ID.value].values)
        if OrderUploaderInputColumns.BINDING_SITE_ID.value in df.columns
        else set()
    )
    non_cli_candidate_binding_sites = name_implied_binding_sites.union(
        column_implied_binding_sites
    )
    cli_binding_site_set = (
        {binding_site_id_specified} if binding_site_id_specified else set()
    )
    final_query_binding_sites = set()
    if allow_cli_override:
        final_query_binding_sites = (
            cli_binding_site_set
            if cli_binding_site_set
            else non_cli_candidate_binding_sites
        )
    elif cli_binding_site_set ^ non_cli_candidate_binding_sites:
        final_query_binding_sites = non_cli_candidate_binding_sites.union(
            cli_binding_site_set
        )
    else:
        raise ValueError(
            f"Unable to unambiguously determine binding site IDs from {OrderUploaderInputColumns.BINDING_SITE_ID.value} column and design names: {non_cli_candidate_binding_sites} with CLI value {cli_binding_site_set}. Use --allow-cli-override to force CLI value."
        )
    return final_query_binding_sites


def check_binding_site_id(
    binding_site_query: str,
    allowed_other_ids: Optional[List[str]] = None,
    eval_records: Optional[List[Dict[str, str]]] = None,
) -> Tuple[str, str, str]:
    """
    Validate binding site ID against metadata oracle.

    Parameters
    ----------
    binding_site_query : str
        Binding site ID to validate.
    allowed_other_ids : Optional[List[str]], optional
        List of allowed target IDs for cross-validation, by default None.
    eval_records : Optional[List[Dict[str, str]]], optional
        Pre-loaded records to validate against instead of making API call, by default None.

    Returns
    -------
    Tuple[str, str, str]
        Tuple containing (matching_target_id, binding_site_id_benchling, binding_site_id_design).

    Raises
    ------
    UnableToFindMetadataError
        If binding site ID cannot be found in the metadata.
    """
    allowed_binding_site_id_column_names = (
        get_possible_columns_names_for_metadata_table(
            MetadataTables.BINDING_SITE_TABLE.value
        )
    )
    if not eval_records:
        binding_site_id_query_dict = {
            col_value: binding_site_query
            for col_value in allowed_binding_site_id_column_names
        }
        binding_site_id_query_dict[ServiceKeys.TABLE_ID.value] = (
            MetadataTables.BINDING_SITE_TABLE.value
        )
        binding_site_id_oracle_data = check_allowed_project_metadata(
            binding_site_id_query_dict
        )  # API call
    else:
        binding_site_id_oracle_data = {
            allowed_binding_site_id_column_names[0]: eval_records
        }  # local records mode
        binding_site_id_query_dict = None
    if oracle_data_is_empty(binding_site_id_oracle_data):
        raise UnableToFindMetadataError(
            "Unable to find binding site ID in BigQuery for the given query."
        )
    matching_target_id = None
    binding_site_id_benchling = None
    binding_site_id_design = None
    allowed_other_set = set(allowed_other_ids) if allowed_other_ids else set()
    for query_col in allowed_binding_site_id_column_names:
        these_records = binding_site_id_oracle_data.get(query_col, [])
        if these_records:
            for this_record in these_records:
                matching_target_id = this_record.get(
                    PossibleBindingSiteColumns.TARGET_ID.value
                )
                if allowed_other_set and matching_target_id not in allowed_other_set:
                    logger.warning(
                        f"Binding site ID {binding_site_query} is registered for target {matching_target_id}, which is not in the allowed target IDs {allowed_other_set}"
                    )
                    logger.warning(
                        f"Checking any additional hits for this binding site"
                    )
                    continue
                binding_site_id_benchling = this_record.get(
                    PossibleBindingSiteColumns.BENCHLING_BINDING_SITE_ID.value
                )
                binding_site_id_design = this_record.get(
                    PossibleBindingSiteColumns.DESIGN_BINDING_SITE_ID.value
                )
                break
    for val in [matching_target_id, binding_site_id_benchling, binding_site_id_design]:
        if val is None:
            raise UnableToFindMetadataError(
                "Binding site ID lookup did not return all required fields."
            )
    return str(matching_target_id), str(binding_site_id_benchling), str(binding_site_id_design)


def get_iteration_id_candidate(
    df: pd.DataFrame,
    iteration_number_specified: Optional[str],
    allow_cli_override: bool = False,
) -> str:
    """
    Get iteration ID candidate from DataFrame and CLI arguments.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame that may contain iteration number information.
    iteration_number_specified : Optional[str]
        Iteration number specified via CLI arguments.
    allow_cli_override : bool, optional
        Whether to allow CLI arguments to override DataFrame values, by default False.

    Returns
    -------
    str
        The resolved iteration number to use.

    Raises
    ------
    UnableToFindMetadataError
        If multiple conflicting iteration numbers are found and override is not allowed.
    """
    collected_iteration_nums = set()
    if iteration_number_specified and allow_cli_override:
        iteration = iteration_number_specified
    else:
        implied_iteration_nums = (
            set(
                df[
                    OrderUploaderIntermediateColumns.ITERATION_CODE_IMPLIED.value
                ].values
            )
            if OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
            else set()
        )
        column_iteration_nums = (
            set(df[OrderUploaderInputColumns.ITERATION_NUMBER.value].values)
            if OrderUploaderInputColumns.ITERATION_NUMBER.value in df.columns
            else set()
        )
        collected_iteration_nums = implied_iteration_nums | column_iteration_nums
        if iteration_number_specified:
            collected_iteration_nums.add(iteration_number_specified)
        if len(collected_iteration_nums) > 1:
            raise UnableToFindMetadataError(
                f"Multiple iteration numbers found in {OrderUploaderInputColumns.ITERATION_NUMBER.value} column: {collected_iteration_nums}. Use --allow-cli-override to force CLI value."
            )
        else:
            iteration = collected_iteration_nums.pop()
    return iteration


def check_user_id(user_id: str) -> Tuple[str, str, str, str]:
    """
    Validate a user ID against the metadata oracle.

    Parameters
    ----------
    user_id : str
        User ID to validate.

    Returns
    -------
    Tuple[str, str, str, str]
        Tuple containing (user_id, user_email, user_full_name, cluster_id).

    Raises
    ------
    UnableToFindMetadataError
        If the user ID cannot be found in the metadata.
    """
    allowed_user_id_column_names = get_possible_columns_names_for_metadata_table(
        MetadataTables.USER_TABLE.value
    )
    user_id_query_dict = {
        col_value: user_id for col_value in allowed_user_id_column_names
    }
    user_id_query_dict[ServiceKeys.TABLE_ID.value] = MetadataTables.USER_TABLE.value
    user_id_oracle_data = check_allowed_project_metadata(user_id_query_dict)  # API call
    if oracle_data_is_empty(user_id_oracle_data):
        raise UnableToFindMetadataError(
            "Unable to find user ID in BigQuery for the given query."
        )
    matching_user_id = None
    matching_user_email = None
    matching_user_full_name = None
    matching_cluster_id = None
    for query_col in allowed_user_id_column_names:
        these_records = user_id_oracle_data.get(query_col, [])
        if these_records:
            for this_record in these_records:
                matching_user_id = this_record.get(PossibleUserColumns.USER_ID.value)
                matching_user_email = this_record.get(
                    PossibleUserColumns.USER_EMAIL.value
                )
                matching_user_full_name = this_record.get(
                    PossibleUserColumns.USER_FULL_NAME.value
                )
                matching_cluster_id = this_record.get(
                    PossibleUserColumns.CLUSTER_ID.value
                )
                break
    for val in [
        matching_user_id,
        matching_user_email,
        matching_user_full_name,
        matching_cluster_id,
    ]:
        if val is None:
            raise UnableToFindMetadataError(
                "User ID lookup did not return all required fields."
            )
    return (
        str(matching_user_id),
        str(matching_user_email),
        str(matching_user_full_name),
        str(matching_cluster_id),
    )


def check_generated_design_names(
    df: pd.DataFrame,
    generated_design_names: pd.Series,
    allow_cli_override: bool = False,
) -> None:
    """
    Check if generated design names match existing design names in DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing existing design names.
    generated_design_names : pd.Series
        Series of generated design names to compare against.
    allow_cli_override : bool, optional
        Whether to allow override and use generated names when there's a mismatch,
        by default False.

    Raises
    ------
    UnableToFindMetadataError
        If design names don't match and override is not allowed.
    """
    name_discrepancy_mask = (
        df[OrderUploaderInputColumns.DESIGN_NAME.value] != generated_design_names
    )
    discrepancy_in_names = any(name_discrepancy_mask)
    if allow_cli_override:
        if discrepancy_in_names:
            logger.warning(
                "Design names do not match generated design names. "
                "Using generated design names because of --allow-cli-override"
            )
            df[OrderUploaderInputColumns.DESIGN_NAME.value] = generated_design_names
    else:
        if discrepancy_in_names:
            # output the rows that don't match
            logger.error(
                "Input csv design names do not match generated design names. "
                "Use --allow-cli-override to force CLI value."
            )
            output_df = df[name_discrepancy_mask][
                [
                    OrderUploaderInputColumns.DESIGN_NAME.value,
                ]
            ]
            output_df["generated_design_name"] = generated_design_names[
                name_discrepancy_mask
            ]
            logger.error("\n" + output_df.to_string())
            raise UnableToFindMetadataError(
                "Input csv design names do not match generated design names. "
                "Use --allow-cli-override to force CLI value."
            )


def get_fusion_id_candidates(
    df: pd.DataFrame,
    fusion_id_specified: Optional[str],
    allow_cli_override: bool = False,
) -> Set[str]:
    """
    Get candidate fusion IDs from various sources.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame that may contain fusion ID information.
    fusion_id_specified : Optional[str]
        Fusion ID specified via CLI arguments.
    allow_cli_override : bool, optional
        Whether to allow CLI arguments to override DataFrame values, by default False.

    Returns
    -------
    Set[str]
        Set of candidate fusion IDs.

    Raises
    ------
    ValueError
        If multiple conflicting fusion IDs are found and override is not allowed.
    """
    column_implied_fusion_ids = set()
    if OrderUploaderInputColumns.FUSION_ID.value in df.columns:
        column_implied_fusion_ids = set(
            df[OrderUploaderInputColumns.FUSION_ID.value].values
        )
    name_implied_fusion_ids = set()
    if OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value in df.columns:
        name_implied_unique_fusion_ids = set(
            df[OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value].values
        )
        if (
            len(name_implied_unique_fusion_ids) == 1
            and next(iter(name_implied_unique_fusion_ids)) == ""
        ):
            name_implied_fusion_ids = set()
        else:
            name_implied_fusion_ids = name_implied_unique_fusion_ids
    non_cli_candidate_fusion_ids = name_implied_fusion_ids.union(
        column_implied_fusion_ids
    )
    cli_fusion_id_set = {fusion_id_specified} if fusion_id_specified else set()
    final_query_fusion_ids = set()
    if allow_cli_override:
        final_query_fusion_ids = (
            cli_fusion_id_set if cli_fusion_id_set else non_cli_candidate_fusion_ids
        )
    elif cli_fusion_id_set ^ non_cli_candidate_fusion_ids:
        final_query_fusion_ids = non_cli_candidate_fusion_ids | cli_fusion_id_set
    elif not cli_fusion_id_set and not non_cli_candidate_fusion_ids:
        final_query_fusion_ids = set()  # it's okay not to have a fusion ID, unlike the other name properties
    else:
        raise ValueError(
            f"Unable to unambiguously determine fusion IDs from {OrderUploaderInputColumns.FUSION_ID.value} column and design names: {non_cli_candidate_fusion_ids} with CLI value {cli_fusion_id_set}. Use --allow-cli-override to force CLI value."
        )
    return final_query_fusion_ids


def check_fusion_id(
    fusion_id_query: str, eval_records: Optional[List[Dict[str, str]]] = None
) -> Tuple[str, str, str]:
    """
    Validate a fusion ID against the metadata validator.

    Parameters
    ----------
    fusion_id_query : str
        Fusion ID to validate.
    eval_records : Optional[List[Dict[str, str]]], optional
        Pre-loaded records to validate against instead of making API call, by default None.

    Returns
    -------
    Tuple[str, str, str]
        Tuple containing (fusion_id_internal, fusion_id_benchling, fusion_id_design).

    Raises
    ------
    UnableToFindMetadataError
        If fusion ID cannot be found in the metadata.
    """
    allowed_fusion_id_column_names = get_possible_columns_names_for_metadata_table(
        MetadataTables.FUSION_TABLE.value
    )
    if not eval_records:
        fusion_id_query_dict = {
            col_value: fusion_id_query for col_value in allowed_fusion_id_column_names
        }
        fusion_id_query_dict[ServiceKeys.TABLE_ID.value] = MetadataTables.FUSION_TABLE.value
        fusion_id_oracle_data = check_allowed_project_metadata(fusion_id_query_dict)  # API call
    else:
        fusion_id_oracle_data = {allowed_fusion_id_column_names[0]: eval_records}  # local records mode
        fusion_id_query_dict = None
    if oracle_data_is_empty(fusion_id_oracle_data):
        raise UnableToFindMetadataError(
            "Unable to find fusion ID in BigQuery for the given query."
        )
    fusion_id_internal = None
    fusion_id_benchling = None
    fusion_id_design = None
    for query_col in allowed_fusion_id_column_names:
        these_records = fusion_id_oracle_data.get(query_col, [])
        if these_records:
            for this_record in these_records:
                fusion_id_internal = this_record.get(
                    PossibleFusionIDColumns.FUSION_ID.value
                )
                fusion_id_benchling = this_record.get(
                    PossibleFusionIDColumns.BENCHLING_FUSION_ID.value
                )
                fusion_id_design = this_record.get(
                    PossibleFusionIDColumns.DESIGN_FUSION_ID.value
                )
                break
    for val in [fusion_id_internal, fusion_id_benchling, fusion_id_design]:
        if val is None:
            raise UnableToFindMetadataError(
                "Fusion ID lookup did not return all required fields."
            )
    return str(fusion_id_internal), str(fusion_id_benchling), str(fusion_id_design)


def get_full_metadata_tables(*table_ids: str) -> Dict:
    """
    Get complete metadata tables for the specified table IDs.

    Parameters
    ----------
    *table_ids : str
        Variable number of table IDs to retrieve.

    Returns
    -------
    Dict
        Dictionary containing the full metadata tables data.
    """
    url = urljoin(
        APIEndpoint.METADATA_VALIDATOR_URL.value, "get-whole-project-metadata-table"
    )
    payload = {ServiceKeys.COMMA_SEPARATED_TABLE_IDS.value: ",".join(table_ids)}
    return post_to_cloud_run_result(url, payload)