import logging
from io import BytesIO
import os
import json
from typing import Optional
from flask import jsonify
from google.api_core.exceptions import NotFound
from google.auth.credentials import AnonymousCredentials
from google.api_core.client_options import ClientOptions
from google.cloud import bigquery,storage
from google.cloud import redis_v1
from google.cloud.pubsub import SchemaServiceClient
import pandas as pd
import redis
from zcloud.pubsub import Schema
from zcloud.pubsub.messaging import unpack_envelope
from zcloud.service_enums import MessageStatus, HTTPStatus, HTTPResponse, ProjectId
logger = logging.getLogger(__name__)
HARDCODED_REDIS_INSTANCE_ID = "cloud-test-redis"
REDIS_TEST_ENV_VAR="REDIS_TEST_HOST"
BQ_TEST_ENV_VAR="BQ_TEST_PROJECT"
BQ_TEST_ENDPOINT_ENV_VAR="BQ_TEST_ENDPOINT"
STORAGE_TEST_ENV_VAR="STORAGE_TEST_HOST"
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def get_redis_instance_endpoint(*args, **kwargs) -> tuple[str, int]:
    """Resolve a Redis ``(host, port)`` endpoint.

    When the ``REDIS_TEST_HOST`` env var equals ``"true"``, the endpoint is
    read from ``REDIS_HOST`` / ``REDIS_PORT`` (defaulting to ``"redis"`` /
    ``6379``) so tests can target a local container.  Otherwise falls back to
    the GCP lookup, forwarding ``*args`` / ``**kwargs`` to
    :func:`get_redis_instance_from_gcp_project`.

    Returns
    -------
    tuple[str, int]
        Hostname and port of the Redis instance.
    """
    # .get() covers both "var missing" and "var != 'true'" in one check.
    if os.environ.get(REDIS_TEST_ENV_VAR) == "true":
        redis_host = os.environ.get("REDIS_HOST", "redis")
        try:
            redis_port = int(os.environ.get("REDIS_PORT", "6379"))
        except ValueError:
            redis_port = 6379  # Default to 6379 if port is not a valid integer
        return redis_host, redis_port
    # fall back to happy path
    return get_redis_instance_from_gcp_project(*args, **kwargs)
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def get_redis_instance_from_gcp_project(project_id: str, location_id: str, instance_id: str) -> tuple[str, int]:
    """Look up the host/port of a Cloud Memorystore Redis instance.

    Parameters
    ----------
    project_id : str
        GCP project containing the instance.
    location_id : str
        GCP location (region) of the instance.
    instance_id : str
        Name of the Redis instance.

    Returns
    -------
    tuple[str, int]
        The instance's host (e.g. ``"10.0.0.3"``) and port (e.g. ``6379``).
    """
    client = redis_v1.CloudRedisClient()
    name = f"projects/{project_id}/locations/{location_id}/instances/{instance_id}"
    instance = client.get_instance(name=name)
    host = instance.host
    port = instance.port
    # Use the module logger (not print) so output respects logging config.
    logger.info("Redis endpoint: %s:%s", host, port)
    return host, port
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def connect_to_redis(host: str, port: int, password: Optional[str] = None) -> redis.StrictRedis:
    """Build a StrictRedis client for the given endpoint.

    SSL is disabled and responses are decoded to ``str``; supply *password*
    only when the server has AUTH enabled.
    """
    connection_kwargs = {
        "host": host,
        "port": port,
        "password": password,      # Only if you have AUTH enabled
        "ssl": False,              # Set to True if SSL is configured
        "decode_responses": True,  # So that get/set return strings instead of bytes
    }
    return redis.StrictRedis(**connection_kwargs)
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def get_columna_message(envelope):
    """Decode a Pub/Sub push envelope into a Columna message.

    Returns the unpacked message on success.  On failure returns a Flask
    ``(response, status)`` tuple: 400 for a malformed envelope
    (``ValueError``), 500 for any other decode failure.
    """
    try:
        return unpack_envelope(
            envelope=envelope,
            project_id=ProjectId.TEST_ENVIRONMENT.value,
            schema_name=Schema.COLUMNA_PRIMARY.value
        )
    except ValueError as e:
        return jsonify({
            HTTPResponse.STATUS.value: MessageStatus.ERROR.value,
            HTTPResponse.DETAILS.value: str(e)
        }), HTTPStatus.BAD_REQUEST.value
    except Exception:
        # logger.exception (not print) captures the traceback in the
        # logging pipeline.
        logger.exception("Failed to decode message envelope")
        return jsonify({
            HTTPResponse.STATUS.value: MessageStatus.ERROR.value,
            HTTPResponse.DETAILS.value: "Failed to decode message"
        }), HTTPStatus.INTERNAL_SERVER_ERROR.value
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def get_bq_client():
    """Return a BigQuery client, targeting an emulator in test environments.

    The emulator is used when ``BQ_TEST_PROJECT`` equals ``"true"`` or when
    ``BIGQUERY_EMULATOR_HOST`` is set; otherwise a standard production
    client is returned.

    Returns
    -------
    google.cloud.bigquery.Client
    """
    # `.get(..., "false")` already handles the "var missing" case, so the
    # original extra `in os.environ` membership test was redundant.
    use_test_flag = os.environ.get(BQ_TEST_ENV_VAR, "false").lower() == "true"
    if use_test_flag or "BIGQUERY_EMULATOR_HOST" in os.environ:
        logger.debug(f"BQ_TEST_ENV_VAR: {BQ_TEST_ENV_VAR}")
        logger.debug(f"os.environ[BQ_TEST_ENV_VAR]: {os.environ.get(BQ_TEST_ENV_VAR,'false')}")
        logger.debug(f"BIGQUERY_EMULATOR_HOST: {os.environ.get('BIGQUERY_EMULATOR_HOST','not set')}")
        # Determine endpoint: explicit test endpoint wins, else the
        # emulator host variable; both default to the compose service name.
        if use_test_flag:
            endpoint = os.environ.get(BQ_TEST_ENDPOINT_ENV_VAR, "http://bigquery-emulator:9050")
        else:
            endpoint = os.environ.get("BIGQUERY_EMULATOR_HOST", "http://bigquery-emulator:9050")
        # Ensure endpoint has proper URL format
        if not endpoint.startswith(("http://", "https://")):
            endpoint = f"http://{endpoint}"
        logger.debug(f"Using BigQuery emulator endpoint: {endpoint}")
        project_id = ProjectId.current()
        logger.debug(f"Using BigQuery project ID: {project_id}")
        try:
            logger.debug(f"Creating BigQuery client with project_id={project_id}, endpoint={endpoint}")
            client = bigquery.Client(
                project=project_id,  # keyword form: self-documenting
                client_options=ClientOptions(api_endpoint=endpoint),
                credentials=AnonymousCredentials(),
            )
            logger.debug(f"BigQuery client created successfully with project: {client.project}")
            return client
        except Exception:
            # logger.exception logs the traceback, replacing the inline
            # `import traceback; traceback.print_exc()` pattern.
            logger.exception("Error creating test BigQuery client")
            raise
    # Fall back to standard client for production
    logger.info("Using standard BigQuery client (not emulator)")
    return bigquery.Client()
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def get_schema(project_id, schema_id):
    """Fetch a Pub/Sub schema definition, or ``None`` when it does not exist.

    Parameters
    ----------
    project_id : str
        GCP project that owns the schema.
    schema_id : str
        Schema identifier within the project.

    Returns
    -------
    The schema resource returned by ``SchemaServiceClient.get_schema``,
    or ``None`` if the schema is not found.
    """
    schema_client = SchemaServiceClient()
    schema_path = schema_client.schema_path(project_id, schema_id)
    try:
        return schema_client.get_schema(request={"name": schema_path})
    except NotFound:
        # A missing schema is an expected condition here; warn and make the
        # implicit None return explicit.
        logger.warning("%s not found.", schema_id)
        return None
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def get_storage_client(*args, **kwargs):
    """Return a GCS client, using anonymous credentials in test environments.

    When ``STORAGE_TEST_HOST`` equals ``"true"`` (e.g. a storage emulator),
    the client is built with ``AnonymousCredentials``.  Otherwise a real
    client is created via :func:`get_storage_client_from_gcp_project`.

    The project is taken from ``kwargs["project_id"]`` when truthy, else
    ``ProjectId.current()``.  (The original resolved the project differently
    on the test vs. production paths — falsy-checked on one, raw on the
    other — which this unifies.)
    """
    project_id = kwargs.get("project_id") or ProjectId.current()
    if os.environ.get(STORAGE_TEST_ENV_VAR) == "true":
        return storage.Client(
            credentials=AnonymousCredentials(),
            project=project_id,
        )
    return get_storage_client_from_gcp_project(project_id)
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def get_storage_client_from_gcp_project(project_id):
    """Create a Cloud Storage client bound to *project_id*."""
    client = storage.Client(project=project_id)
    return client
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def pandas_df_to_csv_bytes(df: pd.DataFrame) -> BytesIO:
    """Serialize *df* to CSV (without the index) and return a BytesIO
    positioned at the start, ready for upload or streaming."""
    # to_csv with no target returns a str; encode it and wrap the bytes.
    # A fresh BytesIO built from bytes is already positioned at offset 0.
    csv_text = df.to_csv(index=False)
    return BytesIO(csv_text.encode("utf-8"))
# [docs]  (Sphinx viewcode link artifact; commented out so the module parses)
def read_manifest(bucket_name, manifest_path):
    """
    Reads a manifest file from a GCS bucket.
    Note: This function was migrated from the deprecated beam module.
    Parameters
    ----------
    bucket_name : str
        The name of the GCS bucket.
    manifest_path : str
        The path to the manifest file in the bucket.
    Returns
    -------
    dict
        The parsed manifest as a dictionary.
    """
    # Use the module-level logger with lazy %-args instead of re-creating a
    # logger per call.
    logger.info("Reading manifest from gs://%s/%s", bucket_name, manifest_path)
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(manifest_path)
    # download_as_string() is deprecated in google-cloud-storage;
    # download_as_bytes() is the supported equivalent.
    manifest_content = blob.download_as_bytes()
    return json.loads(manifest_content)