Today I've been working on Shortlist, a project I introduced in my previous post. Shortlist is going to be an AI-driven profile selection tool, suited to helping house hunters narrow down real estate ads.
I've been working on three simple Go programs and deploying them to a lightweight local Kubernetes cluster I'm running using Kind (making heavy use of kind load docker-image). One of these programs creates Kubernetes jobs; the other two run within the pods those jobs spawn.
The Code
In this article, I'll be stepping through the code that talks to Kubernetes' API server and dynamically creates jobs. The full source code including a simple HTTP API can be found here.
Imports
The code required to interact with the k8s API and specify objects is spread across several packages. The kubernetes and rest packages provide the means to get an authenticated client struct. corev1 and batchv1 contain the struct definitions for the core and batch API groups. The remaining package, metav1, provides shared structs.
import (
"context"
"os"
"github.com/google/uuid"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
schemav1 "github.com/simoncrowe/shortlist-schema/lib/v1"
)
Creating a Client
Getting a pointer to a Clientset is quite straightforward.
func createClientset() (*kubernetes.Clientset, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
cs, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return cs, nil
}
Besides this short function, all that's required to talk to the k8s API server is a Role, RoleBinding and ServiceAccount associated with the pod running the code.
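For illustration, the access the runner needs boils down to two rules: create ConfigMaps and create Jobs in its namespace. Below is a sketch of those rules using the structs from k8s.io/api/rbac/v1 (imported as rbacv1). In practice I define them in a manifest, and the exact rules here are my reading of the calls this code makes rather than a copy of the real Role.
// Sketch of the Role the runner pod needs, based on the API calls this
// program makes (creating ConfigMaps and Jobs in the shortlist namespace).
// The Role name and exact rules are assumptions; the real thing is applied
// as a manifest.
var runnerRole = rbacv1.Role{
	ObjectMeta: metav1.ObjectMeta{Name: "runner", Namespace: "shortlist"},
	Rules: []rbacv1.PolicyRule{
		{APIGroups: []string{""}, Resources: []string{"configmaps"}, Verbs: []string{"create"}},
		{APIGroups: []string{"batch"}, Resources: []string{"jobs"}, Verbs: []string{"create"}},
	},
}
A RoleBinding then ties that Role to the runner ServiceAccount referenced in the pod spec further down.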
Creating a Job
The MVP version of this code isn't pretty, but it works. Having to write out k8s object specs in a verbose and imperative fashion gave me a newfound appreciation for YAML manifests.
Setup
func CreateJob(ctx context.Context, profile schemav1.Profile) (string, error) {
cs, err := createClientset()
if err != nil {
return "", err
}
id := uuid.New()
jobName := id.String()
This is straightforward. In its MVP form, the id is simply a UUID, which is returned by the HTTP API and used to name the ConfigMap and Job objects.
Creating the Profile ConfigMap
profileData, err := schemav1.EncodeProfileJSON(profile)
if err != nil {
return "", err
}
cmCfg := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: jobName},
Data: map[string]string{"data.json": profileData},
}
cmOpts := metav1.CreateOptions{}
cm, err := cs.CoreV1().ConfigMaps("shortlist").Create(ctx, &cmCfg, cmOpts)
if err != nil {
return "", err
}
profileVol := corev1.Volume{
Name: "profile",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: cm.ObjectMeta.Name,
},
},
},
}
profileVolMnt := corev1.VolumeMount{
Name: "profile",
MountPath: "/etc/shortlist/profile",
}
This code creates a ConfigMap object with the JSON-encoded profile data. It also sets up the Volume and VolumeMount structs to be used in the Pod and Container specs later.
I've considered storing the profiles in a dedicated database. However, profiles are likely to contain only a few paragraphs of text, so the roughly 1MB size limit that etcd, Kubernetes' key-value store, imposes on stored objects shouldn't be an issue. The only downside is that I'll need to run a cron job to delete leftover ConfigMaps.
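That cleanup should only take a few lines of client-go. Here's a rough sketch of the logic, assuming the convention above that each profile ConfigMap shares its UUID name with a Job, and using k8s.io/apimachinery/pkg/api/errors to detect missing Jobs. The function name and structure are placeholders; none of this is written yet.
// Rough sketch of the cleanup the cron job would run: delete any UUID-named
// ConfigMap whose identically-named Job no longer exists.
func cleanupConfigMaps(ctx context.Context, cs *kubernetes.Clientset) error {
	cms, err := cs.CoreV1().ConfigMaps("shortlist").List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, cm := range cms.Items {
		if _, err := uuid.Parse(cm.Name); err != nil {
			continue // not one of the UUID-named profile ConfigMaps
		}
		_, err := cs.BatchV1().Jobs("shortlist").Get(ctx, cm.Name, metav1.GetOptions{})
		if errors.IsNotFound(err) {
			// The Job is gone, so the profile ConfigMap is leftover.
			if err := cs.CoreV1().ConfigMaps("shortlist").Delete(ctx, cm.Name, metav1.DeleteOptions{}); err != nil {
				return err
			}
		}
	}
	return nil
}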
Creating the Job
The remaining 63 lines build the structs that make up the Job spec. This job spawns pods comprising two containers: an assessor, which decides whether the profile is suitable, and a relay, which watches for output from the assessor and passes it on to a notifier service. I'm not sure whether I'll stick with this separation.
The Results Volume
resultVol := corev1.Volume{
Name: "assessor-result",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}
resultVolMnt := corev1.VolumeMount{
Name: "assessor-result",
MountPath: "/etc/shortlist/assessor-result",
}
This volume is mounted by both the relay and assessor. It allows the assessor to communicate its assessment to the relay.
The Assessor Container
assessorCfg := []corev1.EnvVar{
corev1.EnvVar{
Name: "PROFILE_PATH",
Value: "/etc/shortlist/profile/data.json",
},
corev1.EnvVar{
Name: "RESULT_PATH",
Value: "/etc/shortlist/assessor-result/data.json",
},
}
assessor := corev1.Container{
Name: "assessor",
Image: os.Getenv("ASSESSOR_IMAGE"),
Env: assessorCfg,
VolumeMounts: []corev1.VolumeMount{profileVolMnt, resultVolMnt},
}
Eventually, the assessor container will be the brains of Shortlist. It might interpolate the profile text into an LLM prompt and decide, based on the output, whether to accept the profile. It could also run captioning over some images and interpolate those captions into the prompt.
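Whatever that logic ends up being, the container's contract with the rest of the pod is simple: read the profile from PROFILE_PATH and write a verdict to RESULT_PATH on the shared volume. A do-nothing placeholder satisfying that contract could be as small as this sketch (the JSON result format is an assumption, not the real schema):
// Placeholder assessor: reads the profile and writes a hard-coded verdict.
// The result format is an assumption made for illustration.
package main

import "os"

func main() {
	profile, err := os.ReadFile(os.Getenv("PROFILE_PATH"))
	if err != nil {
		panic(err)
	}
	_ = profile // the real assessor would feed this into an LLM prompt

	result := []byte(`{"accepted": true}`)
	if err := os.WriteFile(os.Getenv("RESULT_PATH"), result, 0644); err != nil {
		panic(err)
	}
}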
The Relay Container
relayCfg := corev1.EnvFromSource{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: os.Getenv("RELAY_CONFIGMAP_NAME"),
},
},
}
relay := corev1.Container{
Name: "relay",
Image: os.Getenv("RELAY_IMAGE"),
EnvFrom: []corev1.EnvFromSource{relayCfg},
VolumeMounts: []corev1.VolumeMount{resultVolMnt},
}
Right now the relay is a simple loop that polls for the assessor's output and, if it's positive, passes it on to a notifier. In my testing so far, I've been using an NGINX Helm chart as a dummy notifier service because it listens for HTTP requests.
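Stripped down, that loop looks something like the sketch below. The RESULT_PATH and NOTIFIER_URL environment variable names, and the one-second poll interval, are placeholders for whatever the relay's ConfigMap actually provides:
// Minimal sketch of the relay: wait for the assessor's result file to
// appear, then POST it to the notifier service.
package main

import (
	"bytes"
	"net/http"
	"os"
	"time"
)

func main() {
	for {
		data, err := os.ReadFile(os.Getenv("RESULT_PATH"))
		if err == nil {
			// The real relay first checks that the assessment is positive.
			resp, err := http.Post(os.Getenv("NOTIFIER_URL"), "application/json", bytes.NewReader(data))
			if err == nil {
				resp.Body.Close()
				return
			}
		}
		time.Sleep(time.Second)
	}
}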
Putting the Job Together
pod := corev1.PodSpec{
Containers: []corev1.Container{assessor, relay},
Volumes: []corev1.Volume{profileVol, resultVol},
ServiceAccountName: "runner",
RestartPolicy: "Never",
}
jobTemplate := corev1.PodTemplateSpec{Spec: pod}
jobSpec := batchv1.JobSpec{
Template: jobTemplate,
}
jobCfg := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: jobName},
Spec: jobSpec,
}
jobOpts := metav1.CreateOptions{}
job, err := cs.BatchV1().Jobs("shortlist").Create(ctx, &jobCfg, jobOpts)
if err != nil {
return "", err
}
return job.ObjectMeta.Name, nil
}
This adds the containers and volumes to the pod specification, wraps it in a Job spec, and creates the Job in the shortlist namespace. The job name is returned, to be included in the API response.
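For context, the HTTP side that calls CreateJob is tiny. Here's a simplified sketch of the handler; the route, the use of encoding/json to decode the profile, and the response shape are assumptions, with the real handler living in the repo linked at the top:
// Simplified sketch of an HTTP handler driving CreateJob. Requires
// "encoding/json" and "net/http"; decoding the profile this way and the
// {"id": ...} response body are illustrative assumptions.
func handleCreateProfile(w http.ResponseWriter, r *http.Request) {
	var profile schemav1.Profile
	if err := json.NewDecoder(r.Body).Decode(&profile); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	name, err := CreateJob(r.Context(), profile)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"id": name})
}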
Conclusion
The code I've shared here is a bit rough around the edges. There's still some ambiguity about what this system will do. Things are likely to change and hopefully improve.
Even so, the pattern of creating Kubernetes objects from an infrastructural service seems useful. Creating jobs in this way removes the need for a queue or message broker to store submitted jobs.