자체 구현 — FastAPI + Redis

프레임워크 없이도 핵심은 짤 수 있다. + + 셋이면 충분하다. 이번 장은 그 셋을 묶어 작동하는 A2A 서버를 처음부터 끝까지 조립한다.


전체 구조

세 컴포넌트가 한 줄로 묶인다. HTTP 입구는 , 작업 큐는 , 클라이언트로의 진행 상황은 . 워커는 별 프로세스로 돌면서 큐를 소비한다.

다이어그램 로딩…

Agent Card 노출

표준은 /.well-known/agent-card.json에 자신을 광고한다. 는 이름···인증을 JSON으로 적은 한 장이다. 자세한 스키마는 8장에서 다뤘다. 로는 정적 라우터 한 줄이면 끝이다.

# server/agent_card.py
from fastapi import APIRouter

router = APIRouter()

@router.get("/.well-known/agent-card.json")
def agent_card():
  return {
      "name": "summarizer-bot",
      "description": "긴 문서를 한 단락으로 요약한다.",
      "url": "https://example.com/a2a",
      "version": "0.1.0",
      "skills": [
          {
              "id": "summarize",
              "name": "summarize",
              "description": "텍스트 요약",
              "examples": ["A2A 프로토콜이란?"],
          }
      ],
      "authentication": {"schemes": ["bearer"]},
  }

Task 생성 — POST /tasks

클라이언트가 를 만든다. 서버는 즉시 task_id를 돌려주고, 실제 일은 로 던져 백그라운드 워커에 맡긴다. 이게 의 1단계다.

를 헤더로 받으면 같은 키의 재요청은 동일한 task를 가리킨다. 네트워크 재시도·중복 클릭에 안전해진다.

# server/tasks.py
import uuid, json
from fastapi import APIRouter, Header, HTTPException
import redis.asyncio as redis

r = redis.from_url("redis://localhost", decode_responses=True)
router = APIRouter()

@router.post("/tasks")
async def create_task(
  body: dict,
  idempotency_key: str | None = Header(default=None, alias="Idempotency-Key"),
):
  # 1) 멱등성 키가 이미 있으면 기존 task 반환
  if idempotency_key:
      existing = await r.get(f"idem:{idempotency_key}")
      if existing:
          return {"task_id": existing, "status": "submitted"}

  task_id = str(uuid.uuid4())
  if idempotency_key:
      # NX = 처음일 때만 set, 24h TTL
      await r.set(f"idem:{idempotency_key}", task_id, ex=86400, nx=True)

  # 2) Redis Streams 로 enqueue
  await r.xadd("tasks", {"task_id": task_id, "payload": json.dumps(body)})
  await r.hset(f"task:{task_id}", mapping={"status": "submitted"})
  return {"task_id": task_id, "status": "submitted"}

Redis Streams — 큐의 핵심

xaddappend, xreadgroup으로 컨슈머 그룹이 협력해 소비한다. 일반 와 달리 메시지가 남는다 — 재처리·시간 여행이 공짜다. 는 같은 스트림을 여러 워커가 안전하게 나눠 먹게 만든다.


워커 루프

워커 프로세스는 한 줄로 요약된다 — “에서 한 받아 처리하고 ack 한다”. 처리 중 진행 상황은 별도 로 펍한다. 위에서 무한 루프가 돈다.

# worker/run.py
import asyncio, json
import redis.asyncio as redis

r = redis.from_url("redis://localhost", decode_responses=True)
GROUP, CONSUMER = "workers", "worker-1"

async def ensure_group():
  try:
      await r.xgroup_create("tasks", GROUP, id="0", mkstream=True)
  except redis.ResponseError as e:
      if "BUSYGROUP" not in str(e):
          raise

async def handle(task_id: str, payload: dict) -> str:
  # 진행 상황을 SSE 채널로 흘려보낸다
  await r.publish(f"task:{task_id}:events", json.dumps({"stage": "working"}))
  await asyncio.sleep(0.5)
  return f"요약: {payload.get('text', '')[:80]}..."

async def loop():
  await ensure_group()
  while True:
      resp = await r.xreadgroup(GROUP, CONSUMER, {"tasks": ">"}, count=1, block=5000)
      for _stream, entries in resp:
          for msg_id, data in entries:
              tid = data["task_id"]
              payload = json.loads(data["payload"])
              result = await handle(tid, payload)
              await r.hset(f"task:{tid}", mapping={"status": "completed", "result": result})
              await r.publish(f"task:{tid}:events", json.dumps({"stage": "completed", "result": result}))
              await r.xack("tasks", GROUP, msg_id)

if __name__ == "__main__":
  asyncio.run(loop())

SSE 스트리밍 — GET /tasks/:id/events

진행 상황을 클라이언트에 밀어 보낸다. 는 단방향 HTTP 이라 구현이 단순하다. sse-starletteEventSourceResponse가 표준이고, 와 곧장 붙는다. 이 필요하면 양방향이지만, 진행 알림엔 SSE로 충분하다.

# server/events.py
import json, asyncio
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
import redis.asyncio as redis

r = redis.from_url("redis://localhost", decode_responses=True)
router = APIRouter()

async def event_gen(task_id: str):
  ps = r.pubsub()
  await ps.subscribe(f"task:{task_id}:events")
  try:
      async for msg in ps.listen():
          if msg["type"] != "message":
              continue
          data = json.loads(msg["data"])
          yield {"event": data["stage"], "data": json.dumps(data)}
          if data["stage"] in ("completed", "failed"):
              break
  finally:
      await ps.unsubscribe(f"task:{task_id}:events")

@router.get("/tasks/{task_id}/events")
async def events(task_id: str):
  return EventSourceResponse(event_gen(task_id))

Task 조회 — ETag로 동시성 통제

GET /tasks/:id는 현재 상태를 돌려준다. 를 응답 헤더에 같이 보내면 클라이언트가 If-None-Match변경 없음을 빠르게 확인한다. 폴링 트래픽이 줄고, 동시 수정 시 412 Precondition Failed로 충돌을 잡아낸다. 까지 함께 로깅하면 추적도 깔끔하다.

# server/get_task.py
import hashlib, json
from fastapi import APIRouter, Header, Response
import redis.asyncio as redis

r = redis.from_url("redis://localhost", decode_responses=True)
router = APIRouter()

def etag_of(data: dict) -> str:
  raw = json.dumps(data, sort_keys=True).encode()
  return '"' + hashlib.sha256(raw).hexdigest()[:16] + '"'

@router.get("/tasks/{task_id}")
async def get_task(
  task_id: str,
  response: Response,
  if_none_match: str | None = Header(default=None, alias="If-None-Match"),
):
  data = await r.hgetall(f"task:{task_id}")
  if not data:
      response.status_code = 404
      return {"error": "not found"}
  tag = etag_of(data)
  if if_none_match == tag:
      response.status_code = 304
      return
  response.headers["ETag"] = tag
  return data

멱등성 키 정책

클라이언트가 짧은 시간 안에 같은 요청을 두 번 보낼 때 한 번만 처리됨을 보장한다. 24시간 정도 키-task 매핑을 보관하고, 같은 키로 들어온 새 요청은 기존 ID를 그대로 돌려준다.

키 생성은 보통 클라이언트가 한다. UUID v4가 충분히 안전하다. 서버는 SETNX(원자적 set-if-not-exists)로 경쟁 조건을 막는다. 과 한 쌍이고, 와 함께 로그에 남긴다.


인증

authentication.schemes로 광고한다. Bearer 토큰이 가장 흔하다. Depends(security_scheme)로, fastify는 preHandler 훅으로 깐다. 호출자 카드를 캐싱하고, 필요한 에만 토큰을 허용하는 게 기본 패턴이다.

# server/auth.py
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

bearer = HTTPBearer()

VALID_TOKENS = {"abc123": "client-1"}

def auth(creds: HTTPAuthorizationCredentials = Security(bearer)) -> str:
  caller = VALID_TOKENS.get(creds.credentials)
  if not caller:
      raise HTTPException(status_code=401, detail="invalid token")
  return caller

비동기 IO 전반

전 코드가 위에 돈다. Python 쪽은 redis.asyncio, TypeScript 쪽은 ioredis가 표준이다. 한 이벤트 루프 안에서 HTTP 핸들러· 소비· 발행이 함께 협력한다. 동기 코드와 섞이면 즉시 병목이 잡힌다. 블로킹 함수는 워커 안에서도 to_thread로 빼낸다는 게 1원칙이다.


운영 체크리스트

여섯이 통과되면 직접 짠 도 프로덕션에 충분히 가깝다.


다음 장으로

셋 + 자체 구현까지 봤으니, 이제 어느 걸 언제 쓸지 로 정리할 차례다. 21장에서 ···자체 구현을 케이스별로 점수 매긴다.