import copy
from enum import Enum
import logging
from typing import Dict, List, Optional, Tuple, Set
import pandas as pd
from urllib.parse import urljoin
from zcloud.data_enums import MetadataColumns, MetadataTables, ServiceKeys, MondayBoardFields, BenchlingConstants
from zcloud.network_requests import (
get_from_cloud_run,
post_to_cloud_run,
post_to_cloud_run_result,
)
from zcloud.service_enums import APIEndpoint
logger = logging.getLogger(__name__)
class OrderUploaderColumns(Enum):
"""Enum for order uploader columns."""
DESIGN_NAME = "design_name"
SEQUENCE = "sequence"
TAG_LOCATION = "tag_location"
ENTITY_REGISTRY_ID = "entity_registry_id"
class PossibleProgramIDColumns(Enum):
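    """Possible column names that identify a program ID in metadata tables."""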
PROGRAM_ID = "program_id"
BENCHLING_PROGRAM_ID = "benchling_program_id"
DESIGN_PROGRAM_ID = "design_program_id"
MONDAY_PROGRAM_ID = "monday_program_id"
class PossibleTargetIDColumns(Enum):
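    """Possible column names that identify a target ID in metadata tables."""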
BENCHLING_TARGET_ID = "benchling_target_id"
DESIGN_TARGET_ID = "design_target_id"
MONDAY_TARGET_ID = "monday_target_id"
TARGET_ID = "target_id"
class PossibleBindingSiteColumns(Enum):
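    """Possible column names that identify a binding site ID in metadata tables."""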
BINDING_SITE_ID = "binding_site_id"
BENCHLING_BINDING_SITE_ID = "benchling_binding_site_id"
DESIGN_BINDING_SITE_ID = "design_binding_site_id"
TARGET_ID = "target_id"
class PossibleFusionIDColumns(Enum):
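    """Possible column names that identify a fusion ID in metadata tables."""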
FUSION_ID = "fusion_id"
BENCHLING_FUSION_ID = "benchling_fusion_id"
DESIGN_FUSION_ID = "design_fusion_id"
class PossibleUserColumns(Enum):
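    """Possible column names that identify a user in metadata tables."""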
USER_ID = "user_id"
USER_EMAIL = "user_email"
USER_FULL_NAME = "user_full_name"
CLUSTER_ID = "cluster_id"
def oracle_data_is_empty(oracle_data: Dict[str, List[Dict[str, str]]]) -> bool:
"""
Check if the oracle data is empty.
Parameters
----------
    oracle_data : Dict[str, List[Dict[str, str]]]
Dictionary containing oracle data with string keys and list of dictionaries as values.
Returns
-------
bool
True if the oracle data is empty (no values in any lists), False otherwise.
"""
return not any(oracle_data.values())
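# Illustrative example of the emptiness check above (values are hypothetical):
#     oracle_data_is_empty({"program_id": []})                          # -> True
#     oracle_data_is_empty({"program_id": [{"program_id": "PRG-1"}]})   # -> False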
def check_scu_schema(query_dict: Dict[str, str], all_tables: Optional[Dict[str, List[Dict[str, str]]]] = None) -> Dict[str, List[str]]:
"""
Check Scientific Compute Unit (SCU) schema against available tables.
Parameters
----------
query_dict : Dict[str, str]
Dictionary containing query parameters with field information.
all_tables : Optional[Dict[str, List[Dict[str, str]]]], optional
Pre-loaded table data to avoid API calls. If None, will make API call.
Returns
-------
Dict[str, List[str]]
Dictionary mapping table IDs to lists of found field names.
Raises
------
ValueError
If SCU validation fails with non-200 status code.
"""
if all_tables:
fields = set(query_dict.get(MetadataColumns.FIELD.value) or [])
        all_fields_by_table_id = {
            table_id: {row.get(MetadataColumns.FIELD.value) for row in rows}
            for table_id, rows in all_tables.items()
        }
table_id_to_found_fields = {} # table_id -> set of found fields
for field_id_query in fields:
for table_id in all_tables.keys():
if field_id_query in all_fields_by_table_id[table_id]:
table_id_to_found_fields_set = table_id_to_found_fields.get(
table_id, set()
)
table_id_to_found_fields_set.add(field_id_query)
table_id_to_found_fields[table_id] = table_id_to_found_fields_set
break # fields should not be in multiple tables
# Convert sets to lists before JSON serialization
return {
table_id: list(fields) for table_id, fields in table_id_to_found_fields.items()
}
url = urljoin(APIEndpoint.SCU_VALIDATOR_URL.value, "check-scu-schema")
response = post_to_cloud_run(url, query_dict)
if response.status_code != 200:
        raise ValueError(
            f"Failed to validate SCU schema query {query_dict}: {response.text}"
        )
return response.json()
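# Minimal local-mode sketch of check_scu_schema (no API call). The literal key
# "field" below is an assumption standing in for MetadataColumns.FIELD.value:
#     all_tables = {
#         "program_table": [{"field": "program_id"}, {"field": "program_name"}],
#         "target_table": [{"field": "target_id"}],
#     }
#     check_scu_schema({"field": ["program_id", "target_id"]}, all_tables=all_tables)
#     # -> {"program_table": ["program_id"], "target_table": ["target_id"]}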
def get_all_scu_tables() -> Dict[str, List[Dict[str, str]]]:
"""
Retrieve all Scientific Compute Unit (SCU) tables from the service.
Returns
-------
Dict[str, List[Dict[str, str]]]
Dictionary mapping table IDs to table data, or error information if the call fails.
"""
url = urljoin(APIEndpoint.SCU_VALIDATOR_URL.value, "get-all-scu-tables")
try:
response = get_from_cloud_run(url,{})
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error calling SCU service: {e}")
return {"error": [{"success": "False", "error": str(e)}]}
def call_benchling_service(payload: Dict) -> Dict:
"""
Call the Benchling updater service with the provided payload.
Parameters
----------
payload : Dict
Payload data to send to the Benchling service.
Returns
-------
Dict
Response from the Benchling service, including success status and any error messages.
"""
url = urljoin(APIEndpoint.BENCHLING_SERVICE_URL.value, "upload-order")
try:
response = post_to_cloud_run(url, payload)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error calling Benchling service: {e}")
return {"success": False, "error": str(e)}
def post_to_monday_service(payload: Dict, endpoint: str) -> Dict:
"""
Call the Monday service with the provided payload.
Parameters
----------
payload : Dict
Payload data to send to the Monday service.
endpoint : str
The specific endpoint to call on the Monday service.
Returns
-------
Dict
Response from the Monday service, including success status and any error messages.
"""
url = urljoin(APIEndpoint.MONDAY_SERVICE_URL.value, endpoint)
try:
response = post_to_cloud_run(url, payload)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error calling Monday service: {e}")
return {"success": False, "error": str(e)}
def ticket_details_from_monday_service(ticket_url: str) -> Dict:
"""
Get ticket details from the Monday service.
Parameters
----------
ticket_url : str
URL of the Monday ticket to retrieve details for.
Returns
-------
Dict
Dictionary containing ticket details including project, target, and iteration code.
"""
fields = [MondayBoardFields.PROJECT.value, MondayBoardFields.TARGET.value, MondayBoardFields.ITERATION_CODE.value]
payload = {
ServiceKeys.MONDAY_TICKET_URL.value: ticket_url,
ServiceKeys.MONDAY_BOARD_FIELDS.value: fields,
}
response_json = post_to_monday_service(payload, "get-program-ticket-details-from-url")
return response_json
def create_benchling_order_folder(program_id: str, target_id: str, iteration: str) -> Dict:
"""
Create a new folder in Benchling for the order.
Parameters
----------
program_id : str
The program ID for the order.
target_id : str
The target ID for the order.
iteration : str
The iteration number for the order.
Returns
-------
Dict
Response containing folder creation details including registry_folder_id and iteration_folder_id.
"""
url = urljoin(
APIEndpoint.BENCHLING_SERVICE_URL.value, "create-benchling-order-folder"
)
payload = {
MetadataColumns.PROGRAM_ID.value: program_id,
MetadataColumns.TARGET_ID.value: target_id,
MetadataColumns.ITERATION_NUMBER.value: iteration,
MetadataColumns.GENERATION_ID.value: BenchlingConstants.BENCHLING_GENERATION_FOLDER_NAME.value,
}
return post_to_cloud_run_result(url, payload)
def register_protein_entities(
protein_registry_folder_id: str, small_table_data: List[Dict[str, str]]
) -> Dict:
"""
Register protein entities in Benchling.
Parameters
----------
protein_registry_folder_id : str
The folder ID in Benchling where proteins should be registered.
small_table_data : List[Dict[str, str]]
List of dictionaries containing protein data to register.
Returns
-------
Dict
Response containing registration details including aaSequences with entity registry IDs.
"""
url = urljoin(APIEndpoint.BENCHLING_SERVICE_URL.value, "register-protein-entities")
payload = {
"protein_registry_folder_id": protein_registry_folder_id,
ServiceKeys.ENTITY_REGISTRY_DATA.value: small_table_data,
}
return post_to_cloud_run_result(url, payload)
def publish_benchling_entry(benchling_entry_query_dict: Dict) -> Dict:
"""
Create a new entry in Benchling.
Parameters
----------
benchling_entry_query_dict : Dict
Dictionary containing entry data including sequence records, CSV data, entry name,
GCS bucket path, author email, Monday ticket URL, and iteration folder ID.
Returns
-------
Dict
Response from the Benchling service indicating success or failure of entry creation.
"""
url = urljoin(APIEndpoint.BENCHLING_SERVICE_URL.value, "publish-benchling-entry")
return post_to_cloud_run_result(url, benchling_entry_query_dict)
def update_monday_ticket(ticket_link: str, benchling_url: str) -> Dict:
"""
Update the Monday ticket with the Benchling URL.
Parameters
----------
ticket_link : str
Link to the Monday ticket to update.
benchling_url : str
URL of the Benchling entry to link to the ticket.
Returns
-------
Dict
Response indicating success or failure of the ticket update.
"""
url = urljoin(APIEndpoint.MONDAY_SERVICE_URL.value, "update-ticket")
payload = {"ticket_link": ticket_link, "benchling_url": benchling_url}
try:
response = post_to_cloud_run(url, payload)
response.raise_for_status()
print(f"Successfully updated Monday ticket {ticket_link}")
return response.json()
except Exception as e:
print(f"Error updating Monday ticket: {e}")
return {"success": False, "error": str(e)}
def split_design_names(design_name: str) -> pd.Series:
"""
    Parse a design name following the schema: {target_id}_{site_name}_{iteration_code}_{fusion_id}{design_number}.
Parameters
----------
design_name : str
        Design name to parse; the final underscore-separated field is the fusion ID followed by a three-digit design number.
Returns
-------
pd.Series
Series containing parsed components: target_id_implied, site_name_implied,
iteration_code_implied, design_number_implied, fusion_id_implied.
"""
(
target_id_implied,
site_name_implied,
iteration_code_implied,
fusion_combo_number_implied,
) = design_name.split("_")
fusion_id_implied = fusion_combo_number_implied[:-3]
design_number_implied = fusion_combo_number_implied[-3:]
return pd.Series(
[
target_id_implied,
site_name_implied,
iteration_code_implied,
design_number_implied,
fusion_id_implied,
],
index=[
OrderUploaderIntermediateColumns.TARGET_ID_IMPLIED.value,
OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value,
OrderUploaderIntermediateColumns.ITERATION_CODE_IMPLIED.value,
OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value,
OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value,
],
)
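# Worked example (hypothetical name): "TGT1_siteA_it03_FUS2001" parses into
# target "TGT1", site "siteA", iteration code "it03", fusion ID "FUS2", and the
# three-digit design number "001" (the last three characters of the final field).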
def pdapply_build_design_names_from_row(
row: pd.Series,
iteration: str,
target_id: str,
binding_site_id: Optional[str] = None,
fusion_id: Optional[str] = None,
override: bool = False,
) -> str:
"""
Build a design name from a row, intended for pandas apply.
Parameters
----------
row : pd.Series
Pandas Series representing a row of data.
iteration : str
Iteration code to use in the design name.
target_id : str
Target ID to use in the design name.
binding_site_id : Optional[str], optional
Binding site ID to use. If None, will try to infer from row data.
fusion_id : Optional[str], optional
Fusion ID to use. If None, will try to infer from row data.
override : bool, optional
If True, use provided parameters directly. If False, try to infer missing values.
Returns
-------
str
Generated design name in the format target_id_binding_site_iteration_fusion_id###.
"""
def safe_row_index(row):
try:
return int(row.name)
except Exception:
raise ValueError(f"Row index {row.name} is not convertible to int.")
if override:
return (
f"{target_id}_{binding_site_id}_{iteration}_{fusion_id}{safe_row_index(row):03d}"
)
else:
binding_site_guess = row.get(
OrderUploaderInputColumns.BINDING_SITE_ID.value,
row.get(
OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value,
binding_site_id,
),
)
fusion_id_guess = row.get(
OrderUploaderInputColumns.FUSION_ID.value,
row.get(
OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value, fusion_id
),
)
return f"{target_id}_{binding_site_guess}_{iteration}_{fusion_id_guess}{safe_row_index(row):03d}"
def get_required_columns() -> List[str]:
"""
Get the required columns for the order uploader.
Returns
-------
List[str]
List of column names that are required for order upload.
"""
return [
OrderUploaderInputColumns.SEQUENCE.value,
OrderUploaderInputColumns.TAG_LOCATION.value,
]
def get_small_table_fields() -> List[str]:
"""
Get the fields for the small table (core sequence information).
Returns
-------
List[str]
List of column names that constitute the small table fields.
"""
return [
OrderUploaderInputColumns.DESIGN_NAME.value,
OrderUploaderInputColumns.SEQUENCE.value,
OrderUploaderInputColumns.TAG_LOCATION.value,
]
def get_small_table_rename_dict() -> Dict[str, str]:
"""
Get the mapping dictionary for renaming small table columns.
Returns
-------
Dict[str, str]
Dictionary mapping input column names to output column names.
"""
return {
OrderUploaderInputColumns.DESIGN_NAME.value: OrderUploaderColumns.DESIGN_NAME.value,
OrderUploaderInputColumns.SEQUENCE.value: OrderUploaderColumns.SEQUENCE.value,
OrderUploaderInputColumns.TAG_LOCATION.value: OrderUploaderColumns.TAG_LOCATION.value,
}
def initialize_df_with_design_name(
df_input: pd.DataFrame,
) -> pd.DataFrame:
"""
Initialize DataFrame with design name parsing and validation.
Parameters
----------
df_input : pd.DataFrame
Input DataFrame containing design names to parse.
Returns
-------
pd.DataFrame
DataFrame with additional columns for parsed design name components and adjusted index.
Raises
------
ValueError
If design name numbers don't match the expected index pattern.
"""
df = copy.deepcopy(df_input)
df[
[
OrderUploaderIntermediateColumns.TARGET_ID_IMPLIED.value,
OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value,
OrderUploaderIntermediateColumns.ITERATION_CODE_IMPLIED.value,
OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value,
OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value,
]
] = df[OrderUploaderInputColumns.DESIGN_NAME.value].apply(split_design_names)
df.sort_values(
by=OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value,
ascending=True,
inplace=True,
)
df.index = pd.RangeIndex(1, len(df) + 1)
if any(
df.index.values
- df[OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value]
.apply(lambda x: int(x))
.values
):
# check for off by n
logger.warning(
f"There are manually supplied names whose numbers don't match the index. Attempting to adjust index by the smallest design number."
)
adjustment = (
int(
df[OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value].iloc[0]
)
- 1
)
df.index = pd.RangeIndex(1 + adjustment, len(df) + 1 + adjustment)
if any(
df.index.values
- df[OrderUploaderIntermediateColumns.DESIGN_NUMBER_IMPLIED.value]
.apply(lambda x: int(x))
.values
):
raise ValueError(
f"There are manually supplied names whose numbers don't match the index."
)
return df
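# Example of the index adjustment above (hypothetical names): if the supplied design
# names are numbered 011, 012, 013, the default RangeIndex(1..3) will not match, so
# the index is shifted by the smallest design number minus one to RangeIndex(11..13).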
def get_program_id_candidate(
df: pd.DataFrame,
program_id_specified: Optional[str],
allow_cli_override: bool = False,
) -> str:
"""
Get program ID candidate from DataFrame and CLI arguments.
Parameters
----------
df : pd.DataFrame
Input DataFrame that may contain program ID information.
program_id_specified : Optional[str]
Program ID specified via CLI arguments.
allow_cli_override : bool, optional
Whether to allow CLI arguments to override DataFrame values, by default False.
Returns
-------
str
The resolved program ID to use.
Raises
------
ValueError
If multiple conflicting program IDs are found and override is not allowed.
"""
if OrderUploaderInputColumns.PROGRAM_ID.value in df.columns:
program_id_col_ids = set(
df[OrderUploaderInputColumns.PROGRAM_ID.value].values
)
else:
program_id_col_ids = set()
if program_id_specified and allow_cli_override:
program_id_query = program_id_specified
else:
program_id_candidates = set()
if program_id_specified:
program_id_candidates.add(program_id_specified)
if program_id_col_ids:
program_id_candidates.update(program_id_col_ids)
        if not program_id_candidates:
            raise ValueError(
                "No program ID found in the CSV program_id column or CLI arguments."
            )
        if len(program_id_candidates) > 1:
            raise ValueError(
                f"Multiple program IDs found from different sources: "
                f"CLI argument: {program_id_specified if program_id_specified else 'None'}, "
                f"CSV column: {program_id_col_ids if program_id_col_ids else 'None'}. "
                f"Use --allow-cli-override to use CLI value."
            )
        program_id_query = program_id_candidates.pop()
return program_id_query
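# Resolution sketch (hypothetical values):
#     get_program_id_candidate(df_without_program_id_column, "PRG-001")   # -> "PRG-001"
#     # A CSV program_id column containing "PRG-002" plus CLI "PRG-001" raises
#     # ValueError unless allow_cli_override=True, in which case "PRG-001" wins.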
def check_program_id(program_id_query: str, eval_records: Optional[List[Dict[str, str]]] = None, try_to_find_monday_id: Optional[str] = None) -> Tuple[str, str, str]:
"""
Validate program ID against metadata oracle.
Parameters
----------
program_id_query : str
Program ID to validate.
eval_records : Optional[List[Dict[str, str]]], optional
Pre-loaded records to validate against instead of making API call, by default None.
try_to_find_monday_id : Optional[str], optional
Monday ID to try to match against, by default None.
Returns
-------
Tuple[str, str, str]
Tuple containing (program_id_benchling, program_id_design, program_id_monday).
Raises
------
ValueError
If program ID cannot be found in the metadata.
"""
allowed_program_id_column_names = get_possible_columns_names_for_metadata_table(
MetadataTables.PROGRAM_TABLE.value
)
if not eval_records:
query_dict_program_id = {
col_value: program_id_query for col_value in allowed_program_id_column_names
}
query_dict_program_id[ServiceKeys.TABLE_ID.value] = (
MetadataTables.PROGRAM_TABLE.value
)
results = check_allowed_project_metadata(query_dict_program_id) # API call
else:
results = {allowed_program_id_column_names[0]: eval_records} # local records mode
query_dict_program_id = None
program_id_benchling = None
program_id_design = None
program_id_monday = None
for query_col in allowed_program_id_column_names:
these_records = results.get(query_col, [])
if these_records:
for this_record in these_records:
program_id_benchling = this_record.get(
PossibleProgramIDColumns.BENCHLING_PROGRAM_ID.value
)
program_id_design = this_record.get(
PossibleProgramIDColumns.DESIGN_PROGRAM_ID.value
)
program_id_monday = this_record.get(
PossibleProgramIDColumns.MONDAY_PROGRAM_ID.value
)
if try_to_find_monday_id:
if program_id_monday != try_to_find_monday_id:
logger.warning(f"Program ID {program_id_monday} does not match the Monday ID {try_to_find_monday_id}")
logger.warning(f"Checking any additional hits for this program")
continue
break
if not (program_id_benchling and program_id_design):
raise ValueError(
"Unable to find program ID in BigQuery for the given query."
)
return program_id_benchling, program_id_design, program_id_monday or ""
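# Local-records sketch (no API call, hypothetical values); record keys follow the
# PossibleProgramIDColumns enum defined above:
#     records = [{
#         "benchling_program_id": "bPRG-001",
#         "design_program_id": "dPRG-001",
#         "monday_program_id": "mPRG-001",
#     }]
#     check_program_id("PRG-001", eval_records=records)
#     # -> ("bPRG-001", "dPRG-001", "mPRG-001")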
def get_target_id_candidate(
df: pd.DataFrame,
target_id_specified: Optional[str],
allow_cli_override: bool = False,
) -> str:
"""
Get target ID candidate from DataFrame and CLI arguments.
Parameters
----------
df : pd.DataFrame
Input DataFrame that may contain target ID information.
target_id_specified : Optional[str]
Target ID specified via CLI arguments.
allow_cli_override : bool, optional
Whether to allow CLI arguments to override DataFrame values, by default False.
Returns
-------
str
The resolved target ID to use.
Raises
------
ValueError
If multiple conflicting target IDs are found and override is not allowed.
"""
name_implied_target_ids = (
set(df[OrderUploaderIntermediateColumns.TARGET_ID_IMPLIED.value].values)
if OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
else set()
)
column_implied_target_ids = (
set(df[OrderUploaderInputColumns.TARGET_ID.value].values)
if OrderUploaderInputColumns.TARGET_ID.value in df.columns
else set()
)
non_cli_candidate_target_ids = name_implied_target_ids.union(
column_implied_target_ids
)
cli_target_id_set = {target_id_specified} if target_id_specified else set()
final_query_target_ids = set()
if allow_cli_override:
final_query_target_ids = (
cli_target_id_set if cli_target_id_set else non_cli_candidate_target_ids
)
    elif cli_target_id_set | non_cli_candidate_target_ids:
final_query_target_ids = non_cli_candidate_target_ids | cli_target_id_set
else:
raise ValueError(
f"Unable to unambiguously determine target IDs from {OrderUploaderInputColumns.TARGET_ID.value} column and design names: {non_cli_candidate_target_ids} with CLI value {cli_target_id_set}. Use --allow-cli-override to force CLI value."
)
if len(final_query_target_ids) > 1:
raise ValueError(
f"Multiple target IDs found in {OrderUploaderInputColumns.TARGET_ID.value} column: {final_query_target_ids}. Use --allow-cli-override to force CLI value."
)
else:
target_id = final_query_target_ids.pop()
return target_id
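# Resolution sketch (hypothetical values): after initialize_df_with_design_name has
# added target_id_implied, a frame whose design names and target_id column both imply
# "TGT1" resolves to "TGT1"; mixing "TGT1" and "TGT2" raises ValueError unless
# allow_cli_override=True and a single CLI target ID is supplied.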
def check_target_id(
target_id_query: str, allowed_other_ids: Optional[List[str]] = None, eval_records: Optional[List[Dict[str, str]]] = None, try_to_find_monday_id: Optional[str] = None
) -> Tuple[str, str, str, str, str]:
"""
Validate target ID against metadata oracle.
Parameters
----------
target_id_query : str
Target ID to validate.
allowed_other_ids : Optional[List[str]], optional
List of allowed program IDs for cross-validation, by default None.
eval_records : Optional[List[Dict[str, str]]], optional
Pre-loaded records to validate against instead of making API call, by default None.
try_to_find_monday_id : Optional[str], optional
Monday ID to try to match against, by default None.
Returns
-------
Tuple[str, str, str, str, str]
Tuple containing (matching_program_id, matching_target_id_benchling,
matching_target_id_design, matching_target_id_monday, matching_target_id_internal).
Raises
------
UnableToFindMetadataError
If target ID cannot be found in the metadata.
"""
allowed_target_id_column_names = get_possible_columns_names_for_metadata_table(
MetadataTables.TARGET_TABLE.value
)
if not eval_records:
target_id_query_dict = {
col_value: target_id_query for col_value in allowed_target_id_column_names
}
target_id_query_dict[ServiceKeys.TABLE_ID.value] = MetadataTables.TARGET_TABLE.value
target_id_oracle_data = check_allowed_project_metadata(target_id_query_dict) # API call
else:
target_id_oracle_data = {allowed_target_id_column_names[0]: eval_records} # local records mode
target_id_query_dict = None
if oracle_data_is_empty(target_id_oracle_data):
raise UnableToFindMetadataError(
"Unable to find target ID in BigQuery for the given query."
)
matching_program_id = None
matching_target_id_benchling = None
matching_target_id_design = None
matching_target_id_monday = None
matching_target_id_internal = None
allowed_other_set = set(allowed_other_ids) if allowed_other_ids else set()
for query_col in allowed_target_id_column_names:
these_records = target_id_oracle_data.get(query_col, [])
if these_records:
for this_record in these_records:
matching_program_id = this_record.get(
PossibleProgramIDColumns.PROGRAM_ID.value
)
logger.debug(f"Checking record: {this_record}")
if allowed_other_set and matching_program_id not in allowed_other_set:
logger.warning(
f"Target ID {target_id_query} is registered for program {matching_program_id}, which is not in the allowed other IDs {allowed_other_set}"
)
logger.warning(f"Checking any additional hits for this target")
continue
matching_target_id_benchling = this_record.get(
PossibleTargetIDColumns.BENCHLING_TARGET_ID.value
)
matching_target_id_design = this_record.get(
PossibleTargetIDColumns.DESIGN_TARGET_ID.value
)
matching_target_id_monday = this_record.get(
PossibleTargetIDColumns.MONDAY_TARGET_ID.value
)
matching_target_id_internal = this_record.get(
PossibleTargetIDColumns.TARGET_ID.value
)
logger.debug(
f"Matching target ID: {matching_target_id_benchling}, {matching_target_id_design}, {matching_target_id_monday}"
)
if try_to_find_monday_id:
if matching_target_id_monday != try_to_find_monday_id:
logger.warning(f"Target ID {matching_target_id_monday} does not match the Monday ID {try_to_find_monday_id}")
logger.warning(f"Checking any additional hits for this target")
continue
break
for val in [matching_program_id, matching_target_id_benchling, matching_target_id_design, matching_target_id_monday, matching_target_id_internal]:
if val is None:
raise UnableToFindMetadataError("Target ID lookup did not return all required fields.")
# All values are str by this point
return (
str(matching_program_id),
str(matching_target_id_benchling),
str(matching_target_id_design),
str(matching_target_id_monday),
str(matching_target_id_internal),
)
def get_binding_site_candidates(
df: pd.DataFrame,
binding_site_id_specified: Optional[str],
allow_cli_override: bool = False,
) -> Set[str]:
"""
Get binding site ID candidates from DataFrame and CLI arguments.
Parameters
----------
df : pd.DataFrame
Input DataFrame that may contain binding site ID information.
binding_site_id_specified : Optional[str]
Binding site ID specified via CLI arguments.
allow_cli_override : bool, optional
Whether to allow CLI arguments to override DataFrame values, by default False.
Returns
-------
Set[str]
Set of resolved binding site IDs to use.
Raises
------
ValueError
If multiple conflicting binding site IDs are found and override is not allowed.
"""
name_implied_binding_sites = (
set(df[OrderUploaderIntermediateColumns.SITE_NAME_IMPLIED.value].values)
if OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
else set()
)
column_implied_binding_sites = (
set(df[OrderUploaderInputColumns.BINDING_SITE_ID.value].values)
if OrderUploaderInputColumns.BINDING_SITE_ID.value in df.columns
else set()
)
non_cli_candidate_binding_sites = name_implied_binding_sites.union(
column_implied_binding_sites
)
cli_binding_site_set = (
{binding_site_id_specified} if binding_site_id_specified else set()
)
final_query_binding_sites = set()
if allow_cli_override:
final_query_binding_sites = (
cli_binding_site_set
if cli_binding_site_set
else non_cli_candidate_binding_sites
)
    elif cli_binding_site_set | non_cli_candidate_binding_sites:
final_query_binding_sites = non_cli_candidate_binding_sites.union(
cli_binding_site_set
)
else:
raise ValueError(
f"Unable to unambiguously determine binding site IDs from {OrderUploaderInputColumns.BINDING_SITE_ID.value} column and design names: {non_cli_candidate_binding_sites} with CLI value {cli_binding_site_set}. Use --allow-cli-override to force CLI value."
)
return final_query_binding_sites
def check_binding_site_id(
binding_site_query: str, allowed_other_ids: Optional[List[str]] = None, eval_records: Optional[List[Dict[str, str]]] = None
) -> Tuple[str, str, str]:
"""
Validate binding site ID against metadata oracle.
Parameters
----------
binding_site_query : str
Binding site ID to validate.
allowed_other_ids : Optional[List[str]], optional
List of allowed target IDs for cross-validation, by default None.
eval_records : Optional[List[Dict[str, str]]], optional
Pre-loaded records to validate against instead of making API call, by default None.
Returns
-------
Tuple[str, str, str]
Tuple containing (matching_target_id, binding_site_id_benchling, binding_site_id_design).
Raises
------
UnableToFindMetadataError
If binding site ID cannot be found in the metadata.
"""
allowed_binding_site_id_column_names = (
get_possible_columns_names_for_metadata_table(
MetadataTables.BINDING_SITE_TABLE.value
)
)
if not eval_records:
binding_site_id_query_dict = {
col_value: binding_site_query
for col_value in allowed_binding_site_id_column_names
}
binding_site_id_query_dict[ServiceKeys.TABLE_ID.value] = (
MetadataTables.BINDING_SITE_TABLE.value
)
binding_site_id_oracle_data = check_allowed_project_metadata(
binding_site_id_query_dict
) # API call
else:
binding_site_id_oracle_data = {allowed_binding_site_id_column_names[0]: eval_records} # local records mode
binding_site_id_query_dict = None
if oracle_data_is_empty(binding_site_id_oracle_data):
raise UnableToFindMetadataError(
"Unable to find binding site ID in BigQuery for the given query."
)
matching_target_id = None
binding_site_id_benchling = None
binding_site_id_design = None
allowed_other_set = set(allowed_other_ids) if allowed_other_ids else set()
for query_col in allowed_binding_site_id_column_names:
these_records = binding_site_id_oracle_data.get(query_col, [])
if these_records:
for this_record in these_records:
matching_target_id = this_record.get(
PossibleBindingSiteColumns.TARGET_ID.value
)
if allowed_other_set and matching_target_id not in allowed_other_set:
logger.warning(
f"Binding site ID {binding_site_query} is registered for target {matching_target_id}, which is not in the allowed target IDs {allowed_other_set}"
)
logger.warning(
f"Checking any additional hits for this binding site"
)
continue
binding_site_id_benchling = this_record.get(
PossibleBindingSiteColumns.BENCHLING_BINDING_SITE_ID.value
)
binding_site_id_design = this_record.get(
PossibleBindingSiteColumns.DESIGN_BINDING_SITE_ID.value
)
break
for val in [matching_target_id, binding_site_id_benchling, binding_site_id_design]:
if val is None:
raise UnableToFindMetadataError("Binding site ID lookup did not return all required fields.")
return str(matching_target_id), str(binding_site_id_benchling), str(binding_site_id_design)
def get_iteration_id_candidate(
df: pd.DataFrame,
iteration_number_specified: Optional[str],
allow_cli_override: bool = False,
) -> str:
"""
Get iteration ID candidate from DataFrame and CLI arguments.
Parameters
----------
df : pd.DataFrame
Input DataFrame that may contain iteration number information.
iteration_number_specified : Optional[str]
Iteration number specified via CLI arguments.
allow_cli_override : bool, optional
Whether to allow CLI arguments to override DataFrame values, by default False.
Returns
-------
str
The resolved iteration number to use.
Raises
------
UnableToFindMetadataError
If multiple conflicting iteration numbers are found and override is not allowed.
"""
collected_iteration_nums = set()
if iteration_number_specified and allow_cli_override:
iteration = iteration_number_specified
else:
implied_iteration_nums = (
set(
df[
OrderUploaderIntermediateColumns.ITERATION_CODE_IMPLIED.value
].values
)
if OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
else set()
)
column_iteration_nums = (
set(df[OrderUploaderInputColumns.ITERATION_NUMBER.value].values)
if OrderUploaderInputColumns.ITERATION_NUMBER.value in df.columns
else set()
)
collected_iteration_nums = implied_iteration_nums | column_iteration_nums
if iteration_number_specified:
collected_iteration_nums.add(iteration_number_specified)
        if not collected_iteration_nums:
            raise UnableToFindMetadataError(
                "No iteration number found in the design names, "
                f"{OrderUploaderInputColumns.ITERATION_NUMBER.value} column, or CLI arguments."
            )
        if len(collected_iteration_nums) > 1:
            raise UnableToFindMetadataError(
                f"Multiple iteration numbers found in {OrderUploaderInputColumns.ITERATION_NUMBER.value} column: {collected_iteration_nums}. Use --allow-cli-override to force CLI value."
            )
        iteration = collected_iteration_nums.pop()
return iteration
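# Resolution sketch (hypothetical values): iteration codes implied by design names,
# an iteration_number column, and the CLI argument are pooled; a single value such as
# "it03" is returned, while disagreeing values raise unless allow_cli_override=True
# with a CLI value is used.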
def check_user_id(user_id: str) -> Tuple[str, str, str, str]:
"""
    Check if a user ID is valid against the metadata oracle.
    Parameters
    ----------
    user_id : str
        User identifier to validate; matched against the allowed user table columns.
    Returns
    -------
    Tuple[str, str, str, str]
        Tuple containing (matching_user_id, matching_user_email, matching_user_full_name, matching_cluster_id).
    Raises
    ------
    UnableToFindMetadataError
        If the user ID cannot be found in the metadata.
    """
allowed_user_id_column_names = get_possible_columns_names_for_metadata_table(
MetadataTables.USER_TABLE.value
)
user_id_query_dict = {
col_value: user_id for col_value in allowed_user_id_column_names
}
user_id_query_dict[ServiceKeys.TABLE_ID.value] = MetadataTables.USER_TABLE.value
user_id_oracle_data = check_allowed_project_metadata(user_id_query_dict) # API call
if oracle_data_is_empty(user_id_oracle_data):
raise UnableToFindMetadataError(
"Unable to find user ID in BigQuery for the given query."
)
matching_user_id = None
matching_user_email = None
matching_user_full_name = None
matching_cluster_id = None
for query_col in allowed_user_id_column_names:
these_records = user_id_oracle_data.get(query_col, [])
if these_records:
for this_record in these_records:
matching_user_id = this_record.get(
PossibleUserColumns.USER_ID.value
)
matching_user_email = this_record.get(
PossibleUserColumns.USER_EMAIL.value
)
matching_user_full_name = this_record.get(
PossibleUserColumns.USER_FULL_NAME.value
)
matching_cluster_id = this_record.get(
PossibleUserColumns.CLUSTER_ID.value
)
break
for val in [matching_user_id, matching_user_email, matching_user_full_name, matching_cluster_id]:
if val is None:
raise UnableToFindMetadataError("User ID lookup did not return all required fields.")
return str(matching_user_id), str(matching_user_email), str(matching_user_full_name), str(matching_cluster_id)
def check_generated_design_names(
df: pd.DataFrame,
generated_design_names: pd.Series,
allow_cli_override: bool = False,
) -> None:
"""
Check if generated design names match existing design names in DataFrame.
Parameters
----------
df : pd.DataFrame
Input DataFrame containing existing design names.
generated_design_names : pd.Series
Series of generated design names to compare against.
allow_cli_override : bool, optional
Whether to allow override and use generated names when there's a mismatch, by default False.
Raises
------
UnableToFindMetadataError
If design names don't match and override is not allowed.
"""
name_discrepancy_mask = (
df[OrderUploaderInputColumns.DESIGN_NAME.value] != generated_design_names
)
discrepancy_in_names = any(name_discrepancy_mask)
if allow_cli_override:
if discrepancy_in_names:
logger.warning(
f"Design names do not match generated design names. Using generated design names because of --allow-cli-override"
)
df[OrderUploaderInputColumns.DESIGN_NAME.value] = generated_design_names
else:
if discrepancy_in_names:
# output the rows that don't match
logger.error(
f"Input csv design names do not match generated design names. Use --allow-cli-override to force CLI value."
)
output_df = df[name_discrepancy_mask][
[
OrderUploaderInputColumns.DESIGN_NAME.value,
]
]
output_df["generated_design_name"] = generated_design_names[
name_discrepancy_mask
]
logger.error("\n" + output_df.to_string())
raise UnableToFindMetadataError(
f"Input csv design names do not match generated design names. Use --allow-cli-override to force CLI value."
)
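# Usage sketch (names hypothetical): typically called right after generating names, e.g.
#     generated = df.apply(pdapply_build_design_names_from_row, axis=1,
#                          iteration=iteration, target_id=target_id)
#     check_generated_design_names(df, generated, allow_cli_override=allow_cli_override)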
def get_fusion_id_candidates(
df: pd.DataFrame,
fusion_id_specified: Optional[str],
allow_cli_override: bool = False,
) -> Set[str]:
"""
Get candidate fusion IDs from various sources.
Parameters
----------
df : pd.DataFrame
Input DataFrame that may contain fusion ID information.
fusion_id_specified : Optional[str]
Fusion ID specified via CLI arguments.
allow_cli_override : bool, optional
Whether to allow CLI arguments to override DataFrame values, by default False.
Returns
-------
Set[str]
Set of candidate fusion IDs.
Raises
------
ValueError
If multiple conflicting fusion IDs are found and override is not allowed.
"""
column_implied_fusion_ids = set()
if OrderUploaderInputColumns.FUSION_ID.value in df.columns:
column_implied_fusion_ids = set(df[OrderUploaderInputColumns.FUSION_ID.value].values)
name_implied_fusion_ids = set()
if OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value in df.columns:
name_implied_unique_fusion_ids = set(df[OrderUploaderIntermediateColumns.FUSION_ID_IMPLIED.value].values)
if len(name_implied_unique_fusion_ids) == 1 and next(iter(name_implied_unique_fusion_ids)) == "":
name_implied_fusion_ids = set()
else:
name_implied_fusion_ids = name_implied_unique_fusion_ids
non_cli_candidate_fusion_ids = name_implied_fusion_ids.union(
column_implied_fusion_ids
)
cli_fusion_id_set = {fusion_id_specified} if fusion_id_specified else set()
final_query_fusion_ids = set()
if allow_cli_override:
final_query_fusion_ids = (
cli_fusion_id_set if cli_fusion_id_set else non_cli_candidate_fusion_ids
)
    elif cli_fusion_id_set | non_cli_candidate_fusion_ids:
final_query_fusion_ids = non_cli_candidate_fusion_ids | cli_fusion_id_set
elif not cli_fusion_id_set and not non_cli_candidate_fusion_ids:
final_query_fusion_ids = set() # it's okay not to have a fusion ID, unlike the other name properties
else:
raise ValueError(
f"Unable to unambiguously determine fusion IDs from {OrderUploaderInputColumns.FUSION_ID.value} column and design names: {non_cli_candidate_fusion_ids} with CLI value {cli_fusion_id_set}. Use --allow-cli-override to force CLI value."
)
return final_query_fusion_ids
def check_fusion_id(
fusion_id_query: str, eval_records: Optional[List[Dict[str, str]]] = None
) -> Tuple[str, str, str]:
"""
Validate a fusion ID against the metadata validator.
Parameters
----------
fusion_id_query : str
Fusion ID to validate.
eval_records : Optional[List[Dict[str, str]]], optional
Pre-loaded records to validate against instead of making API call, by default None.
Returns
-------
Tuple[str, str, str]
Tuple containing (fusion_id_internal, fusion_id_benchling, fusion_id_design).
Raises
------
UnableToFindMetadataError
If fusion ID cannot be found in the metadata.
"""
allowed_fusion_id_column_names = get_possible_columns_names_for_metadata_table(
MetadataTables.FUSION_TABLE.value
)
if not eval_records:
fusion_id_query_dict = {
col_value: fusion_id_query for col_value in allowed_fusion_id_column_names
}
fusion_id_query_dict[ServiceKeys.TABLE_ID.value] = MetadataTables.FUSION_TABLE.value
fusion_id_oracle_data = check_allowed_project_metadata(fusion_id_query_dict) # API call
else:
fusion_id_oracle_data = {allowed_fusion_id_column_names[0]: eval_records} # local records mode
fusion_id_query_dict = None
if oracle_data_is_empty(fusion_id_oracle_data):
raise UnableToFindMetadataError(
"Unable to find fusion ID in BigQuery for the given query."
)
fusion_id_internal = None
fusion_id_benchling = None
fusion_id_design = None
for query_col in allowed_fusion_id_column_names:
these_records = fusion_id_oracle_data.get(query_col, [])
if these_records:
for this_record in these_records:
fusion_id_internal = this_record.get(
PossibleFusionIDColumns.FUSION_ID.value
)
fusion_id_benchling = this_record.get(
PossibleFusionIDColumns.BENCHLING_FUSION_ID.value
)
fusion_id_design = this_record.get(
PossibleFusionIDColumns.DESIGN_FUSION_ID.value
)
break
for val in [fusion_id_internal, fusion_id_benchling, fusion_id_design]:
if val is None:
raise UnableToFindMetadataError("Fusion ID lookup did not return all required fields.")
return str(fusion_id_internal), str(fusion_id_benchling), str(fusion_id_design)
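# Local-records sketch (no API call, hypothetical values); record keys follow the
# PossibleFusionIDColumns enum defined above:
#     records = [{
#         "fusion_id": "FUS2",
#         "benchling_fusion_id": "bFUS-2",
#         "design_fusion_id": "dFUS-2",
#     }]
#     check_fusion_id("FUS2", eval_records=records)
#     # -> ("FUS2", "bFUS-2", "dFUS-2")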