자체 구현 — FastAPI + Redis
프레임워크 없이도 핵심은 짤 수 있다. + + 셋이면 충분하다. 이번 장은 그 셋을 묶어 작동하는 A2A 서버를 처음부터 끝까지 조립한다.
전체 구조
세 컴포넌트가 한 줄로 묶인다. HTTP 입구는 , 작업 큐는 , 클라이언트로의 진행 상황은 . 워커는 별 프로세스로 돌면서 큐를 소비한다.
Agent Card 노출
표준은 /.well-known/agent-card.json에 자신을 광고한다. 는 이름···인증을 JSON으로 적은 한 장이다. 자세한 스키마는 8장에서 다뤘다. 로는 정적 라우터 한 줄이면 끝이다.
# server/agent_card.py
from fastapi import APIRouter
router = APIRouter()
@router.get("/.well-known/agent-card.json")
def agent_card():
return {
"name": "summarizer-bot",
"description": "긴 문서를 한 단락으로 요약한다.",
"url": "https://example.com/a2a",
"version": "0.1.0",
"skills": [
{
"id": "summarize",
"name": "summarize",
"description": "텍스트 요약",
"examples": ["A2A 프로토콜이란?"],
}
],
"authentication": {"schemes": ["bearer"]},
}Task 생성 — POST /tasks
클라이언트가 를 만든다. 서버는 즉시 task_id를 돌려주고, 실제 일은 로 던져 백그라운드 워커에 맡긴다. 이게 의 1단계다.
를 헤더로 받으면 같은 키의 재요청은 동일한 task를 가리킨다. 네트워크 재시도·중복 클릭에 안전해진다.
# server/tasks.py
import uuid, json
from fastapi import APIRouter, Header, HTTPException
import redis.asyncio as redis
r = redis.from_url("redis://localhost", decode_responses=True)
router = APIRouter()
@router.post("/tasks")
async def create_task(
body: dict,
idempotency_key: str | None = Header(default=None, alias="Idempotency-Key"),
):
# 1) 멱등성 키가 이미 있으면 기존 task 반환
if idempotency_key:
existing = await r.get(f"idem:{idempotency_key}")
if existing:
return {"task_id": existing, "status": "submitted"}
task_id = str(uuid.uuid4())
if idempotency_key:
# NX = 처음일 때만 set, 24h TTL
await r.set(f"idem:{idempotency_key}", task_id, ex=86400, nx=True)
# 2) Redis Streams 로 enqueue
await r.xadd("tasks", {"task_id": task_id, "payload": json.dumps(body)})
await r.hset(f"task:{task_id}", mapping={"status": "submitted"})
return {"task_id": task_id, "status": "submitted"}Redis Streams — 큐의 핵심
xadd로 를 append, xreadgroup으로 컨슈머 그룹이 협력해 소비한다. 일반 와 달리 메시지가 남는다 — 재처리·시간 여행이 공짜다. 는 같은 스트림을 여러 워커가 안전하게 나눠 먹게 만든다.
워커 루프
워커 프로세스는 한 줄로 요약된다 — “에서 한 받아 처리하고 ack 한다”. 처리 중 진행 상황은 별도 로 펍한다. 위에서 무한 루프가 돈다.
# worker/run.py
import asyncio, json
import redis.asyncio as redis
r = redis.from_url("redis://localhost", decode_responses=True)
GROUP, CONSUMER = "workers", "worker-1"
async def ensure_group():
try:
await r.xgroup_create("tasks", GROUP, id="0", mkstream=True)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
async def handle(task_id: str, payload: dict) -> str:
# 진행 상황을 SSE 채널로 흘려보낸다
await r.publish(f"task:{task_id}:events", json.dumps({"stage": "working"}))
await asyncio.sleep(0.5)
return f"요약: {payload.get('text', '')[:80]}..."
async def loop():
await ensure_group()
while True:
resp = await r.xreadgroup(GROUP, CONSUMER, {"tasks": ">"}, count=1, block=5000)
for _stream, entries in resp:
for msg_id, data in entries:
tid = data["task_id"]
payload = json.loads(data["payload"])
result = await handle(tid, payload)
await r.hset(f"task:{tid}", mapping={"status": "completed", "result": result})
await r.publish(f"task:{tid}:events", json.dumps({"stage": "completed", "result": result}))
await r.xack("tasks", GROUP, msg_id)
if __name__ == "__main__":
asyncio.run(loop())SSE 스트리밍 — GET /tasks/:id/events
진행 상황을 클라이언트에 밀어 보낸다. 는 단방향 HTTP 이라 구현이 단순하다. sse-starlette의 EventSourceResponse가 표준이고, 와 곧장 붙는다. 이 필요하면 양방향이지만, 진행 알림엔 SSE로 충분하다.
# server/events.py
import json, asyncio
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
import redis.asyncio as redis
r = redis.from_url("redis://localhost", decode_responses=True)
router = APIRouter()
async def event_gen(task_id: str):
ps = r.pubsub()
await ps.subscribe(f"task:{task_id}:events")
try:
async for msg in ps.listen():
if msg["type"] != "message":
continue
data = json.loads(msg["data"])
yield {"event": data["stage"], "data": json.dumps(data)}
if data["stage"] in ("completed", "failed"):
break
finally:
await ps.unsubscribe(f"task:{task_id}:events")
@router.get("/tasks/{task_id}/events")
async def events(task_id: str):
return EventSourceResponse(event_gen(task_id))Task 조회 — ETag로 동시성 통제
GET /tasks/:id는 현재 상태를 돌려준다. 를 응답 헤더에 같이 보내면 클라이언트가 If-None-Match로 변경 없음을 빠르게 확인한다. 폴링 트래픽이 줄고, 동시 수정 시 412 Precondition Failed로 충돌을 잡아낸다. 까지 함께 로깅하면 추적도 깔끔하다.
# server/get_task.py
import hashlib, json
from fastapi import APIRouter, Header, Response
import redis.asyncio as redis
r = redis.from_url("redis://localhost", decode_responses=True)
router = APIRouter()
def etag_of(data: dict) -> str:
raw = json.dumps(data, sort_keys=True).encode()
return '"' + hashlib.sha256(raw).hexdigest()[:16] + '"'
@router.get("/tasks/{task_id}")
async def get_task(
task_id: str,
response: Response,
if_none_match: str | None = Header(default=None, alias="If-None-Match"),
):
data = await r.hgetall(f"task:{task_id}")
if not data:
response.status_code = 404
return {"error": "not found"}
tag = etag_of(data)
if if_none_match == tag:
response.status_code = 304
return
response.headers["ETag"] = tag
return data멱등성 키 정책
는 클라이언트가 짧은 시간 안에 같은 요청을 두 번 보낼 때 한 번만 처리됨을 보장한다. 24시간 정도 키-task 매핑을 보관하고, 같은 키로 들어온 새 요청은 기존 ID를 그대로 돌려준다.
키 생성은 보통 클라이언트가 한다. UUID v4가 충분히 안전하다. 서버는 SETNX(원자적 set-if-not-exists)로 경쟁 조건을 막는다. 과 한 쌍이고, 와 함께 로그에 남긴다.
인증
은 의 authentication.schemes로 광고한다. Bearer 토큰이 가장 흔하다. 는 Depends(security_scheme)로, fastify는 preHandler 훅으로 깐다. 호출자 카드를 캐싱하고, 필요한 에만 토큰을 허용하는 게 기본 패턴이다.
# server/auth.py
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
bearer = HTTPBearer()
VALID_TOKENS = {"abc123": "client-1"}
def auth(creds: HTTPAuthorizationCredentials = Security(bearer)) -> str:
caller = VALID_TOKENS.get(creds.credentials)
if not caller:
raise HTTPException(status_code=401, detail="invalid token")
return caller비동기 IO 전반
전 코드가 위에 돈다. Python 쪽은 redis.asyncio, TypeScript 쪽은 ioredis가 표준이다. 한 이벤트 루프 안에서 HTTP 핸들러· 소비· 발행이 함께 협력한다. 동기 코드와 섞이면 즉시 병목이 잡힌다. 블로킹 함수는 워커 안에서도 to_thread로 빼낸다는 게 1원칙이다.
운영 체크리스트
- 모든 변경 엔드포인트가 를 받는가
- 조회 엔드포인트가 +
If-None-Match를 지원하는가 - 의 컨슈머 그룹과
XPENDING회수 정책이 있는가 - 연결의 끝맺음(
completed/failed)이 명확한가 - 이 로그·트레이스에서 마스킹되는가
- 장애 시 그레이스풀 셧다운이 되는가
여섯이 통과되면 직접 짠 도 프로덕션에 충분히 가깝다.
다음 장으로
셋 + 자체 구현까지 봤으니, 이제 어느 걸 언제 쓸지 로 정리할 차례다. 21장에서 ···자체 구현을 케이스별로 점수 매긴다.