from io import BytesIO
import base64
import json
import os
from typing import Any, Optional
import fastavro
from google.cloud import pubsub_v1
from google.cloud.pubsub import SchemaServiceClient
from fastavro import schemaless_reader, schemaless_writer
from zcloud.pubsub import MessageType, Priority, MessageSchemaFields, MessageMetadataFields
# Avro record schema used as the hard-coded ("columna") message envelope.
# Field names come from the MessageSchemaFields / MessageMetadataFields enums
# so that the schema and the code building messages share one set of keys.
HARDCODED_COLUMNA_SCHEMA = {
    "type": "record",
    "name": "columna_primary",
    "fields": [
        # --- source: required string, no default ---
        {
            "name": MessageSchemaFields.SOURCE.value,
            "type": "string",
            "doc": "Identifier or name of the message source.",
        },
        # --- message type: enum restricted to the MessageType symbols ---
        {
            "name": MessageSchemaFields.MESSAGE_TYPE.value,
            "type": {
                "type": "enum",
                "name": "eventType",
                "symbols": [
                    MessageType.EVENT.value,
                    MessageType.ALERT.value,
                    MessageType.METRIC.value,
                ],
                # enum-level default symbol
                "default": MessageType.EVENT.value,
            },
            "doc": "Type or category of the message (e.g., 'event', 'alert', 'metric').",
            # field-level default
            "default": MessageType.EVENT.value,
        },
        # --- message subject: free-form string, defaults to empty ---
        {
            "name": MessageSchemaFields.MESSAGE_SUBJECT.value,
            "type": "string",
            "doc": "String which can be used for additional filtering for subscriptions",
            "default": "",
        },
        # --- priority: enum restricted to the Priority symbols ---
        {
            "name": MessageSchemaFields.PRIORITY.value,
            "type": {
                "type": "enum",
                "name": "PriorityLevel",
                "symbols": [
                    Priority.LOW.value,
                    Priority.MEDIUM.value,
                    Priority.HIGH.value,
                ],
                # enum-level default symbol
                "default": Priority.LOW.value,
            },
            "doc": "Priority level of the message.",
            # field-level default
            "default": Priority.LOW.value,
        },
        # --- payload: nested record with a single string field "data" ---
        {
            "name": MessageSchemaFields.PAYLOAD.value,
            "type": {
                "type": "record",
                "name": "Payload",
                "fields": [
                    {
                        "name": "data",
                        "type": "string",
                        "doc": "Main content or data carried by the message in a generic string format.",
                    },
                ],
            },
            "doc": "Main content or data carried by the message.",
        },
        # --- metadata: nested record (correlation id + tags), both defaulted ---
        {
            "name": MessageSchemaFields.METADATA.value,
            "type": {
                "type": "record",
                "name": "Metadata",
                "fields": [
                    {
                        "name": MessageMetadataFields.CORRELATION_ID.value,
                        "type": "string",
                        "doc": "An identifier used to correlate the message with related operations.",
                        "default": "",
                    },
                    {
                        "name": MessageMetadataFields.TAGS.value,
                        "type": {"type": "array", "items": "string"},
                        "doc": "Tags or labels associated with the message.",
                        "default": [],
                    },
                ],
            },
            "doc": "Additional metadata related to the message.",
        },
    ],
}
# copilot context: docs are made with sphinx automodule/autoclass/autoapi etc, so use rst in docstrings

# Hard-coded project id. Not referenced anywhere else in the visible part of
# this module -- presumably a prototyping default; confirm against callers.
HARDCODE_DEFAULT_PROJECT_ID = "zipbio-cloud-test-1"
# Hard-coded topic name. Also unreferenced in the visible part of this
# module -- presumably a prototyping default; confirm against callers.
HARDCODE_DEFAULT_TOPIC_NAME = "columna-test"
[docs]
class MessageContainer:
    """
    Wrapper that holds a message and its envelope fields, fills in schema
    defaults, and publishes the Avro-encoded result to a Pub/Sub topic.

    :param payload: Message content. It is serialised with :func:`json.dumps`
        and stored in the ``data`` field of the payload record.
    :param project_id: Project that owns the topic. It is also written into
        the message's source field.
    :param topic_name: Name of the topic to publish to.
    :param schema_name: Name of the schema to use. Currently only stored;
        the hard-coded columna schema is always used.
    :param message_type: A ``MessageType`` member or its string value.
    :param message_subject: Optional subject string; ``None`` becomes ``""``.
    :param priority: A ``Priority`` member or its string value.
    :param metadata: Optional metadata mapping. Missing or ``None``
        correlation id / tags entries are replaced by ``""`` / ``[]``.
        The mapping passed in is copied, never modified.
    :param attributes: Optional Pub/Sub message attributes.
    :param parent_envelope: Optional mapping whose ``"attributes"`` entry is
        inherited by the published message (explicit ``attributes`` win).
    """

    def __init__(
        self,
        payload: dict[str, Any],
        project_id: str,
        topic_name: str,
        schema_name: str,  # Now required
        message_type: MessageType = MessageType.EVENT.value,
        message_subject: Optional[str] = None,
        priority: Priority = Priority.LOW.value,
        metadata: Optional[dict[str, Any]] = None,
        attributes: Optional[dict[str, str]] = None,
        parent_envelope: Optional[dict[str, Any]] = None,
    ) -> None:
        self.payload = payload
        self.project_id = project_id
        self.topic_name = topic_name
        self.schema_name = schema_name
        self.schema = self._get_schema(self.schema_name)
        # The schema's enum symbols are the enum *values*, so accept either
        # an enum member or its raw value and always store the raw value.
        self.message_type = self._enum_value(message_type)
        self.message_subject = "" if message_subject is None else message_subject
        self.priority = self._enum_value(priority)

        # Handle metadata defaults on a copy so the caller's dict is never
        # modified as a side effect of constructing the container.
        metadata_dict = dict(metadata) if metadata else {}
        corr_id_key = MessageMetadataFields.CORRELATION_ID.value
        tags_key = MessageMetadataFields.TAGS.value
        if metadata_dict.get(corr_id_key) is None:
            metadata_dict[corr_id_key] = ""
        if metadata_dict.get(tags_key) is None:
            metadata_dict[tags_key] = []
        self.metadata = metadata_dict

        # handle attributes defaults
        self.attributes = attributes or {}
        self.parent_envelope = parent_envelope

    @staticmethod
    def _enum_value(value: Any) -> Any:
        """
        Return ``value.value`` for enum members and ``value`` unchanged for
        anything that has no ``value`` attribute (e.g. a plain string).
        """
        return getattr(value, "value", value)

    def _get_schema(self, schema_name: str) -> dict[str, Any]:
        """
        Get the schema for the message, default to columna.

        :param schema_name: Requested schema name. Currently ignored.
        :returns: The hard-coded columna schema.
        """
        # fetch schema from pub/sub or use cached, or use default, not sure yet
        # hardcode something for prototyping
        return HARDCODED_COLUMNA_SCHEMA

    def _get_message_dict(self) -> dict[str, Any]:
        """
        Convert object properties to the message dict expected by the schema.

        :returns: Mapping keyed by the ``MessageSchemaFields`` values. The
            payload is JSON-encoded into the ``data`` field.
        """
        return {
            MessageSchemaFields.SOURCE.value: self.project_id,
            MessageSchemaFields.MESSAGE_TYPE.value: self.message_type,
            MessageSchemaFields.MESSAGE_SUBJECT.value: self.message_subject,
            MessageSchemaFields.PRIORITY.value: self.priority,
            MessageSchemaFields.PAYLOAD.value: {"data": json.dumps(self.payload)},
            MessageSchemaFields.METADATA.value: self.metadata,
        }

    def send_message(
        self,
    ) -> None:
        """
        Avro-encode the message and publish it to the Pub/Sub topic.

        Attributes from ``parent_envelope["attributes"]`` are merged with
        this container's own attributes; on key collisions the container's
        attributes take precedence. The call waits on the publish future
        before returning.
        """
        # Create a publisher client
        publisher = pubsub_v1.PublisherClient()
        # Get the topic path
        topic_path = publisher.topic_path(self.project_id, self.topic_name)

        # NOTE(review): attributes are read from the top level of
        # parent_envelope, whereas unpack_envelope reads from
        # envelope["message"] -- confirm which shape callers pass here.
        # ``or {}`` guards against a missing envelope and an explicit
        # ``"attributes": None`` entry, which would break the ``**`` merge.
        parent_attributes = (self.parent_envelope or {}).get("attributes") or {}
        # Combine attributes and parent envelope attributes, with attributes overriding
        combined_attributes = {**parent_attributes, **self.attributes}

        # get the message dict, use fastavro to schema validate and encode to bytes
        message_dict = self._get_message_dict()
        bytes_io = BytesIO()
        schemaless_writer(bytes_io, self.schema, message_dict)

        future = publisher.publish(topic_path, bytes_io.getvalue(), **combined_attributes)
        # Wait for the publish to finish so failures surface to the caller.
        future.result()
[docs]
def get_schema(*args, **kwargs) -> dict:
    """
    Get the schema for the message, default to columna.

    When the ``MESSAGE_SCHEMA_TEST_MODE`` environment variable equals
    ``"true"`` (case-insensitive), the hard-coded columna schema is returned
    and no arguments are required. Otherwise the schema definition is
    fetched through ``SchemaServiceClient`` and parsed as JSON.

    Positional arguments are accepted but ignored; pass everything by
    keyword.

    :keyword project_id: Project that owns the schema (required outside
        test mode).
    :keyword schema_name: Name of the schema to fetch (required outside
        test mode).
    :returns: The schema definition as a dict.
    :raises ValueError: If ``project_id`` or ``schema_name`` is missing or
        empty.
    :raises Exception: If the schema cannot be retrieved; the underlying
        error is attached as ``__cause__``.
    """
    test_mode = os.environ.get("MESSAGE_SCHEMA_TEST_MODE", "false").lower() == "true"
    if test_mode:
        return HARDCODED_COLUMNA_SCHEMA

    project_id = kwargs.get("project_id")
    if not project_id:
        raise ValueError("project_id is required")
    schema_name = kwargs.get("schema_name")
    if not schema_name:
        raise ValueError("schema_name is required")

    schema_client = SchemaServiceClient()
    schema_path = schema_client.schema_path(project_id, schema_name)
    try:
        result = schema_client.get_schema(request={"name": schema_path})
    except Exception as e:
        # Chain the original error so its traceback is not lost.
        raise Exception(f"Failed to find schema: {e}") from e
    return json.loads(result.definition)
[docs]
def unpack_envelope(
    envelope: dict,
    project_id: str,
    schema_name: str
) -> dict:
    """
    Decode the Avro-encoded message carried inside a Pub/Sub envelope.

    The envelope's ``message["data"]`` entry is base64-decoded and then read
    as a schemaless Avro record using the schema returned by
    :func:`get_schema`.

    :param envelope: The Pub/Sub message envelope containing base64-encoded data.
    :param project_id: GCP project ID where the schema is located.
    :param schema_name: Name of the schema to use for decoding.
    :returns: Decoded message contents.
    :raises ValueError: If the envelope has no ``message`` entry or the
        message has no ``data`` field.
    :raises Exception: If the schema cannot be retrieved or the message
        cannot be decoded.
    """
    # Reject envelopes that carry no usable message body.
    message = envelope.get("message")
    has_data = bool(message) and "data" in message
    if not has_data:
        raise ValueError("Bad Request: Missing `data` field")

    # Base64 -> raw Avro bytes (done before the schema lookup, as a bad
    # body should fail without a schema fetch).
    raw_bytes = base64.b64decode(message["data"])

    schema = get_schema(project_id=project_id, schema_name=schema_name)

    # A freshly constructed BytesIO is already positioned at offset 0.
    with BytesIO(raw_bytes) as stream:
        return schemaless_reader(stream, writer_schema=schema)