Skip to content

Commit 8542b55

Browse files
committed
feat(backend): Support for image pull policies
This commit adds support for pull policies in the runners. The job submission endpoint passes the pull policy to the runner, by default the policy is set to `Always` to prevent issues with stale images. The e2e test has been updated to pass `IfNotPresent` as the pull policy, since the job image for the test suite is built locally and cannot be pulled from a registry.
1 parent f252a77 commit 8542b55

File tree

6 files changed

+53
-16
lines changed

6 files changed

+53
-16
lines changed

backend/src/jobq_server/routers/jobs.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def job_fn(): ...
5151
)
5252

5353
image = Image(opts.image_ref, opts.pull_policy)
54-
workload_id = runner.run(job, image, opts.submission_context)
54+
workload_id = runner.run(job, image, opts.submission_context, opts.pull_policy)
5555
return workload_id
5656

5757

backend/src/jobq_server/runner/base.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import abc
22
from typing import ClassVar, Self
33

4-
from jobq import Image, Job
4+
from jobq import Image, ImagePullPolicy, Job
55

66
from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
77

@@ -11,7 +11,11 @@ class Runner(abc.ABC):
1111

1212
@abc.abstractmethod
1313
def run(
14-
self, job: Job, image: Image, context: SubmissionContext
14+
self,
15+
job: Job,
16+
image: Image,
17+
context: SubmissionContext,
18+
pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS,
1519
) -> WorkloadIdentifier | None: ...
1620

1721
@classmethod

backend/src/jobq_server/runner/docker.py

+14-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import textwrap
33

44
import docker
5-
from jobq import Image, Job
5+
from jobq import Image, ImagePullPolicy, Job
66
from jobq.job import DockerResourceOptions
7+
from typing_extensions import override
78

89
from jobq_server.models import ExecutionMode, SubmissionContext
910
from jobq_server.runner.base import Runner, _make_executor_command
@@ -15,7 +16,18 @@ def __init__(self, **kwargs):
1516
super().__init__()
1617
self._client = docker.from_env()
1718

18-
def run(self, job: Job, image: Image, context: SubmissionContext) -> None:
19+
@override
20+
def run(
21+
self,
22+
job: Job,
23+
image: Image,
24+
context: SubmissionContext,
25+
pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS,
26+
) -> None:
27+
if pull_policy == ImagePullPolicy.ALWAYS:
28+
logging.debug("Pulling container image %s", image.tag)
29+
self._client.images.pull(image.tag)
30+
1931
command = _make_executor_command(job)
2032

2133
resource_kwargs: DockerResourceOptions = {

backend/src/jobq_server/runner/kueue.py

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import logging
22
from dataclasses import asdict
33

4-
from jobq import Image, Job
4+
from jobq import Image, ImagePullPolicy, Job
55
from jobq.types import K8sResourceKind
66
from kubernetes import client
7+
from typing_extensions import override
78

89
from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
910
from jobq_server.runner.base import Runner, _make_executor_command
@@ -24,7 +25,11 @@ def __init__(self, k8s: KubernetesService, **kwargs: str) -> None:
2425
self._queue = kwargs.get("local_queue", "user-queue")
2526

2627
def _make_job_crd(
27-
self, job: Job, image: Image, context: SubmissionContext
28+
self,
29+
job: Job,
30+
image: Image,
31+
context: SubmissionContext,
32+
pull_policy: ImagePullPolicy,
2833
) -> client.V1Job:
2934
if not job.options:
3035
raise ValueError("Job options must be specified")
@@ -40,7 +45,7 @@ def _make_job_crd(
4045
# Job container
4146
container = client.V1Container(
4247
image=image.tag,
43-
image_pull_policy="IfNotPresent",
48+
image_pull_policy=pull_policy.value,
4449
name="workload",
4550
command=_make_executor_command(job),
4651
resources=(
@@ -70,12 +75,17 @@ def _make_job_crd(
7075
),
7176
)
7277

78+
@override
7379
def run(
74-
self, job: Job, image: Image, context: SubmissionContext
80+
self,
81+
job: Job,
82+
image: Image,
83+
context: SubmissionContext,
84+
pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS,
7585
) -> WorkloadIdentifier:
7686
logging.info(f"Submitting job {job.name} to Kueue")
7787

78-
k8s_job = self._make_job_crd(job, image, context)
88+
k8s_job = self._make_job_crd(job, image, context, pull_policy)
7989
batch_api = client.BatchV1Api()
8090
resource: client.V1Job = batch_api.create_namespaced_job(
8191
self._k8s.namespace, k8s_job

backend/src/jobq_server/runner/ray.py

+16-6
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from dataclasses import asdict
66

77
import yaml
8-
from jobq import Image, Job
8+
from jobq import Image, ImagePullPolicy, Job
99
from jobq.types import K8sResourceKind
1010
from kubernetes import client
11+
from typing_extensions import override
1112

1213
from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
1314
from jobq_server.runner.base import Runner, _make_executor_command
@@ -29,7 +30,11 @@ def __init__(self, k8s: KubernetesService, **kwargs):
2930
self._k8s = k8s
3031

3132
def _create_ray_job(
32-
self, job: Job, image: Image, context: SubmissionContext
33+
self,
34+
job: Job,
35+
image: Image,
36+
context: SubmissionContext,
37+
pull_policy: ImagePullPolicy,
3338
) -> dict:
3439
"""Create a ``RayJob`` Kubernetes resource for the Kuberay operator."""
3540

@@ -79,7 +84,7 @@ def _create_ray_job(
7984
{
8085
"name": "head",
8186
"image": image.tag,
82-
"imagePullPolicy": "IfNotPresent",
87+
"imagePullPolicy": pull_policy.value,
8388
"resources": {
8489
"requests": res_opts.to_kubernetes(
8590
kind=K8sResourceKind.REQUESTS
@@ -101,7 +106,7 @@ def _create_ray_job(
101106
{
102107
"name": "ray-submit",
103108
"image": image.tag,
104-
"imagePullPolicy": "IfNotPresent",
109+
"imagePullPolicy": pull_policy.value,
105110
}
106111
],
107112
},
@@ -111,14 +116,19 @@ def _create_ray_job(
111116

112117
return manifest
113118

119+
@override
114120
def run(
115-
self, job: Job, image: Image, context: SubmissionContext
121+
self,
122+
job: Job,
123+
image: Image,
124+
context: SubmissionContext,
125+
pull_policy: ImagePullPolicy,
116126
) -> WorkloadIdentifier:
117127
logging.info(
118128
f"Submitting RayJob {job.name} to namespace {self._k8s.namespace!r}"
119129
)
120130

121-
manifest = self._create_ray_job(job, image, context)
131+
manifest = self._create_ray_job(job, image, context, pull_policy)
122132
api = client.CustomObjectsApi()
123133
obj = api.create_namespaced_custom_object(
124134
"ray.io", "v1", self._k8s.namespace, "rayjobs", manifest

backend/tests/e2e/test_jobs.py

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def test_job_lifecycle(
4545
),
4646
resources=ResourceOptions(cpu="1", memory="512Mi"),
4747
),
48+
pull_policy="IfNotPresent",
4849
)
4950

5051
# Submit a job for execution

0 commit comments

Comments
 (0)