Skip to content

Commit a361567

Browse files
authored
feat(backend): Add endpoint to list workloads (#83)
The endpoint can either return workload identifiers, or optionally the workload metadata. Additionally, the path prefix has been removed from the endpoints in the `jobs` router, instead it is now mounted with an appropriate path prefix.
1 parent 689114e commit a361567

File tree

7 files changed

+174
-57
lines changed

7 files changed

+174
-57
lines changed

backend/src/jobs_server/__main__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ async def lifespan(app: FastAPI):
2020
lifespan=lifespan,
2121
)
2222

23-
app.include_router(jobs.router)
23+
app.include_router(jobs.router, prefix="/jobs")
2424

2525

2626
@app.get("/health", include_in_schema=False)

backend/src/jobs_server/models.py

+24-6
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
import json
22
import re
33
from enum import StrEnum
4-
from typing import TYPE_CHECKING, Annotated, Any, Self, TypeAlias
4+
from typing import Annotated, Any, Self, TypeAlias
55

66
from annotated_types import Ge
77
from jobs import JobOptions
88
from pydantic import AfterValidator, BaseModel, Field, StrictStr
99

10-
from jobs_server.utils.kueue import JobId, WorkloadSpec, WorkloadStatus
11-
12-
if TYPE_CHECKING:
13-
from jobs_server.dependencies import ManagedWorkload
10+
from jobs_server.utils.kueue import JobId, KueueWorkload, WorkloadSpec, WorkloadStatus
1411

1512

1613
def validate_image_ref(ref: str) -> str:
@@ -75,6 +72,21 @@ class WorkloadIdentifier(BaseModel):
7572
namespace: StrictStr
7673
uid: StrictStr
7774

75+
@classmethod
76+
def from_kueue_workload(cls, workload: KueueWorkload) -> Self:
77+
if len(workload.metadata.owner_references) != 1:
78+
raise ValueError(
79+
f"Workload {workload.metadata.uid} has multiple owner references: {workload.metadata.owner_references}"
80+
)
81+
owner_ref = workload.metadata.owner_references[0]
82+
return cls(
83+
group=owner_ref.api_version.split("/")[0],
84+
version=owner_ref.api_version.split("/")[1],
85+
kind=owner_ref.kind,
86+
uid=owner_ref.uid,
87+
namespace=workload.metadata.namespace,
88+
)
89+
7890

7991
class JobStatus(StrEnum):
8092
PENDING = "pending"
@@ -94,7 +106,7 @@ class WorkloadMetadata(BaseModel):
94106
kueue_status: WorkloadStatus
95107

96108
@classmethod
97-
def from_managed_workload(cls, workload: "ManagedWorkload") -> Self:
109+
def from_kueue_workload(cls, workload: KueueWorkload) -> Self:
98110
if workload.owner_uid is None:
99111
raise ValueError("Workload has no owner UID")
100112
return WorkloadMetadata(
@@ -111,3 +123,9 @@ class LogOptions(BaseModel):
111123
default=-1,
112124
description="Number of tail lines of logs, -1 for all",
113125
)
126+
127+
128+
class ListWorkloadModel(BaseModel):
129+
name: str
130+
id: WorkloadIdentifier
131+
metadata: WorkloadMetadata | None = None

backend/src/jobs_server/routers/jobs.py

+32-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from typing import Annotated
33

4-
from fastapi import APIRouter, Depends, HTTPException, Response
4+
from fastapi import APIRouter, Depends, HTTPException, Query, Response
55
from fastapi import status as http_status
66
from fastapi.responses import StreamingResponse
77
from jobs import Image, Job
@@ -11,6 +11,7 @@
1111
from jobs_server.models import (
1212
CreateJobModel,
1313
ExecutionMode,
14+
ListWorkloadModel,
1415
LogOptions,
1516
WorkloadIdentifier,
1617
WorkloadMetadata,
@@ -22,7 +23,7 @@
2223
router = APIRouter(tags=["Job management"])
2324

2425

25-
@router.post("/jobs")
26+
@router.post("")
2627
async def submit_job(
2728
opts: CreateJobModel,
2829
k8s: Kubernetes,
@@ -52,20 +53,20 @@ def job_fn(): ...
5253
return workload_id
5354

5455

55-
@router.get("/jobs/{uid}/status")
56+
@router.get("/{uid}/status")
5657
async def status(
5758
workload: ManagedWorkload,
5859
) -> WorkloadMetadata:
5960
try:
60-
return WorkloadMetadata.from_managed_workload(workload)
61+
return WorkloadMetadata.from_kueue_workload(workload)
6162
except ValueError as e:
6263
raise HTTPException(
6364
status_code=http_status.HTTP_404_NOT_FOUND,
6465
detail=f"Workload not found or invalid: {str(e)}",
6566
) from e
6667

6768

68-
@router.get("/jobs/{uid}/logs")
69+
@router.get("/{uid}/logs")
6970
async def logs(
7071
workload: ManagedWorkload,
7172
k8s: Kubernetes,
@@ -86,7 +87,7 @@ async def logs(
8687
raise HTTPException(http_status.HTTP_400_BAD_REQUEST, "pod not ready") from e
8788

8889

89-
@router.post("/jobs/{uid}/stop")
90+
@router.post("/{uid}/stop")
9091
async def stop_workload(
9192
uid: JobId,
9293
workload: ManagedWorkload,
@@ -104,3 +105,28 @@ async def stop_workload(
104105
http_status.HTTP_500_INTERNAL_SERVER_ERROR,
105106
"Failed to terminate workload",
106107
) from e
108+
109+
110+
@router.get("", response_model_exclude_unset=True)
111+
async def list_jobs(
112+
k8s: Kubernetes,
113+
include_metadata: Annotated[bool, Query()] = False,
114+
) -> list[ListWorkloadModel]:
115+
workloads = k8s.list_workloads()
116+
if include_metadata:
117+
return [
118+
ListWorkloadModel(
119+
name=workload.metadata.name,
120+
id=WorkloadIdentifier.from_kueue_workload(workload),
121+
metadata=WorkloadMetadata.from_kueue_workload(workload),
122+
)
123+
for workload in workloads
124+
]
125+
else:
126+
return [
127+
ListWorkloadModel(
128+
name=workload.metadata.name,
129+
id=WorkloadIdentifier.from_kueue_workload(workload),
130+
)
131+
for workload in workloads
132+
]

backend/src/jobs_server/services/k8s.py

+13
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,16 @@ def delete_resource(
113113
namespace=namespace,
114114
body=client.V1DeleteOptions(propagation_policy=propagation_policy),
115115
)
116+
117+
def list_workloads(self, namespace: str | None = None) -> list[KueueWorkload]:
118+
api = client.CustomObjectsApi()
119+
workloads = api.list_namespaced_custom_object(
120+
group="kueue.x-k8s.io",
121+
version="v1beta1",
122+
namespace=namespace or self.namespace,
123+
plural="workloads",
124+
)
125+
return [
126+
KueueWorkload.model_validate(workload)
127+
for workload in workloads.get("items", [])
128+
]

backend/src/jobs_server/utils/kueue.py

+7-12
Original file line numberDiff line numberDiff line change
@@ -125,26 +125,24 @@ class KueueWorkload(BaseModel):
125125
spec: WorkloadSpec
126126
status: WorkloadStatus
127127

128+
model_config = ConfigDict(
129+
arbitrary_types_allowed=True,
130+
)
131+
128132
@field_validator("metadata", mode="before")
129133
def create_metadata(cls, metadata: client.V1ObjectMeta) -> client.V1ObjectMeta:
130134
return build_metadata(metadata)
131135

132-
owner_uid: JobId | None = None
133-
134-
model_config = ConfigDict(
135-
arbitrary_types_allowed=True,
136-
)
136+
@property
137+
def owner_uid(self) -> JobId:
138+
return self.metadata.owner_references[0].uid
137139

138140
@classmethod
139141
def for_managed_resource(cls, uid: str, namespace: str):
140142
workload = workload_by_managed_uid(uid, namespace)
141143
if workload.get("status") is None:
142144
raise WorkloadNotFound(uid=uid, namespace=namespace)
143145
result = cls.model_validate(workload)
144-
145-
# speed up subsequent lookups of associated resource by memoizing the managed resource UID
146-
result.owner_uid = uid
147-
148146
return result
149147

150148
@property
@@ -175,9 +173,6 @@ def managed_resource(self):
175173
def pod(self) -> client.V1Pod:
176174
api = client.CoreV1Api()
177175

178-
if self.owner_uid is None:
179-
self.owner_uid = self.managed_resource.metadata["uid"]
180-
181176
if self.managed_resource.kind == "Job":
182177
# Jobs are simple, they directly control the pods (which we can look up by their controller UID)
183178
controller_uid = self.owner_uid

backend/tests/e2e/test_jobs.py

+18
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
CreateJobModel,
1111
ExecutionMode,
1212
JobStatus,
13+
ListWorkloadModel,
1314
WorkloadIdentifier,
1415
WorkloadMetadata,
1516
)
@@ -72,6 +73,23 @@ def test_job_lifecycle(
7273

7374
time.sleep(0.5)
7475

76+
# Check that the workload is listed
77+
response = client.get("/jobs")
78+
assert response.status_code == 200
79+
workloads = [
80+
ListWorkloadModel.model_validate(workload) for workload in response.json()
81+
]
82+
assert managed_resource_id in [workload.id for workload in workloads]
83+
84+
# Detailed workload listing including metadata
85+
response = client.get("/jobs?include_metadata=true")
86+
assert response.status_code == 200
87+
workloads = [
88+
ListWorkloadModel.model_validate(workload) for workload in response.json()
89+
]
90+
assert managed_resource_id in [workload.id for workload in workloads]
91+
assert all(workload.metadata is not None for workload in workloads)
92+
7593
# Check workload logs (retry if pod is not ready yet)
7694
while True:
7795
response = client.get(f"/jobs/{managed_resource_id.uid}/logs")

0 commit comments

Comments
 (0)