This guide walks you through building a custom AI agent that listens for messages in Zenzap and responds using OpenAI. By the end, you’ll have a working bot that polls for new messages, processes them through an LLM, and sends replies back to your topics.

Prerequisites

Before you start, make sure you have:
  • Python 3.9 or later installed
  • A Zenzap bot API key and secret (from the console)
  • The topic ID of a channel the bot can post to (used as the control channel)
  • An OpenAI API key

Project Setup

Create a new directory for your agent and install the required dependencies:
mkdir zenzap-agent && cd zenzap-agent
python -m venv venv && source venv/bin/activate
pip install requests python-dotenv openai
Create a .env file with your credentials:
BOT_API_KEY=your_bot_api_key_here
BOT_SECRET=your_bot_secret_here
CONTROL_CHANNEL_ID=your_control_channel_topic_id_here
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_MODEL=gpt-4o
API_BASE_URL=https://api.zenzap.co
  • BOT_API_KEY — Your Zenzap API key (from the console)
  • BOT_SECRET — Your Zenzap API secret (used for request signing)
  • CONTROL_CHANNEL_ID — The topic ID where the bot will post status messages (e.g. “connected”, “disconnecting”)
  • OPENAI_API_KEY — Your OpenAI API key
  • OPENAI_MODEL — The OpenAI model to use (defaults to gpt-4o)
  • API_BASE_URL — The Zenzap API base URL (defaults to https://api.zenzap.co)

Step 1 — Build the Zenzap API Client

Create zenzap_client.py. This module handles all communication with the Zenzap API, including HMAC request signing (see Authentication for details). Start with the response wrapper and client constructor:
import hashlib
import hmac
import json
import time
from typing import Any, Optional
from dataclasses import dataclass
from urllib.parse import quote, urlencode

import requests


@dataclass
class ApiResponse:
    status: int
    data: Any
    success: bool

    @classmethod
    def from_response(cls, response: requests.Response) -> "ApiResponse":
        try:
            data = response.json()
        except ValueError:
            text = response.text.strip()
            data = {"raw": text} if text else {}
        return cls(
            status=response.status_code,
            data=data,
            success=200 <= response.status_code < 300
        )

    @classmethod
    def from_exception(cls, exception: requests.RequestException) -> "ApiResponse":
        return cls(
            status=0,
            data={"error": str(exception), "type": exception.__class__.__name__},
            success=False,
        )


class ZenzapClient:
    def __init__(
        self,
        api_key: str,
        secret: str,
        base_url: str = "https://api.zenzap.co",
        timeout: float = 30.0,
    ):
        self.api_key = api_key
        self.secret = secret
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
Next, add the private methods that handle request signing and HTTP calls. Every request must include an HMAC-SHA256 signature — GET requests sign the URI path, while POST/PATCH/DELETE requests sign the JSON body:
    def _generate_signature(self, data: str, timestamp: str) -> str:
        return hmac.new(
            self.secret.encode("utf-8"),
            f"{timestamp}.{data}".encode("utf-8"),
            hashlib.sha256
        ).hexdigest()

    def _get_headers(self, signature: str, timestamp: str, include_content_type: bool = False) -> dict:
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "X-Signature": signature,
            "X-Timestamp": timestamp,
        }
        if include_content_type:
            headers["Content-Type"] = "application/json"
        return headers

    def _get(self, path: str) -> ApiResponse:
        timestamp = str(int(time.time() * 1000))
        signature = self._generate_signature(path, timestamp)
        url = f"{self.base_url}{path}"
        try:
            response = requests.get(
                url,
                headers=self._get_headers(signature, timestamp),
                timeout=self.timeout,
            )
            return ApiResponse.from_response(response)
        except requests.RequestException as exception:
            return ApiResponse.from_exception(exception)

    def _request_with_body(self, method: str, path: str, body: dict) -> ApiResponse:
        body_str = json.dumps(body, separators=(",", ":"))
        timestamp = str(int(time.time() * 1000))
        signature = self._generate_signature(body_str, timestamp)
        url = f"{self.base_url}{path}"
        try:
            response = requests.request(
                method,
                url,
                headers=self._get_headers(signature, timestamp, include_content_type=True),
                data=body_str,
                timeout=self.timeout,
            )
            return ApiResponse.from_response(response)
        except requests.RequestException as exception:
            return ApiResponse.from_exception(exception)

    def _post(self, path: str, body: dict) -> ApiResponse:
        return self._request_with_body("POST", path, body)

    def _patch(self, path: str, body: dict) -> ApiResponse:
        return self._request_with_body("PATCH", path, body)

    def _delete(self, path: str, body: dict) -> ApiResponse:
        return self._request_with_body("DELETE", path, body)
Finally, add the public API methods your agent will use:
    def get_current_member(self) -> ApiResponse:
        return self._get("/v2/members/me")

    def get_topic(self, topic_id: str) -> ApiResponse:
        return self._get(f"/v2/topics/{topic_id}")

    def send_message(self, topic_id: str, text: str, external_id: Optional[str] = None) -> ApiResponse:
        body = {"topicId": topic_id, "text": text}
        if external_id:
            body["externalId"] = external_id
        return self._post("/v2/messages", body)

    def mark_message_read(self, message_id: str) -> ApiResponse:
        return self._post(f"/v2/messages/{message_id}/read", {})

    def add_reaction(self, message_id: str, reaction: str) -> ApiResponse:
        return self._post(f"/v2/messages/{message_id}/reactions", {"reaction": reaction})

    def get_updates(
        self,
        offset: Optional[str] = None,
        limit: int = 100,
        poll_timeout: int = 30,
    ) -> ApiResponse:
        params: dict[str, Any] = {"limit": limit, "timeout": poll_timeout}
        if offset:
            params["offset"] = offset
        query = urlencode({k: v for k, v in params.items() if v is not None}, doseq=True)
        path = f"/v2/updates?{query}" if query else "/v2/updates"
        timestamp = str(int(time.time() * 1000))
        signature = self._generate_signature(path, timestamp)
        url = f"{self.base_url}{path}"
        try:
            response = requests.get(
                url,
                headers=self._get_headers(signature, timestamp),
                timeout=poll_timeout + 10,
            )
            return ApiResponse.from_response(response)
        except requests.RequestException as exception:
            return ApiResponse.from_exception(exception)
You can extend the client with additional methods such as create_topic, list_topics, and create_task. See the full API Reference for everything you can do.
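To sanity-check the signing scheme before wiring up the full client, you can reproduce it standalone. This sketch mirrors the _generate_signature logic above; the secret here is a made-up value for illustration:

```python
import hashlib
import hmac
import time

def sign(secret: str, data: str, timestamp: str) -> str:
    # Same scheme as ZenzapClient._generate_signature:
    # HMAC-SHA256 over "<timestamp>.<data>", hex-encoded.
    return hmac.new(
        secret.encode("utf-8"),
        f"{timestamp}.{data}".encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()

secret = "test-secret"                    # made-up secret, not a real credential
timestamp = str(int(time.time() * 1000))  # milliseconds, matching the client

# GET requests sign the URI path (including any query string)...
get_sig = sign(secret, "/v2/members/me", timestamp)
# ...while POST/PATCH/DELETE sign the compact JSON body.
post_sig = sign(secret, '{"topicId":"t1","text":"hi"}', timestamp)

print(len(get_sig))  # a hex SHA-256 digest is always 64 characters
```

Signing the exact bytes you send matters: the client serializes the body once with compact separators and passes that same string to both the signature and the request, so the server verifies the identical payload.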

Step 2 — Handle Incoming Messages

Create bot.py. Start by defining a system prompt and a state object to track the bot’s runtime data:
import os
import signal
import sys
import time
from dataclasses import dataclass, field
from typing import Optional

from dotenv import load_dotenv
from openai import OpenAI

from zenzap_client import ZenzapClient

DEFAULT_SYSTEM_PROMPT = (
    "You are an AI assistant embedded in Zenzap, a team messaging platform. "
    "You operate across multiple topics (group chats). Each message you receive "
    "includes the sender's name and topic name as context. "
    "Be concise — this is chat, not a document. Short answers win. Expand only when asked. "
    "Never start with filler like 'Great question!' or 'Sure!'. Just answer. "
    "You have no memory between topics unless explicitly told."
)


@dataclass
class BotState:
    bot_member_id: str
    next_offset: Optional[str] = None
    topic_name_cache: dict = field(default_factory=dict)
    conversation_histories: dict = field(default_factory=dict)
    running: bool = True
  • bot_member_id — the bot’s own user ID, so it can skip its own messages.
  • next_offset — the cursor for long polling.
  • topic_name_cache — avoids repeated API calls to resolve topic names.
  • conversation_histories — per-topic message history sent to OpenAI for context.
Now add the core message handler. When a message arrives, it appends it to the topic’s conversation history, calls OpenAI, and sends the reply back:
def resolve_topic_name(state: BotState, zenzap: ZenzapClient, topic_id: str) -> str:
    if topic_id not in state.topic_name_cache:
        resp = zenzap.get_topic(topic_id)
        state.topic_name_cache[topic_id] = resp.data.get("name", topic_id) if resp.success else topic_id
    return state.topic_name_cache[topic_id]


def handle_chat(state: BotState, zenzap: ZenzapClient, openai_client: OpenAI, model: str, system_prompt: str, msg: dict) -> None:
    topic_id = msg["topicId"]
    text = msg.get("text", "")
    sender_name = msg.get("senderName", "Unknown")
    topic_name = resolve_topic_name(state, zenzap, topic_id)

    history = state.conversation_histories.setdefault(topic_id, [])
    history.append({"role": "user", "content": f"[from: {sender_name}, in: #{topic_name}]\n{text}"})

    # Keep the last 20 messages per topic to stay within token limits
    if len(history) > 20:
        history[:] = history[-20:]

    try:
        completion = openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "system", "content": system_prompt}] + history,
        )
        reply = completion.choices[0].message.content
    except Exception as e:
        zenzap.send_message(topic_id, f"⚠️ OpenAI error: {e}")
        return

    history.append({"role": "assistant", "content": reply})
    zenzap.send_message(topic_id, reply)
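The per-topic history bookkeeping in handle_chat is easy to verify in isolation. This sketch exercises just the setdefault-and-trim logic with placeholder messages:

```python
conversation_histories: dict[str, list[dict]] = {}

def append_trimmed(topic_id: str, message: dict, max_len: int = 20) -> list[dict]:
    # Mirrors handle_chat: one history list per topic, capped at the
    # most recent max_len entries to stay within token limits.
    history = conversation_histories.setdefault(topic_id, [])
    history.append(message)
    if len(history) > max_len:
        history[:] = history[-max_len:]
    return history

for i in range(25):
    append_trimmed("topic-a", {"role": "user", "content": f"message {i}"})

print(len(conversation_histories["topic-a"]))          # 20
print(conversation_histories["topic-a"][0]["content"])  # "message 5"
```

Note the in-place slice assignment (history[:] = ...): it mutates the list stored in the dict rather than rebinding a local name, so the trimmed history is what gets sent to OpenAI on the next turn.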

Step 3 — Process Updates From the Poll Loop

Add a function that filters incoming updates and routes relevant ones to the chat handler. The bot should ignore its own messages and empty texts:
def handle_update(state: BotState, zenzap: ZenzapClient, openai_client: OpenAI, model: str, system_prompt: str, update: dict) -> None:
    if update.get("eventType") != "message.created":
        return

    msg = update.get("data", {}).get("message", {})
    if not msg:
        return

    if msg.get("senderId") == state.bot_member_id:
        return

    text = msg.get("text") or ""
    if not text.strip():
        return

    message_id = msg["id"]
    zenzap.mark_message_read(message_id)
    zenzap.add_reaction(message_id, "👀")

    handle_chat(state, zenzap, openai_client, model, system_prompt, msg)
Key behaviors:
  • Only message.created events are processed — see Webhook Events for all event types.
  • The bot skips its own messages by comparing senderId to bot_member_id.
  • Each incoming message is marked as read and given an 👀 reaction as visual feedback.
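The filtering rules above can be captured as a pure predicate, which makes them easy to test in isolation. The payload shape below simply mirrors the fields handle_update reads; consult Webhook Events for the authoritative schema:

```python
BOT_MEMBER_ID = "bot-123"  # placeholder for state.bot_member_id

def should_handle(update: dict, bot_member_id: str) -> bool:
    # Mirrors the early-return checks in handle_update.
    if update.get("eventType") != "message.created":
        return False
    msg = update.get("data", {}).get("message", {})
    if not msg:
        return False
    if msg.get("senderId") == bot_member_id:
        return False  # never respond to our own messages
    return bool((msg.get("text") or "").strip())

incoming = {
    "eventType": "message.created",
    "data": {"message": {"id": "m1", "topicId": "t1", "senderId": "user-9", "text": "hello"}},
}
own_message = {
    "eventType": "message.created",
    "data": {"message": {"id": "m2", "topicId": "t1", "senderId": "bot-123", "text": "hi"}},
}

print(should_handle(incoming, BOT_MEMBER_ID))     # True
print(should_handle(own_message, BOT_MEMBER_ID))  # False
```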

Step 4 — Wire Up the Main Loop

Finally, add the main() function that initializes the clients and starts the long-polling loop:
def main() -> None:
    load_dotenv()

    bot_api_key = os.getenv("BOT_API_KEY")
    bot_secret = os.getenv("BOT_SECRET")
    control_channel_id = os.getenv("CONTROL_CHANNEL_ID")
    openai_api_key = os.getenv("OPENAI_API_KEY")
    api_base_url = os.getenv("API_BASE_URL", "https://api.zenzap.co")
    openai_model = os.getenv("OPENAI_MODEL", "gpt-4o")

    for name, value in [
        ("BOT_API_KEY", bot_api_key),
        ("BOT_SECRET", bot_secret),
        ("CONTROL_CHANNEL_ID", control_channel_id),
        ("OPENAI_API_KEY", openai_api_key),
    ]:
        if not value:
            print(f"Missing required environment variable: {name}")
            sys.exit(1)

    zenzap = ZenzapClient(bot_api_key, bot_secret, api_base_url)
    openai_client = OpenAI(api_key=openai_api_key)

    # Verify the bot identity
    me_resp = zenzap.get_current_member()
    if not me_resp.success:
        print(f"Failed to fetch bot identity: {me_resp.data}")
        sys.exit(1)

    bot_member_id = me_resp.data["id"]
    bot_name = me_resp.data.get("name", "Bot")

    # Verify the control channel is accessible
    control_topic_resp = zenzap.get_topic(control_channel_id)
    if not control_topic_resp.success:
        print(f"Failed to fetch control channel: {control_topic_resp.data}")
        sys.exit(1)

    control_topic_name = control_topic_resp.data.get("name", control_channel_id)

    state = BotState(bot_member_id=bot_member_id)
    state.topic_name_cache[control_channel_id] = control_topic_name

    # Graceful shutdown
    def shutdown(signum, frame) -> None:
        state.running = False
        zenzap.send_message(control_channel_id, "🛑 Bot disconnecting...")
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    zenzap.send_message(
        control_channel_id,
        f"Agent connected successfully, this is the control channel #{control_topic_name}",
    )
    print(f"{bot_name} connected. Listening for messages...")

    # Long-polling loop
    while state.running:
        resp = zenzap.get_updates(offset=state.next_offset, poll_timeout=30)

        if not resp.success:
            if resp.status == 409:
                state.next_offset = None
            else:
                print(f"Poll error ({resp.status}): {resp.data}")
                time.sleep(2)
            continue

        state.next_offset = resp.data.get("nextOffset", state.next_offset)

        for update in resp.data.get("updates", []):
            handle_update(state, zenzap, openai_client, openai_model, DEFAULT_SYSTEM_PROMPT, update)


if __name__ == "__main__":
    main()
The main loop uses long polling to efficiently wait for new events. If a 409 error occurs (offset expired), it resets and starts fresh.
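The error-handling branch of the loop can be factored into a pure helper. This is a sketch for reasoning about the 409 case, not part of the tutorial code:

```python
from typing import Optional

def next_poll_state(
    status: int,
    success: bool,
    current_offset: Optional[str],
    new_offset: Optional[str],
) -> tuple[Optional[str], bool]:
    """Return (offset to use on the next poll, whether to back off first)."""
    if not success:
        if status == 409:
            # Offset expired server-side: reset and start fresh immediately.
            return None, False
        # Transient failure: keep the cursor and sleep before retrying.
        return current_offset, True
    # Success: advance the cursor if the server provided one.
    return (new_offset if new_offset is not None else current_offset), False

print(next_poll_state(409, False, "abc", None))  # (None, False)
print(next_poll_state(500, False, "abc", None))  # ("abc", True)
print(next_poll_state(200, True, "abc", "def"))  # ("def", False)
```

Keeping the cursor on transient errors matters: resetting it on every failure would replay old updates, while the 409 reset is the one case where the server has already discarded the cursor.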

Step 5 — Run the Agent

Start the bot:
python bot.py
You should see output confirming the connection:
Bot connected. Listening for messages...
The bot will also send a message to your control channel confirming it’s online. From here, any message sent in a topic the bot has access to will trigger an OpenAI-powered response.

Next Steps

  • Custom tools — Extend handle_chat to support function calling so your agent can create tasks, manage topics, or call external APIs.
  • Conversation management — Use a database instead of in-memory conversation_histories for persistence across restarts.
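As a starting point for the persistence idea, here is a minimal sketch using the standard-library sqlite3 module. The table name and schema are illustrative choices, not part of the Zenzap API:

```python
import json
import sqlite3

class HistoryStore:
    """Persist per-topic conversation histories so they survive restarts."""

    def __init__(self, path: str = "histories.db"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS histories (topic_id TEXT PRIMARY KEY, messages TEXT)"
        )

    def load(self, topic_id: str) -> list[dict]:
        row = self.conn.execute(
            "SELECT messages FROM histories WHERE topic_id = ?", (topic_id,)
        ).fetchone()
        return json.loads(row[0]) if row else []

    def save(self, topic_id: str, history: list[dict]) -> None:
        # Upsert the serialized history for this topic.
        self.conn.execute(
            "INSERT INTO histories (topic_id, messages) VALUES (?, ?) "
            "ON CONFLICT(topic_id) DO UPDATE SET messages = excluded.messages",
            (topic_id, json.dumps(history)),
        )
        self.conn.commit()

store = HistoryStore(":memory:")  # in-memory database for illustration
store.save("t1", [{"role": "user", "content": "hello"}])
print(store.load("t1"))
```

To wire this in, you would call store.load(topic_id) instead of reading state.conversation_histories in handle_chat, and store.save(topic_id, history) after each reply.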

Contact Support

If you have any questions or need help, don’t hesitate to contact our Support Team.