Skip to content

Commit ff36c96

Browse files
maxmynter and AdrianoKF authored
Support user-specified image pull policy (#132)
* (wip) Add image pull policy to CLI kwargs

* deps: Bump client dependency in backend

  This is needed to access the newly added types in the client package.

* tests(backend): Don't install deps into external cluster for E2E tests

* feat(backend): Support for image pull policies

  This commit adds support for pull policies in the runners. The job submission endpoint passes the pull policy to the runner, by default the policy is set to `Always` to prevent issues with stale images.

  The e2e test has been updated to pass `IfNotPresent` as the pull policy, since the job image for the test suite is built locally and cannot be pulled from a registry.

* chore: Regenerate OpenAPI client

* fix(client): Adapt image pull policy type for API

* chore: Remove FIXME

* refactor: Move ImagePullPolicy type into backend

  The `Runner` interface has been changed to accept an `ImageRef`, instead of importing the (useless) `Image` type from the client package.

  Co-authored-by: Max Mynter <maxmynter@users.noreply.github.com>

* docs: Update mention of default image pull policy

---------

Co-authored-by: Adrian Rumpold <a.rumpold@gmail.com>
Co-authored-by: Max Mynter <maxmynter@users.noreply.github.com>
1 parent abdc993 commit ff36c96

File tree

18 files changed: +170 −49 lines changed

backend/pyproject.toml

+1-7
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@ maintainers = [
1616
{ name = "Adrian Rumpold", email = "a.rumpold@appliedai-institute.de" },
1717
]
1818
license = { text = "Apache-2.0" }
19-
dependencies = [
20-
"fastapi",
21-
"uvicorn",
22-
"docker",
23-
"kubernetes",
24-
"aai-jobq",
25-
]
19+
dependencies = ["fastapi", "uvicorn", "docker", "kubernetes", "aai-jobq"]
2620
dynamic = ["version"]
2721

2822
[project.optional-dependencies]

backend/src/jobq_server/models.py

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ def validate_image_ref(ref: str) -> str:
3232

3333
ImageRef = Annotated[str, AfterValidator(validate_image_ref)]
3434

35+
36+
class ImagePullPolicy(StrEnum):
37+
ALWAYS = "Always"
38+
NEVER = "Never"
39+
IFNOTPRESENT = "IfNotPresent"
40+
41+
3542
SubmissionContext: TypeAlias = dict[str, Any]
3643

3744

@@ -60,6 +67,7 @@ class CreateJobModel(BaseModel):
6067
image_ref: ImageRef
6168
mode: ExecutionMode
6269
options: JobOptions
70+
pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS
6371
submission_context: SubmissionContext = Field(default_factory=dict)
6472

6573

backend/src/jobq_server/routers/jobs.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from fastapi import APIRouter, Depends, HTTPException, Query, Response
77
from fastapi import status as http_status
88
from fastapi.responses import StreamingResponse
9-
from jobq import Image, Job
9+
from jobq import Job
1010

1111
from jobq_server.dependencies import Kubernetes, ManagedWorkload
1212
from jobq_server.exceptions import PodNotReadyError
@@ -50,8 +50,9 @@ def job_fn(): ...
5050
detail=f"unsupported job execution mode: {opts.mode!r}",
5151
)
5252

53-
image = Image(opts.image_ref)
54-
workload_id = runner.run(job, image, opts.submission_context)
53+
workload_id = runner.run(
54+
job, opts.image_ref, opts.submission_context, opts.pull_policy
55+
)
5556
return workload_id
5657

5758

backend/src/jobq_server/runner/base.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
import abc
22
from typing import ClassVar, Self
33

4-
from jobq import Image, Job
4+
from jobq import Job
55

6-
from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
6+
from jobq_server.models import (
7+
ExecutionMode,
8+
ImagePullPolicy,
9+
ImageRef,
10+
SubmissionContext,
11+
WorkloadIdentifier,
12+
)
713

814

915
class Runner(abc.ABC):
1016
_impls: ClassVar[dict[ExecutionMode, type[Self]]] = {}
1117

1218
@abc.abstractmethod
1319
def run(
14-
self, job: Job, image: Image, context: SubmissionContext
20+
self,
21+
job: Job,
22+
image: ImageRef,
23+
context: SubmissionContext,
24+
pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS,
1525
) -> WorkloadIdentifier | None: ...
1626

1727
@classmethod

backend/src/jobq_server/runner/docker.py

+22-5
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,16 @@
22
import textwrap
33

44
import docker
5-
from jobq import Image, Job
5+
from jobq import Job
66
from jobq.job import DockerResourceOptions
7-
8-
from jobq_server.models import ExecutionMode, SubmissionContext
7+
from typing_extensions import override
8+
9+
from jobq_server.models import (
10+
ExecutionMode,
11+
ImagePullPolicy,
12+
ImageRef,
13+
SubmissionContext,
14+
)
915
from jobq_server.runner.base import Runner, _make_executor_command
1016
from jobq_server.utils.helpers import remove_none_values
1117

@@ -15,7 +21,18 @@ def __init__(self, **kwargs):
1521
super().__init__()
1622
self._client = docker.from_env()
1723

18-
def run(self, job: Job, image: Image, context: SubmissionContext) -> None:
24+
@override
25+
def run(
26+
self,
27+
job: Job,
28+
image: ImageRef,
29+
context: SubmissionContext,
30+
pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS,
31+
) -> None:
32+
if pull_policy == ImagePullPolicy.ALWAYS:
33+
logging.debug("Pulling container image %s", image)
34+
self._client.images.pull(image)
35+
1936
command = _make_executor_command(job)
2037

2138
resource_kwargs: DockerResourceOptions = {
@@ -27,7 +44,7 @@ def run(self, job: Job, image: Image, context: SubmissionContext) -> None:
2744
resource_kwargs = res.to_docker()
2845

2946
container: docker.api.client.ContainerApiMixin = self._client.containers.run(
30-
image=image.tag,
47+
image=image,
3148
command=command,
3249
detach=True,
3350
**remove_none_values(resource_kwargs),

backend/src/jobq_server/runner/kueue.py

+24-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
import logging
22
from dataclasses import asdict
33

4-
from jobq import Image, Job
4+
from jobq import Job
55
from jobq.types import K8sResourceKind
66
from kubernetes import client
7-
8-
from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
7+
from typing_extensions import override
8+
9+
from jobq_server.models import (
10+
ExecutionMode,
11+
ImagePullPolicy,
12+
ImageRef,
13+
SubmissionContext,
14+
WorkloadIdentifier,
15+
)
916
from jobq_server.runner.base import Runner, _make_executor_command
1017
from jobq_server.services.k8s import KubernetesService
1118
from jobq_server.utils.k8s import (
@@ -24,7 +31,11 @@ def __init__(self, k8s: KubernetesService, **kwargs: str) -> None:
2431
self._queue = kwargs.get("local_queue", "user-queue")
2532

2633
def _make_job_crd(
27-
self, job: Job, image: Image, context: SubmissionContext
34+
self,
35+
job: Job,
36+
image: ImageRef,
37+
context: SubmissionContext,
38+
pull_policy: ImagePullPolicy,
2839
) -> client.V1Job:
2940
if not job.options:
3041
raise ValueError("Job options must be specified")
@@ -39,8 +50,8 @@ def _make_job_crd(
3950

4051
# Job container
4152
container = client.V1Container(
42-
image=image.tag,
43-
image_pull_policy="IfNotPresent",
53+
image=image,
54+
image_pull_policy=pull_policy.value,
4455
name="workload",
4556
command=_make_executor_command(job),
4657
resources=(
@@ -70,12 +81,17 @@ def _make_job_crd(
7081
),
7182
)
7283

84+
@override
7385
def run(
74-
self, job: Job, image: Image, context: SubmissionContext
86+
self,
87+
job: Job,
88+
image: ImageRef,
89+
context: SubmissionContext,
90+
pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS,
7591
) -> WorkloadIdentifier:
7692
logging.info(f"Submitting job {job.name} to Kueue")
7793

78-
k8s_job = self._make_job_crd(job, image, context)
94+
k8s_job = self._make_job_crd(job, image, context, pull_policy)
7995
batch_api = client.BatchV1Api()
8096
resource: client.V1Job = batch_api.create_namespaced_job(
8197
self._k8s.namespace, k8s_job

backend/src/jobq_server/runner/ray.py

+26-13
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,18 @@
55
from dataclasses import asdict
66

77
import yaml
8-
from jobq import Image, Job
8+
from jobq import Job
99
from jobq.types import K8sResourceKind
1010
from kubernetes import client
11-
12-
from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
11+
from typing_extensions import override
12+
13+
from jobq_server.models import (
14+
ExecutionMode,
15+
ImagePullPolicy,
16+
ImageRef,
17+
SubmissionContext,
18+
WorkloadIdentifier,
19+
)
1320
from jobq_server.runner.base import Runner, _make_executor_command
1421
from jobq_server.services.k8s import KubernetesService
1522
from jobq_server.utils.k8s import (
@@ -29,7 +36,11 @@ def __init__(self, k8s: KubernetesService, **kwargs):
2936
self._k8s = k8s
3037

3138
def _create_ray_job(
32-
self, job: Job, image: Image, context: SubmissionContext
39+
self,
40+
job: Job,
41+
image: ImageRef,
42+
context: SubmissionContext,
43+
pull_policy: ImagePullPolicy,
3344
) -> dict:
3445
"""Create a ``RayJob`` Kubernetes resource for the Kuberay operator."""
3546

@@ -49,9 +60,6 @@ def _create_ray_job(
4960
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=4))
5061
job_id = f"{job.name}-{suffix}"
5162

52-
# FIXME: Image pull policy should be configurable
53-
# It is currently hardcoded to "IfNotPresent" to support running
54-
# the E2E tests in a cluster without a proper image registry.
5563
manifest = {
5664
"apiVersion": "ray.io/v1",
5765
"kind": "RayJob",
@@ -78,8 +86,8 @@ def _create_ray_job(
7886
"containers": [
7987
{
8088
"name": "head",
81-
"image": image.tag,
82-
"imagePullPolicy": "IfNotPresent",
89+
"image": image,
90+
"imagePullPolicy": pull_policy.value,
8391
"resources": {
8492
"requests": res_opts.to_kubernetes(
8593
kind=K8sResourceKind.REQUESTS
@@ -100,8 +108,8 @@ def _create_ray_job(
100108
"containers": [
101109
{
102110
"name": "ray-submit",
103-
"image": image.tag,
104-
"imagePullPolicy": "IfNotPresent",
111+
"image": image,
112+
"imagePullPolicy": pull_policy.value,
105113
}
106114
],
107115
},
@@ -111,14 +119,19 @@ def _create_ray_job(
111119

112120
return manifest
113121

122+
@override
114123
def run(
115-
self, job: Job, image: Image, context: SubmissionContext
124+
self,
125+
job: Job,
126+
image: ImageRef,
127+
context: SubmissionContext,
128+
pull_policy: ImagePullPolicy,
116129
) -> WorkloadIdentifier:
117130
logging.info(
118131
f"Submitting RayJob {job.name} to namespace {self._k8s.namespace!r}"
119132
)
120133

121-
manifest = self._create_ray_job(job, image, context)
134+
manifest = self._create_ray_job(job, image, context, pull_policy)
122135
api = client.CustomObjectsApi()
123136
obj = api.create_namespaced_custom_object(
124137
"ray.io", "v1", self._k8s.namespace, "rayjobs", manifest

backend/tests/e2e/conftest.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ def cluster() -> Generator[KubernetesCluster, None, None]:
6666
"""Create a Kubernetes cluster for testing based on environment variable"""
6767
cluster_type = os.getenv("E2E_CLUSTER_TYPE", "minikube").lower()
6868
context = os.getenv("E2E_K8S_CONTEXT")
69+
70+
# Don't install dependencies into an external cluster
71+
install_kuberay = context is None
72+
install_kueue = context is None
73+
6974
if cluster_type == "minikube":
7075
cluster = MinikubeCluster(name=context)
7176
else:
@@ -74,8 +79,10 @@ def cluster() -> Generator[KubernetesCluster, None, None]:
7479
try:
7580
# Install Kuberay first, so that the CRDs (RayJob, RayCluster)
7681
# are available for Kueue to be watched.
77-
setup_kuberay(cluster)
78-
setup_kueue(cluster)
82+
if install_kuberay:
83+
setup_kuberay(cluster)
84+
if install_kueue:
85+
setup_kueue(cluster)
7986
yield cluster
8087
finally:
8188
cluster.delete()

backend/tests/e2e/test_jobs.py

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def test_job_lifecycle(
4545
),
4646
resources=ResourceOptions(cpu="1", memory="512Mi"),
4747
),
48+
# e2e test image is built locally and sideloaded into the cluster
49+
pull_policy="IfNotPresent",
4850
)
4951

5052
# Submit a job for execution

backend/uv.lock

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/src/cli/commands/submit.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from jobq import Image, Job
1212
from jobq.submission_context import SubmissionContext
1313
from openapi_client import ExecutionMode
14+
from openapi_client.models.image_pull_policy import ImagePullPolicy
1415

1516

1617
def submit(args: argparse.Namespace, settings: Settings) -> None:
@@ -32,6 +33,7 @@ def _submit_remote_job(
3233
client: openapi_client.JobManagementApi,
3334
job: Job,
3435
mode: ExecutionMode,
36+
pull_policy: ImagePullPolicy,
3537
settings: Settings,
3638
) -> None:
3739
# Job options sent to server do not need image options
@@ -44,6 +46,7 @@ def _submit_remote_job(
4446
file=job.file,
4547
image_ref=_build_image(job, mode).tag,
4648
mode=mode,
49+
pull_policy=openapi_client.ImagePullPolicy(pull_policy.value),
4750
options=openapi_client.JobOptions.model_validate(job.options.model_dump()),
4851
submission_context=SubmissionContext().to_dict(),
4952
)
@@ -57,13 +60,14 @@ def submit_job(
5760
settings: Settings,
5861
) -> None:
5962
mode = args.mode
63+
pull_policy = args.pull_policy
6064
logging.debug(f"Execution mode: {mode}")
6165
match mode:
6266
case ExecutionMode.LOCAL:
6367
# Run the job locally
6468
job()
6569
case _:
66-
_submit_remote_job(job, mode, settings=settings)
70+
_submit_remote_job(job, mode, pull_policy, settings=settings)
6771

6872

6973
def discover_job(args: argparse.Namespace) -> Job:
@@ -120,6 +124,12 @@ def add_parser(subparsers: Any, parent: argparse.ArgumentParser) -> None:
120124
choices=list(ExecutionMode),
121125
type=ExecutionMode,
122126
)
127+
parser.add_argument(
128+
"--pull-policy",
129+
default=ImagePullPolicy.ALWAYS,
130+
choices=list(ImagePullPolicy),
131+
type=ImagePullPolicy,
132+
)
123133

124134
parser.add_argument("entrypoint")
125135
# TODO: Factor out into command class

0 commit comments

Comments
 (0)