class MooncakeBootstrapServer:
"""
A centralized server running on the global rank 0 prefiller worker.
Prefiller workers register their connection info (IP, port, ranks) here.
"""
def __init__(self, vllm_config: VllmConfig, host: str, port: int):
self.workers: dict[int, EngineEntry] = {}
self.host = host
self.port = port
self.app = FastAPI()
self._register_routes()
self.server_thread: threading.Thread | None = None
self.server: uvicorn.Server | None = None
def __del__(self):
self.shutdown()
def _register_routes(self):
# All methods are async. No need to use lock to protect data.
self.app.post("/register")(self.register_worker)
self.app.get("/query", response_model=dict[int, EngineEntry])(self.query)
def start(self):
if self.server_thread:
return
config = uvicorn.Config(app=self.app, host=self.host, port=self.port)
self.server = uvicorn.Server(config=config)
self.server_thread = threading.Thread(
target=self.server.run, name="mooncake_bootstrap_server", daemon=True
)
self.server_thread.start()
while not self.server.started:
time.sleep(0.1) # Wait for the server to start
logger.info("Mooncake Bootstrap Server started at %s:%d", self.host, self.port)
def shutdown(self):
if self.server_thread is None or self.server is None or not self.server.started:
return
self.server.should_exit = True
self.server_thread.join()
logger.info("Mooncake Bootstrap Server stopped.")
async def register_worker(self, payload: RegisterWorkerPayload):
"""Handles registration of a prefiller worker."""
if payload.dp_rank not in self.workers:
self.workers[payload.dp_rank] = EngineEntry(
engine_id=payload.engine_id,
worker_addr={},
)
dp_entry = self.workers[payload.dp_rank]
if dp_entry.engine_id != payload.engine_id:
raise HTTPException(
status_code=400,
detail=(
f"Engine ID mismatch for dp_rank={payload.dp_rank}: "
f"expected {dp_entry.engine_id}, got {payload.engine_id}"
),
)
if payload.tp_rank not in dp_entry.worker_addr:
dp_entry.worker_addr[payload.tp_rank] = {}
tp_entry = dp_entry.worker_addr[payload.tp_rank]
if payload.pp_rank in tp_entry:
raise HTTPException(
status_code=400,
detail=(
f"Worker with dp_rank={payload.dp_rank}, "
f"tp_rank={payload.tp_rank}, pp_rank={payload.pp_rank} "
f"is already registered at "
f"{tp_entry[payload.pp_rank]}, "
f"but still want to register at {payload.addr}"
),
)
tp_entry[payload.pp_rank] = payload.addr
logger.debug(
"Registered worker: engine_id=%s, dp_rank=%d, tp_rank=%d, pp_rank=%d at %s",
payload.engine_id,
payload.dp_rank,
payload.tp_rank,
payload.pp_rank,
payload.addr,
)
return {"status": "ok"}
async def query(self) -> dict[int, EngineEntry]:
return self.workers