Source code for zcloud.pubsub.messaging

from io import BytesIO
import base64
import json
import os
from typing import Any, Optional

from google.cloud import pubsub_v1
from google.cloud.pubsub import SchemaServiceClient
from fastavro import schemaless_reader, schemaless_writer

from zcloud.pubsub import MessageType, Priority, MessageSchemaFields, MessageMetadataFields

HARDCODED_COLUMNA_SCHEMA = {
    "type": "record",
    "name": "columna_primary",
    "fields": [
        {
            "name": MessageSchemaFields.SOURCE.value,
            "type": "string", 
            "doc": "Identifier or name of the message source.",
        },
        {
            "name": MessageSchemaFields.MESSAGE_TYPE.value,
            "type": {
                "type": "enum",
                "name": "eventType",
                "symbols": [
                    MessageType.EVENT.value,
                    MessageType.ALERT.value,
                    MessageType.METRIC.value
                ],
                "default": MessageType.EVENT.value,
            },
            "doc": "Type or category of the message (e.g., 'event', 'alert', 'metric').",
            "default": MessageType.EVENT.value,
        },
        {
            "name": MessageSchemaFields.MESSAGE_SUBJECT.value,
            "type": "string",
            "doc": "String which can be used for additional filtering for subscriptions",
            "default": "",
        },
        {
            "name": MessageSchemaFields.PRIORITY.value,
            "type": {
                "type": "enum",
                "name": "PriorityLevel",
                "symbols": [
                    Priority.LOW.value,
                    Priority.MEDIUM.value,
                    Priority.HIGH.value
                ],
                "default": Priority.LOW.value,
            },
            "doc": "Priority level of the message.",
            "default": Priority.LOW.value,
        },
        {
            "name": MessageSchemaFields.PAYLOAD.value,
            "type": {
                "type": "record",
                "name": "Payload",
                "fields": [
                    {
                        "name": "data",
                        "type": "string",
                        "doc": "Main content or data carried by the message in a generic string format.",
                    }
                ],
            },
            "doc": "Main content or data carried by the message.",
        },
        {
            "name": MessageSchemaFields.METADATA.value,
            "type": {
                "type": "record",
                "name": "Metadata",
                "fields": [
                    {
                        "name": MessageMetadataFields.CORRELATION_ID.value,
                        "type": "string",
                        "doc": "An identifier used to correlate the message with related operations.",
                        "default": "",
                    },
                    {
                        "name": MessageMetadataFields.TAGS.value,
                        "type": {"type": "array", "items": "string"},
                        "doc": "Tags or labels associated with the message.",
                        "default": [],
                    },
                ],
            },
            "doc": "Additional metadata related to the message.",
        },
    ],
}
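
# Illustrative example (sketch, not part of the module's API): a record shaped to
# match HARDCODED_COLUMNA_SCHEMA above. Field names come from MessageSchemaFields /
# MessageMetadataFields; the concrete values are hypothetical placeholders.
_EXAMPLE_COLUMNA_RECORD = {
    MessageSchemaFields.SOURCE.value: "example-source",
    MessageSchemaFields.MESSAGE_TYPE.value: MessageType.EVENT.value,
    MessageSchemaFields.MESSAGE_SUBJECT.value: "example.subject",
    MessageSchemaFields.PRIORITY.value: Priority.LOW.value,
    MessageSchemaFields.PAYLOAD.value: {"data": json.dumps({"example_key": "example_value"})},
    MessageSchemaFields.METADATA.value: {
        MessageMetadataFields.CORRELATION_ID.value: "example-correlation-id",
        MessageMetadataFields.TAGS.value: ["example"],
    },
}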

# copilot context: docs are made with sphinx automodule/autoclass/autoapi etc, so use rst in docstrings

HARDCODE_DEFAULT_PROJECT_ID = "zipbio-cloud-test-1"
HARDCODE_DEFAULT_TOPIC_NAME = "columna-test"

class MessageContainer:
    """
    Wrapper to handle the message schema and defaults for the pub/sub queue.
    """

    def __init__(
        self,
        payload: dict[str, Any],
        project_id: str,
        topic_name: str,
        schema_name: str,  # Now required
        message_type: MessageType = MessageType.EVENT.value,
        message_subject: Optional[str] = None,
        priority: Priority = Priority.LOW.value,
        metadata: Optional[dict[str, Any]] = None,
        attributes: Optional[dict[str, str]] = None,
        parent_envelope: Optional[dict[str, Any]] = None,
    ) -> None:
        self.payload = payload
        self.project_id = project_id
        self.topic_name = topic_name
        self.schema_name = schema_name
        self.schema = self._get_schema(self.schema_name)
        self.message_type = message_type
        self.message_subject = "" if message_subject is None else message_subject
        self.priority = priority

        # handle metadata defaults
        metadata_dict = metadata or {
            MessageMetadataFields.CORRELATION_ID.value: None,
            MessageMetadataFields.TAGS.value: [],
        }
        if metadata_dict.get(MessageMetadataFields.TAGS.value) is None:
            metadata_dict[MessageMetadataFields.TAGS.value] = []
        if metadata_dict.get(MessageMetadataFields.CORRELATION_ID.value) is None:
            metadata_dict[MessageMetadataFields.CORRELATION_ID.value] = ""
        self.metadata = metadata_dict

        # handle attributes defaults
        self.attributes = attributes or {}
        self.parent_envelope = parent_envelope

    def _get_schema(self, schema_name: str) -> dict[str, Any]:
        """
        Get the schema for the message, defaulting to columna.
        """
        # fetch schema from pub/sub or use cached, or use default; not sure yet
        # hardcode something for prototyping
        return HARDCODED_COLUMNA_SCHEMA

    def _get_message_dict(self) -> dict[str, Any]:
        """
        Convert object properties to a message dict matching the schema.
        """
        return {
            MessageSchemaFields.SOURCE.value: self.project_id,
            MessageSchemaFields.MESSAGE_TYPE.value: self.message_type,
            MessageSchemaFields.MESSAGE_SUBJECT.value: self.message_subject,
            MessageSchemaFields.PRIORITY.value: self.priority,
            MessageSchemaFields.PAYLOAD.value: {"data": json.dumps(self.payload)},
            MessageSchemaFields.METADATA.value: self.metadata,
        }
    def send_message(self) -> None:
        """
        Send a message to the pub/sub topic.
        """
        # Create a publisher client
        publisher = pubsub_v1.PublisherClient()

        # Get the topic path
        topic_path = publisher.topic_path(self.project_id, self.topic_name)

        # Get the message dict, then use fastavro to schema-validate and encode to bytes
        message_dict = self._get_message_dict()

        parent_attributes = (
            self.parent_envelope.get("attributes", {}) if self.parent_envelope else {}
        )
        # Combine attributes and parent envelope attributes, with attributes overriding
        combined_attributes = {**parent_attributes, **self.attributes}

        bytes_io = BytesIO()
        schemaless_writer(bytes_io, self.schema, message_dict)

        future = publisher.publish(topic_path, bytes_io.getvalue(), **combined_attributes)
        future.result()
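
# Example usage (illustrative sketch): building a container and publishing it.
# The project, topic, schema name, payload, and attribute values below are
# hypothetical, and a real call needs GCP credentials plus an existing topic.
def _example_send_message() -> None:
    container = MessageContainer(
        payload={"example_key": "example_value"},
        project_id=HARDCODE_DEFAULT_PROJECT_ID,
        topic_name=HARDCODE_DEFAULT_TOPIC_NAME,
        schema_name="columna_primary",
        message_type=MessageType.ALERT.value,
        message_subject="example.subject",
        priority=Priority.HIGH.value,
        metadata={MessageMetadataFields.TAGS.value: ["example"]},
        attributes={"origin": "example"},
    )
    container.send_message()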
def get_schema(*args, **kwargs) -> dict:
    """
    Get the schema for the message, defaulting to columna.

    Can also be used in test mode by setting ``MESSAGE_SCHEMA_TEST_MODE=true``.
    """
    test_mode = os.environ.get("MESSAGE_SCHEMA_TEST_MODE", "false").lower() == "true"
    if test_mode:
        return HARDCODED_COLUMNA_SCHEMA

    project_id = kwargs.get("project_id")
    if not project_id:
        raise ValueError("project_id is required")

    schema_name = kwargs.get("schema_name")
    if not schema_name:
        raise ValueError("schema_name is required")

    schema_client = SchemaServiceClient()
    schema_path = schema_client.schema_path(project_id, schema_name)
    try:
        result = schema_client.get_schema(request={"name": schema_path})
    except Exception as e:
        raise Exception(f"Failed to find schema: {e}") from e

    json_schema = json.loads(result.definition)
    return json_schema
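
# Example usage (illustrative sketch): with test mode enabled, the hardcoded
# columna schema is returned without contacting the Pub/Sub schema service.
# The project id and schema name are hypothetical values.
def _example_get_schema() -> dict:
    os.environ["MESSAGE_SCHEMA_TEST_MODE"] = "true"  # avoid a real API call
    return get_schema(project_id=HARDCODE_DEFAULT_PROJECT_ID, schema_name="columna_primary")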
def unpack_envelope(envelope: dict, project_id: str, schema_name: str) -> dict:
    """
    Unpacks a Pub/Sub envelope containing an Avro-encoded message using the
    specified schema.

    Args:
        envelope: The Pub/Sub message envelope containing base64-encoded data
        project_id: GCP project ID where the schema is located
        schema_name: Name of the schema to use for decoding

    Returns:
        dict: Decoded message contents

    Raises:
        ValueError: If the envelope is missing required data
        Exception: If the schema cannot be retrieved or the message cannot be decoded
    """
    # Validate the Pub/Sub message
    pubsub_message = envelope.get("message")
    if not pubsub_message or "data" not in pubsub_message:
        raise ValueError("Bad Request: Missing `data` field")

    # Decode the base64 data
    data = base64.b64decode(pubsub_message["data"])

    # Get the schema
    json_schema = get_schema(project_id=project_id, schema_name=schema_name)

    # Deserialize the Avro binary payload
    with BytesIO(data) as byte_buffer:
        byte_buffer.seek(0)
        record = schemaless_reader(byte_buffer, writer_schema=json_schema)

    return record
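
# Example usage (illustrative sketch): round-trip a message through the Avro
# encoding and unpack it from a Pub/Sub-style push envelope. Test mode keeps
# get_schema() from calling the live Schema service; the envelope layout mirrors
# what a push subscription delivers, and all values here are hypothetical.
def _example_unpack_envelope() -> dict:
    os.environ["MESSAGE_SCHEMA_TEST_MODE"] = "true"

    # Build and encode an example record using the container's own schema.
    container = MessageContainer(
        payload={"example_key": "example_value"},
        project_id=HARDCODE_DEFAULT_PROJECT_ID,
        topic_name=HARDCODE_DEFAULT_TOPIC_NAME,
        schema_name="columna_primary",
    )
    buffer = BytesIO()
    schemaless_writer(buffer, container.schema, container._get_message_dict())

    # Wrap the bytes in a push-style envelope with base64-encoded data.
    envelope = {
        "message": {
            "data": base64.b64encode(buffer.getvalue()).decode("utf-8"),
            "attributes": {"origin": "example"},
        }
    }
    return unpack_envelope(
        envelope,
        project_id=HARDCODE_DEFAULT_PROJECT_ID,
        schema_name="columna_primary",
    )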