Event-Driven Architecture for Multi-Agent Creative Practices System

IAIP Research

Executive Summary

This document outlines a distributed event-driven architecture in which humans and AI entities publish and subscribe to shared event hubs, enabling creative practices to flow across coding sessions, narrative design, ceremony design, and story systems while maintaining unified indexing across GitHub, academic research, web content, email, and Drive storage.


SOURCE INDEX (1-2 sentences each)

Engineering & Event Architecture

[1] Confluent - Event-Driven Multi-Agent Systems Establishes four core design patterns for coordinating multiple AI agents: orchestrator-worker, hierarchical agent, blackboard, and market-based patterns, all transformable into event-driven systems using Kafka. Events act as a shared language enabling agents to interpret commands, share context, and coordinate tasks without direct synchronous requests.[19]

[2] Apache Kafka vs Redis Streams vs RabbitMQ Comparison Kafka provides distributed streaming with disk-based persistence for high-throughput event sourcing; Redis Streams offers ultra-low latency in-memory processing with consumer groups; RabbitMQ enables flexible message routing with AMQP protocol. Choose Kafka for large-scale event replay/sourcing, Redis for real-time notifications, RabbitMQ for task queues.[21][24]

[3] Redpanda - Kafka Alternative Redpanda is a C++ rewrite of Kafka eliminating JVM/ZooKeeper complexity while maintaining full Kafka API compatibility; delivers thread-per-core architecture for low-latency, high-throughput streaming with simpler operational overhead. Ideal for resource-constrained local deployments requiring Kafka semantics.[47][53]

[4] NATS JetStream - Cloud-Native Streaming NATS provides lightweight messaging with the JetStream extension for event streaming, offering subject-based pub-sub with consumer groups, durable message persistence, and very low latency. Best for microservice coordination and multi-agent communication in restricted infrastructure.[54]

[5] Swim Runtime - Distributed Event-Driven Applications Swim enables building distributed applications from streaming events using linked distributed actors that analyze boundless event streams from millions of sources. Uses op-based CRDTs for asynchronous state replication across actor networks.[46]

[6] EventStoreDB - Event Sourcing Implementation The event sourcing design pattern stores the results of business operations as a series of events, providing an alternative persistence mechanism. Shows integration with AI/ML for downstream processing: feed events to ChatGPT for status updates or to ML models for continuous learning.[49]

Narrative & Creative Systems

[7] NECE - Narrative Event Chain Extraction Toolkit Open-access toolkit automatically extracting and aligning narrative events in temporal order from lengthy narrative texts. Enables analysis of narrative structures and character event flows, with applications in computational narrative analysis.[68]

[8] Story Morals: Value-Driven Narrative Schemas Uses large language models to extract story morals and values across diverse narrative genres (folktales, novels, movies, personal stories). Demonstrates LLMs can effectively interpret narrative schemas and encode lessons reflecting author beliefs.[60]

[9] Neural Story Planning Recursively expands story plots via backward chaining, inferring the preconditions of events and the events that cause those conditions. Measures narrative coherence through question-answering about causal relationships between story events.[71]

[10] StoryExplorer - Interactive Storyline Generation Visualization framework combining stroke annotation and GPT-based hints to extract narrative structures from textual narratives interactively. Enables insight finding-scripting-storytelling workflow for constructing storylines.[72]

[11] Multi-Modal Story Generation Framework Automated story generation system maintaining coherence between consecutive stories with transformer encoder-based storyline guidance predicting plots via multiple-choice question-answering. Addresses challenge of automatic narrative generation with human-like coherence.[74]

[12] Project Jupyter - Computational Narratives Web-based platform for authoring computational narratives combining live code, equations, narrative text, interactive UIs and media. Introduces concept of "literate computing" weaving human language with code results to produce narratives.[75]

Multi-Agent Orchestration

[13] AutoGen Studio - Multi-Agent Developer Tool No-code tool for building and debugging multi-agent systems where multiple agents (AI models + tools) collaborate on complex tasks. Provides workflow specification, interactive evaluation, debugging, and reusable agent component gallery.[17]

[14] AgentLite - Lightweight Agent Framework Task-oriented framework simplifying LLM agent reasoning with lightweight, user-friendly platform for building multi-agent systems. Enhances agent ability to break down tasks and facilitates multi-agent development.[15]

[15] Solace Agent Mesh - Event-Driven AI Framework Framework designed specifically for building and orchestrating multi-agent AI systems with seamless integration of AI agents with real-world data sources. Facilitates complex, multi-step workflows through event-driven patterns.[22]

[16] crewAI - Multi-Agent Collaboration Framework enabling multiple AI agents to work together collaboratively, with role definition and task coordination capabilities. Designed for orchestrating teams of agents working toward shared objectives.[25]

Model Context Protocol (MCP) Integration

[17] MCP - Model Context Protocol Specification Protocol standardizing how applications provide context to LLMs through tools, resources, and prompts. Works with clients like Claude Desktop enabling structured tool calling and resource access.[81]

[18] OpenAI Agents SDK with MCP Shows integration of MCP servers with OpenAI agents, allowing approval workflows and structured tool calling with automatic retries and session management.[78]

[19] use-mcp React Hook Lightweight React integration for connecting to MCP servers with automatic connection management, OAuth authentication, and full tool/resource/prompt support. Enables building MCP clients in web applications.[76]


ARCHITECTURE ABSTRACT

System Design Rationale

The proposed system leverages event-driven patterns to create a federated creative practice ecosystem where:

  1. Centralized Shared Hub: All agents (human and AI) publish events representing creative actions (commits, story fragments, ceremony changes, narrative decisions, feedback).

  2. Peer Hubs: Individual practitioners maintain personal event streams accessible to others for observation, forking, and contribution. Each hub exposes:

    • Unpublished drafts (private events)
    • Published work (public events)
    • Open contributions (accepting external input)
    • Feedback streams (commentary, suggestions)
  3. Cross-Universe Integration: Events indexed by source:

    • Engineering: GitHub commits, pull requests, code reviews (via API)
    • Narrative: Story events, plot points, character arcs (via MCP interface to story database)
    • Ceremony: Ritual events, timing, participants (via custom event schema)
    • Academic: Research papers, citations, peer review (via arXiv/CrossRef APIs)
    • Web/Email/Drive: Metadata extracted and indexed (via MCP servers)
  4. Event-Driven Federation:

    • Practitioners subscribe to events from shared hub + peer hubs
    • Event consumption triggers downstream processing (LLM analysis, fork suggestions, contribution requests)
    • Consumer groups enable parallel processing (e.g., multiple practitioners analyzing the same story event)
    • Event compaction removes historical noise while preserving decision lineage
  5. MCP Integration Layer:

    • GitHub MCP server queries repos, extracts commit events, surfaces related issues
    • Story MCP server navigates narrative structures as event sequences
    • Analytics MCP server queries hub for event patterns, surfaces insights
    • Enables Claude/other LLMs to "browse" hub and understand creative context
  6. Operational Guarantees:

    • At-least-once delivery: Events may be reprocessed, so handlers must be idempotent (see the sketch after this list)
    • Event ordering: Per-stream ordering within topic (partition key = creator ID)
    • Retention policy: Recent events hot, archived events queryable via event compaction
    • Replay capability: New subscribers can replay historical context before current events
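
The at-least-once guarantee is easiest to see in code. A minimal sketch, assuming only that every event carries a stable unique ID; the `event_id` field and `make_idempotent` helper are illustrative, not part of any broker API:

```python
# Hypothetical helper, not part of any broker client: wraps a handler so
# redelivered events (at-least-once semantics) are only processed once.
def make_idempotent(handler, seen_ids=None):
    seen = seen_ids if seen_ids is not None else set()  # use a persistent store in production

    def wrapped(event):
        event_id = event["event_id"]  # assumed stable ID, e.g. "topic:partition:offset"
        if event_id in seen:
            return  # duplicate delivery; already handled
        handler(event)
        seen.add(event_id)  # mark done only after the handler succeeds

    return wrapped

# Usage: hub.subscribe(..., handler_func=make_idempotent(index_event))
```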

Implementation Pattern

```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚        CENTRALIZED SHARED EVENT HUB           β”‚
β”‚           (Kafka/Redpanda/NATS)               β”‚
β”‚                                               β”‚
β”‚ Topics:                                       β”‚
β”‚ - creative.engineering  (GitHub commits)      β”‚
β”‚ - creative.narrative    (story events)        β”‚
β”‚ - creative.ceremony     (ritual events)       β”‚
β”‚ - creative.feedback     (commentary)          β”‚
β”‚ - creative.contributions (ready to merge)     β”‚
β”‚ - creative.metadata     (indexing)            β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        β–²              β–²              β–²
        β”‚              β”‚              β”‚
   β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
   β”‚ Person Aβ”‚    β”‚ Person Bβ”‚    β”‚ AI Agentβ”‚
   β”‚   Hub   β”‚    β”‚   Hub   β”‚    β”‚   Hub   β”‚
   β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜
        β”‚              β”‚              β”‚
   Events: Drafts, Feedback, Ready Contributions

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚      SOURCE INTEGRATION LAYER (MCPs)          β”‚
β”‚                                               β”‚
β”‚ - GitHub MCP      β†’ Extract commits           β”‚
β”‚ - Narrative MCP   β†’ Extract story events      β”‚
β”‚ - Drive/Email MCP β†’ Metadata                  β”‚
β”‚ - Academic API    β†’ Research metadata         β”‚
β”‚ - Analytics MCP   β†’ Query patterns            β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```
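
As a hedged sketch of how the MCP layer could expose the hub to an LLM client, the following uses the FastMCP helper from the official MCP Python SDK (`pip install mcp`); the server name, tool, and return shape are illustrative assumptions, and the hub query is stubbed:

```python
# Hypothetical hub-browsing MCP server; the tool body is a stub standing in
# for a real query against Kafka/Redpanda/NATS or Redis Streams.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("creative-hub")

@mcp.tool()
def recent_events(topic: str, limit: int = 20) -> list:
    """Return recent events from a hub topic so an LLM can browse creative context."""
    # Replace with a real consumer/xrange/stream read against the chosen broker.
    return [{"topic": topic, "note": "stub - wire this to the event hub"}][:limit]

if __name__ == "__main__":
    mcp.run()  # serves over stdio by default, usable from clients like Claude Desktop
```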


THREE DEPLOYMENT SOLUTIONS

SOLUTION 1: Redis Streams + Python (Lightweight, Local-First)

Use Case: Single machine, tight latency requirements, <10GB data, integration with existing Python projects

Advantages:

  • Minimal dependencies (Redis binary only)
  • Sub-millisecond latency
  • Simple Python API
  • Works offline
  • Excellent for development/prototyping

Disadvantages:

  • Dataset must fit in memory (memory management critical)
  • Limited horizontal scaling
  • Not ideal for >10 concurrent consumers
  • Single point of failure without replication

Setup Instructions

```bash

# 1. Install Redis (macOS with Homebrew)
brew install redis

# 2. Start Redis with persistence (the data dir must exist first)
mkdir -p ~/creative-hub/redis-data
redis-server --appendonly yes --appendfsync everysec \
  --dir ~/creative-hub/redis-data \
  --logfile ~/creative-hub/redis.log &

# 3. Create Python environment
mkdir -p ~/creative-hub && cd ~/creative-hub
python3 -m venv venv
source venv/bin/activate

# 4. Install dependencies
pip install redis pydantic pytest

# 5. Create event producer/consumer framework
cat > event_hub.py << 'EOFPYTHON'
import redis
import json
from datetime import datetime
from typing import Dict, Any
from enum import Enum

class EventType(Enum):
    COMMIT = "creative.engineering"
    STORY = "creative.narrative"
    CEREMONY = "creative.ceremony"
    FEEDBACK = "creative.feedback"
    CONTRIBUTION = "creative.contributions"

class CreativeEventHub:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.consumer_group = "creative-agents"

    def publish_event(self, event_type: EventType, creator_id: str,
                      payload: Dict[str, Any]):
        """Publish event to stream"""
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "creator": creator_id,
            "type": event_type.value,
            "payload": json.dumps(payload)
        }
        stream_key = f"hub:{event_type.value}"
        event_id = self.redis.xadd(stream_key, event)
        return event_id

    def subscribe(self, event_type: EventType, consumer_name: str,
                  handler_func, start_from="$"):
        """Subscribe to event stream with consumer group"""
        stream_key = f"hub:{event_type.value}"

        # Create consumer group if not exists
        try:
            self.redis.xgroup_create(stream_key, self.consumer_group,
                                     id=start_from, mkstream=True)
        except redis.ResponseError:
            pass  # Group already exists

        # Read and process events; '>' requests messages never delivered
        # to this consumer group before
        while True:
            messages = self.redis.xreadgroup(
                self.consumer_group,
                consumer_name,
                {stream_key: '>'},
                count=10,
                block=1000
            )

            for stream, events in messages or []:
                for event_id, event_data in events:
                    event = {k.decode(): v.decode() for k, v in event_data.items()}
                    handler_func(event)
                    self.redis.xack(stream_key, self.consumer_group, event_id)

    def replay_events(self, event_type: EventType, since_id="0"):
        """Replay historical events"""
        stream_key = f"hub:{event_type.value}"
        return self.redis.xrange(stream_key, min=since_id)

    def trim_stream(self, event_type: EventType, max_len=10000):
        """Trim old events to manage memory"""
        stream_key = f"hub:{event_type.value}"
        self.redis.xtrim(stream_key, maxlen=max_len)

if __name__ == "__main__":
    hub = CreativeEventHub()

    # Example: Publish a GitHub commit event
    hub.publish_event(
        EventType.COMMIT,
        creator_id="gerico1007",
        payload={
            "repo": "simexp",
            "commit_sha": "abc123",
            "message": "Add multi-agent event sync",
            "url": "https://github.com/Gerico1007/simexp/commit/abc123"
        }
    )

    print("Event published successfully")

EOFPYTHON

# 6. Test the hub
python event_hub.py

# 7. Monitor Redis (in a separate terminal)
redis-cli MONITOR

# 8. Cleanup when done
pkill redis-server
rm -rf ~/creative-hub/redis-data

```
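
As a usage sketch (assuming `event_hub.py` above and a running Redis), a subscriber attaches a handler to a stream through the shared consumer group; the `indexer-1` consumer name is illustrative:

```python
# consumer.py - minimal subscriber sketch built on the CreativeEventHub above.
from event_hub import CreativeEventHub, EventType

def on_commit(event: dict):
    # Field values arrive as strings; "payload" is still JSON-encoded here.
    print(f"[{event['timestamp']}] {event['creator']}: {event['payload']}")

hub = CreativeEventHub()
# Blocks forever, handing each new commit event to on_commit and acking it.
hub.subscribe(EventType.COMMIT, consumer_name="indexer-1", handler_func=on_commit)
```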


SOLUTION 2: Redpanda (Kafka-Compatible, Scalable)

Use Case: Team of 5-50 practitioners, reliable persistence, want Kafka ecosystem, modest infrastructure

Advantages:

  • Drop-in Kafka replacement (99% compatible)
  • C++ implementation (lower resource overhead than Kafka)
  • No ZooKeeper or JVM required
  • Horizontal scaling to multiple nodes
  • Schema registry, HTTP proxy built-in
  • Excellent community, production-tested

Disadvantages:

  • Higher memory footprint than Redis
  • Requires deployment orchestration (Docker/systemd)
  • Learning curve for operations
  • More complex than Redis for simple use cases

Setup Instructions

```bash

# 1. Install Redpanda (macOS via Homebrew)
brew install redpanda

# 2. Start Redpanda in development mode
redpanda start --mode dev --kafka-addr 127.0.0.1:9092 \
  --advertised-kafka-addr 127.0.0.1:9092 \
  --pandaproxy-addr 127.0.0.1:8082 \
  --advertised-pandaproxy-addr 127.0.0.1:8082 \
  --schema-registry-addr 127.0.0.1:8081 \
  --advertised-schema-registry-addr 127.0.0.1:8081 &

# 3. Wait and verify cluster health
sleep 3
rpk cluster info

# 4. Create topics for creative practices
# (rpk uses --replicas and --topic-config rather than the kafka-topics.sh flag names)
rpk topic create creative-engineering \
  --partitions 3 \
  --replicas 1 \
  --topic-config retention.ms=604800000

rpk topic create creative-narrative \
  --partitions 3 \
  --replicas 1

rpk topic create creative-ceremony \
  --partitions 2 \
  --replicas 1

rpk topic create creative-feedback \
  --partitions 5 \
  --replicas 1

rpk topic create creative-contributions \
  --partitions 3 \
  --replicas 1

# 5. Verify topics
rpk topic list

# 6. Create Python client
pip install kafka-python pydantic

cat > redpanda_hub.py << 'EOFPYTHON'
from kafka import KafkaProducer, KafkaConsumer
import json
from typing import Dict, Any, Callable
from datetime import datetime
from enum import Enum

class EventType(Enum):
    COMMIT = "creative-engineering"
    STORY = "creative-narrative"
    CEREMONY = "creative-ceremony"
    FEEDBACK = "creative-feedback"
    CONTRIBUTION = "creative-contributions"

class RedpandaCreativeHub:
    def __init__(self, bootstrap_servers="localhost:9092"):
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all'  # wait for all in-sync replicas before acknowledging
        )

    def publish_event(self, event_type: EventType, creator_id: str,
                      payload: Dict[str, Any]) -> str:
        """Publish event with creator_id as partition key for ordering"""
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "creator": creator_id,
            "payload": payload
        }

        future = self.producer.send(
            event_type.value,
            key=creator_id,
            value=event
        )
        record_metadata = future.get(timeout=10)
        return f"{record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}"

    def subscribe(self, event_type: EventType, group_id: str,
                  handler: Callable, from_beginning=False):
        """Subscribe to events with consumer group"""
        consumer = KafkaConsumer(
            event_type.value,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest' if from_beginning else 'latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=True
        )

        try:
            for message in consumer:
                handler(message.value)
        except KeyboardInterrupt:
            pass
        finally:
            consumer.close()

    def query_events(self, event_type: EventType, creator_id: str = None):
        """Query historical events (up to 100 matches)"""
        consumer = KafkaConsumer(
            event_type.value,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id=f"query-{event_type.value}",
            consumer_timeout_ms=5000  # stop iterating when the topic runs dry
        )

        events = []
        for message in consumer:
            if creator_id is None or message.value.get('creator') == creator_id:
                events.append(message.value)
            if len(events) >= 100:
                break
        consumer.close()
        return events

if __name__ == "__main__":
    hub = RedpandaCreativeHub()

    event_id = hub.publish_event(
        EventType.COMMIT,
        "gerico1007",
        {
            "repo": "simexp",
            "message": "Integrate EDA event hub",
            "files_changed": 7
        }
    )
    print(f"Published: {event_id}")

EOFPYTHON

# 7. Test
python redpanda_hub.py

# 8. Monitor
rpk topic describe creative-engineering
rpk cluster health

# 9. Cleanup
pkill redpanda

```
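
A usage sketch for the consuming side, built on the `RedpandaCreativeHub` class above; the `feedback-analyzers` group ID is illustrative:

```python
# feedback_consumer.py - sketch of a consumer-group subscriber using the
# RedpandaCreativeHub defined in redpanda_hub.py above.
from redpanda_hub import RedpandaCreativeHub, EventType

def on_feedback(event: dict):
    print(f"{event['creator']} -> {event['payload']}")

hub = RedpandaCreativeHub()
# Consumers sharing a group_id split partitions between them; a second group
# with a different id independently receives every event (fan-out).
hub.subscribe(EventType.FEEDBACK, group_id="feedback-analyzers", handler=on_feedback)
```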


SOLUTION 3: NATS JetStream (Cloud-Native, Minimal Ops)

Use Case: Distributed team, cloud/hybrid infrastructure, want minimal operational overhead, <50GB data

Advantages:

  • Single binary, zero dependencies
  • Extremely lightweight (15MB image)
  • Built-in subject-based routing
  • Consumer groups with exactly-once delivery options
  • Native clustering for HA
  • Excellent for microservices/edge

Disadvantages:

  • Smaller ecosystem than Kafka
  • Less mature for large-scale data pipelines
  • Fewer monitoring tools
  • Different mental model (subjects vs topics)

Setup Instructions

```bash

# 1. Install NATS
brew install nats-server nats-cli

# 2. Create config (JetStream storage limits use max_memory_store / max_file_store)
cat > nats.conf << 'EOFNATS'
port: 4222
http_port: 8222

jetstream {
  domain: creative_practices
  store_dir: "./nats-data"
  max_memory_store: 4GB
  max_file_store: 4GB
}

accounts {
  creative {
    users: [
      { user: agent, password: your_password_here }
    ]
    jetstream: enabled
  }
}

cluster {
  name: creative-hub
  listen: 127.0.0.1:6222
  routes: [ nats://127.0.0.1:6222 ]
}
EOFNATS

# 3. Start NATS
nats-server -c nats.conf &

# 4. Verify
sleep 2
nats server info

# 5. Create streams (--defaults accepts defaults for unspecified options
# so the commands run non-interactively)
nats stream add creative_engineering \
  --subjects "creative.engineering.>" \
  --retention limits \
  --max-msgs=-1 \
  --max-bytes 10GB \
  --max-age 7d \
  --defaults

nats stream add creative_narrative --subjects "creative.narrative.>" --defaults
nats stream add creative_ceremony --subjects "creative.ceremony.>" --defaults
nats stream add creative_feedback --subjects "creative.feedback.>" --defaults
nats stream add creative_contributions --subjects "creative.contributions.>" --defaults

# List streams
nats stream list

# 6. Python client
pip install nats-py

cat > nats_hub.py << 'EOFPYTHON'
import nats
import json
from datetime import datetime
from typing import Dict, Any
from enum import Enum
import asyncio

class CreativeSubject(Enum):
    COMMIT = "creative.engineering.{creator}"
    STORY = "creative.narrative.{creator}"
    CEREMONY = "creative.ceremony.{creator}"
    FEEDBACK = "creative.feedback.{creator}"
    CONTRIBUTION = "creative.contributions.{creator}"

class NATSCreativeHub:
    def __init__(self, servers=["nats://localhost:4222"]):
        self.servers = servers
        self.nc = None
        self.js = None

    async def connect(self):
        self.nc = await nats.connect(self.servers)
        self.js = self.nc.jetstream()

    async def close(self):
        if self.nc:
            await self.nc.close()

    async def publish_event(self, subject_template: CreativeSubject,
                            creator_id: str, payload: Dict[str, Any]):
        subject = subject_template.value.format(creator=creator_id)
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "creator": creator_id,
            "payload": payload
        }

        # The PubAck carries the stream sequence assigned to this message
        ack = await self.js.publish(subject, json.dumps(event).encode())
        return ack.seq

async def main():
    hub = NATSCreativeHub()
    await hub.connect()

    try:
        seq = await hub.publish_event(
            CreativeSubject.COMMIT,
            "gerico1007",
            {
                "repo": "simexp",
                "message": "Integrate NATS JetStream hub",
                "sha": "def456"
            }
        )
        print(f"Published at sequence: {seq}")
    finally:
        await hub.close()

if __name__ == "__main__":
    asyncio.run(main())
EOFPYTHON

# 7. Test with CLI
nats pub "creative.engineering.gerico1007" '{"commit":"test"}'

# Monitor
nats sub "creative.engineering.>"

# 8. Cleanup
pkill nats-server
rm -rf nats-data

```
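
A usage sketch for the consuming side (assuming the streams created above and nats-py); the durable name `indexer` is illustrative:

```python
# subscriber.py - minimal JetStream consumer sketch; explicit acks give
# at-least-once delivery, and the durable name preserves position across runs.
import asyncio
import json

import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    async def handler(msg):
        event = json.loads(msg.data)
        print(f"{msg.subject}: {event['payload']}")
        await msg.ack()  # acknowledge so the message is not redelivered

    # '>' matches every creator under the engineering subject hierarchy
    await js.subscribe("creative.engineering.>", durable="indexer",
                       cb=handler, manual_ack=True)

    await asyncio.sleep(30)  # keep the connection open while events arrive
    await nc.close()

if __name__ == "__main__":
    asyncio.run(main())
```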


COMPARISON TABLE

| Aspect | Redis Streams | Redpanda | NATS JetStream |
| --- | --- | --- | --- |
| Setup Time | 5 min | 15 min | 10 min |
| Memory Overhead | 100MB | 500MB+ | 50MB |
| Scalability | Single machine | 3+ nodes | Clusterable |
| Persistence | RDB/AOF | Raft | File-based |
| Consumer Groups | Yes | Yes | Yes |
| Latency | <1ms | 5-10ms | 1-5ms |
| Max Topics/Streams | Unlimited* | Thousands | Unlimited |
| Monitoring | redis-cli | rpk | nats CLI |
| Ecosystem | Medium | Large | Growing |
| Team Size | Solo-5 | 5-50+ | 5-100+ |
| Ops Complexity | Minimal | Moderate | Minimal |
| Cloud Ready | Partial | Yes | Yes |
| Cost | Free | Free | Free |

NEXT STEPS

  1. Choose solution based on team size and infrastructure
  2. Follow setup instructions in terminal
  3. Implement event schema for narrative/engineering/ceremony events (a pydantic sketch follows this list)
  4. Build MCP servers for GitHub, narrative DB, Drive indexing
  5. Deploy indexing service consuming from hub
  6. Create practitioner interfaces (dashboard, CLI, IDE integration)
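
For step 3, a minimal sketch of a shared schema using pydantic (installed in each solution above); the field names and `to_wire` helper are assumptions, not a finalized contract:

```python
# A starting point for the shared event schema; illustrative, not normative.
from datetime import datetime, timezone
from typing import Any, Dict, Literal

from pydantic import BaseModel, Field

class CreativeEvent(BaseModel):
    creator: str  # also used as the partition key / subject suffix
    universe: Literal["engineering", "narrative", "ceremony", "academic", "metadata"]
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    payload: Dict[str, Any]

    def to_wire(self) -> bytes:
        """Serialize identically for Redis Streams, Redpanda, or NATS."""
        return self.model_dump_json().encode("utf-8")

# Example:
# CreativeEvent(creator="gerico1007", universe="narrative",
#               payload={"plot_point": "threshold crossing"}).to_wire()
```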

SOURCE CITATIONS

[1]-[19]: Web sources indexed above; full reference information is available in the original research.