Source code for zcloud.gcp_services

import logging
from io import BytesIO
import os
import json
from typing import Optional

from flask import jsonify
from google.api_core.exceptions import NotFound
from google.auth.credentials import AnonymousCredentials
from google.api_core.client_options import ClientOptions
from google.cloud import bigquery,storage
from google.cloud import redis_v1
from google.cloud.pubsub import SchemaServiceClient
import pandas as pd
import redis


from zcloud.pubsub import Schema
from zcloud.pubsub.messaging import unpack_envelope
from zcloud.service_enums import MessageStatus, HTTPStatus, HTTPResponse, ProjectId


logger = logging.getLogger(__name__)

# Not referenced by the functions visible in this module section.
# NOTE(review): presumably the default Memorystore instance id used by callers - confirm.
HARDCODED_REDIS_INSTANCE_ID = "cloud-test-redis"

# Name of the env var that, when set to "true", makes get_redis_instance_endpoint
# read REDIS_HOST / REDIS_PORT instead of querying GCP. Despite the "_HOST"
# suffix in its value, the variable is used as a boolean flag.
REDIS_TEST_ENV_VAR = "REDIS_TEST_HOST"

# Name of the env var that, when "true" (case-insensitive), switches
# get_bq_client to an emulator endpoint.
BQ_TEST_ENV_VAR = "BQ_TEST_PROJECT"

# Name of the env var holding the emulator endpoint used when BQ_TEST_ENV_VAR is "true".
BQ_TEST_ENDPOINT_ENV_VAR = "BQ_TEST_ENDPOINT"

# Name of the env var that, when set to "true", makes get_storage_client build
# a client with anonymous credentials. Also used as a boolean flag.
STORAGE_TEST_ENV_VAR = "STORAGE_TEST_HOST"

def get_redis_instance_endpoint(*args, **kwargs) -> tuple[str, int]:
    """
    Resolve the Redis host and port for the current environment.

    When the environment variable named by ``REDIS_TEST_ENV_VAR`` equals
    ``"true"`` (exact match), the endpoint is taken from ``REDIS_HOST``
    (default ``"redis"``) and ``REDIS_PORT`` (default ``6379``). Otherwise
    all arguments are forwarded to ``get_redis_instance_from_gcp_project``.

    Returns
    -------
    tuple[str, int]
        The ``(host, port)`` pair.
    """
    test_mode = os.environ.get(REDIS_TEST_ENV_VAR) == "true"
    if not test_mode:
        # Happy path: ask GCP for the instance endpoint.
        return get_redis_instance_from_gcp_project(*args, **kwargs)

    host = os.environ.get("REDIS_HOST", "redis")
    raw_port = os.environ.get("REDIS_PORT", "6379")
    try:
        port = int(raw_port)
    except ValueError:
        # A non-numeric REDIS_PORT falls back to the standard Redis port.
        port = 6379
    return host, port
def get_redis_instance_from_gcp_project(project_id: str, location_id: str, instance_id: str) -> tuple[str, int]:
    """
    Look up a Cloud Redis instance and return its endpoint.

    Parameters
    ----------
    project_id : str
        GCP project that owns the instance.
    location_id : str
        Location segment of the instance resource name.
    instance_id : str
        Instance segment of the instance resource name.

    Returns
    -------
    tuple[str, int]
        The ``(host, port)`` reported by the instance resource.
    """
    redis_admin = redis_v1.CloudRedisClient()
    resource_name = f"projects/{project_id}/locations/{location_id}/instances/{instance_id}"
    redis_instance = redis_admin.get_instance(name=resource_name)

    endpoint = (redis_instance.host, redis_instance.port)
    print(f"Redis endpoint: {endpoint[0]}:{endpoint[1]}")
    return endpoint
def connect_to_redis(host: str, port: int, password: Optional[str] = None) -> redis.StrictRedis:
    """
    Build a ``redis.StrictRedis`` client for the given endpoint.

    Parameters
    ----------
    host : str
        Redis host.
    port : int
        Redis port.
    password : str, optional
        Password to send; only needed when AUTH is enabled on the server.

    Returns
    -------
    redis.StrictRedis
        A client configured without SSL and with ``decode_responses=True``.
    """
    connection_settings = {
        "host": host,
        "port": port,
        "password": password,
        "ssl": False,  # flip to True if SSL is configured on the instance
        "decode_responses": True,  # get/set return str rather than bytes
    }
    return redis.StrictRedis(**connection_settings)
def get_columna_message(envelope):
    """
    Unpack a Pub/Sub envelope using the COLUMNA_PRIMARY schema.

    Parameters
    ----------
    envelope
        The envelope passed straight through to ``unpack_envelope``.

    Returns
    -------
    object or tuple
        On success, whatever ``unpack_envelope`` returns. On failure, a
        ``(flask_response, http_status)`` tuple: ``BAD_REQUEST`` with the
        error text for a ``ValueError``, ``INTERNAL_SERVER_ERROR`` with a
        generic message for any other exception. Callers must therefore
        distinguish the two shapes themselves.
    """
    try:
        return unpack_envelope(
            envelope=envelope,
            project_id=ProjectId.TEST_ENVIRONMENT.value,
            schema_name=Schema.COLUMNA_PRIMARY.value,
        )
    except ValueError as e:
        return jsonify({
            HTTPResponse.STATUS.value: MessageStatus.ERROR.value,
            HTTPResponse.DETAILS.value: str(e),
        }), HTTPStatus.BAD_REQUEST.value
    except Exception:
        # Boundary handler: the client only gets a generic message, so record
        # the full traceback through the module logger instead of printing
        # just str(e) to stdout.
        logger.exception("Failed to decode message")
        return jsonify({
            HTTPResponse.STATUS.value: MessageStatus.ERROR.value,
            HTTPResponse.DETAILS.value: "Failed to decode message",
        }), HTTPStatus.INTERNAL_SERVER_ERROR.value
def _bq_emulator_endpoint() -> Optional[str]:
    """Return the BigQuery emulator endpoint URL, or ``None`` when no test/emulator mode is active."""
    test_mode = os.environ.get(BQ_TEST_ENV_VAR, "false").lower() == "true"
    if not test_mode and "BIGQUERY_EMULATOR_HOST" not in os.environ:
        return None

    # The explicit test flag takes precedence over BIGQUERY_EMULATOR_HOST.
    source_var = BQ_TEST_ENDPOINT_ENV_VAR if test_mode else "BIGQUERY_EMULATOR_HOST"
    endpoint = os.environ.get(source_var, "http://bigquery-emulator:9050")

    # Ensure endpoint has proper URL format (bare "host:port" gets a scheme).
    if not endpoint.startswith(("http://", "https://")):
        endpoint = f"http://{endpoint}"
    return endpoint


def get_bq_client():
    """
    Create a BigQuery client for the current environment.

    Emulator mode is active when the env var named by ``BQ_TEST_ENV_VAR`` is
    ``"true"`` (case-insensitive) or ``BIGQUERY_EMULATOR_HOST`` is set. In
    that mode the client uses anonymous credentials, the resolved emulator
    endpoint and ``ProjectId.current()`` as project. Otherwise a default
    ``bigquery.Client()`` is returned.

    Returns
    -------
    bigquery.Client
        The configured client.

    Raises
    ------
    Exception
        Any error raised while constructing the emulator client is logged
        with its traceback and re-raised unchanged.
    """
    endpoint = _bq_emulator_endpoint()
    if endpoint is None:
        # Fall back to standard client for production
        logger.info("Using standard BigQuery client (not emulator)")
        return bigquery.Client()

    logger.debug("BQ_TEST_ENV_VAR: %s", BQ_TEST_ENV_VAR)
    logger.debug("os.environ[BQ_TEST_ENV_VAR]: %s", os.environ.get(BQ_TEST_ENV_VAR, "false"))
    logger.debug("BIGQUERY_EMULATOR_HOST: %s", os.environ.get("BIGQUERY_EMULATOR_HOST", "not set"))
    logger.debug("Using BigQuery emulator endpoint: %s", endpoint)

    project_id = ProjectId.current()
    logger.debug("Using BigQuery project ID: %s", project_id)

    try:
        logger.debug("Creating BigQuery client with project_id=%s, endpoint=%s", project_id, endpoint)
        client = bigquery.Client(
            project_id,
            client_options=ClientOptions(api_endpoint=endpoint),
            credentials=AnonymousCredentials(),
        )
    except Exception:
        # logger.exception records message and traceback in one place,
        # replacing the previous logger.error + traceback.print_exc() pair.
        logger.exception("Error creating test BigQuery client")
        raise

    logger.debug("BigQuery client created successfully with project: %s", client.project)
    return client
def get_schema(project_id, schema_id):
    """
    Fetch a Pub/Sub schema by project and schema id.

    Parameters
    ----------
    project_id
        Project segment passed to ``SchemaServiceClient.schema_path``.
    schema_id
        Schema segment passed to ``SchemaServiceClient.schema_path``.

    Returns
    -------
    object or None
        The result of ``SchemaServiceClient.get_schema``, or ``None`` (after
        printing a notice) when the service raises ``NotFound``.
    """
    client = SchemaServiceClient()
    lookup_request = {"name": client.schema_path(project_id, schema_id)}
    try:
        return client.get_schema(request=lookup_request)
    except NotFound:
        print(f"{schema_id} not found.")
        return None
def get_storage_client(*args, **kwargs):
    """
    Create a Cloud Storage client for the current environment.

    Only the ``project_id`` keyword argument is read; positional arguments
    are accepted and ignored.

    When the env var named by ``STORAGE_TEST_ENV_VAR`` equals ``"true"``
    (exact match), the client is built with anonymous credentials and uses
    ``project_id`` if it is truthy, else ``ProjectId.current()``. Otherwise
    the client comes from ``get_storage_client_from_gcp_project``, using
    ``project_id`` whenever the keyword was supplied (even if ``None``) and
    ``ProjectId.current()`` when it was not.

    Returns
    -------
    storage.Client
        The configured client.
    """
    project_given = "project_id" in kwargs
    requested_project = kwargs["project_id"] if project_given else None

    if os.environ.get(STORAGE_TEST_ENV_VAR) == "true":
        return storage.Client(
            credentials=AnonymousCredentials(),
            project=requested_project or ProjectId.current(),
        )

    # Outside test mode an explicitly supplied project_id is forwarded as-is.
    target_project = requested_project if project_given else ProjectId.current()
    return get_storage_client_from_gcp_project(target_project)
def get_storage_client_from_gcp_project(project_id):
    """
    Create a Cloud Storage client bound to ``project_id``.

    Credentials are not passed explicitly, so ``storage.Client`` resolves
    them itself.
    """
    return storage.Client(project=project_id)
def pandas_df_to_csv_bytes(df: pd.DataFrame) -> BytesIO:
    """
    Serialize a DataFrame to CSV in an in-memory binary buffer.

    Parameters
    ----------
    df : pd.DataFrame
        The frame to serialize; its index is not written.

    Returns
    -------
    BytesIO
        Buffer holding the CSV bytes, rewound to position 0 so it can be
        read or uploaded immediately.
    """
    payload = BytesIO()
    df.to_csv(path_or_buf=payload, index=False)
    payload.seek(0)  # rewind: to_csv leaves the cursor at the end
    return payload
def read_manifest(bucket_name, manifest_path):
    """
    Reads a manifest file from a GCS bucket.

    Note: This function was migrated from the deprecated beam module.

    Parameters
    ----------
    bucket_name : str
        The name of the GCS bucket.
    manifest_path : str
        The path to the manifest file in the bucket.

    Returns
    -------
    dict
        The parsed manifest as a dictionary.

    Raises
    ------
    json.JSONDecodeError
        If the downloaded content is not valid JSON. Errors raised by the
        storage client (e.g. a missing object) propagate unchanged.
    """
    logger.info("Reading manifest from gs://%s/%s", bucket_name, manifest_path)

    # NOTE(review): this builds a default storage.Client() directly and so
    # does not pick up the test-mode handling in get_storage_client - confirm
    # whether that is intentional.
    storage_client = storage.Client()
    blob = storage_client.bucket(bucket_name).blob(manifest_path)

    # download_as_string is a deprecated alias; download_as_bytes is the
    # supported call and returns the same bytes payload.
    manifest_content = blob.download_as_bytes()
    return json.loads(manifest_content)