import kubernetes
from kubernetes import client, config, utils
from kubernetes.client import V1VolumeMount, V1Volume, V1ConfigMapVolumeSource,
V1KeyToPath, V1ObjectMeta
def kube_create_job_object(name, container_image, namespace="default",
container_name="jobcontainer", env_vars={}):
"""
Create a k8 Job Object
Minimum definition of a job object:
{'api_version': None, - Str
'kind': None, - Str
'metadata': None, - Metada Object
'spec': None, -V1JobSpec
'status': None} - V1Job Status
Docs: https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md
Docs2: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#writing-a-job-spec
Also docs are pretty pretty bad. Best way is to ´pip install kubernetes´ and go via the autogenerated code
And figure out the chain of objects that you need to hold a final valid object So for a job object you need:
V1Job -> V1ObjectMeta
-> V1JobStatus
-> V1JobSpec -> V1PodTemplate -> V1PodTemplateSpec -> V1Container
Now the tricky part, is that V1Job.spec needs a .template, but not a PodTemplateSpec, as such
you need to build a PodTemplate, add a template field (template.template) and make sure
template.template.spec is now the PodSpec.
Then, the V1Job.spec needs to be a JobSpec which has a template the template.template field of the PodTemplate.
Failure to do so will trigger an API error.
Also Containers must be a list!
Docs3: https://github.com/kubernetes-client/python/issues/589
"""
# Body is the object Body
body = client.V1Job(api_version="batch/v1", kind="Job")
# Body needs Metadata
# Attention: Each JOB must have a different name!
body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
# And a Status
body.status = client.V1JobStatus()
# Now we start with the Template...
template = client.V1PodTemplate()
template.template = client.V1PodTemplateSpec()
# Passing Arguments in Env:
env_list = []
for env_name, env_value in env_vars.items():
env_list.append(client.V1EnvVar(name=env_name, value=env_value))
container = client.V1Container(name=container_name, image=container_image,
env=env_list)
container.command = ['python3', '/usr/src/app/test.py']
container.image_pull_policy = "IfNotPresent"
volume_mount = V1VolumeMount(name="config-volume", mount_path='/etc/config')
container.volume_mounts = [volume_mount]
config_map = V1ConfigMapVolumeSource(name='configmap-test')
volumes = V1Volume(name="config-volume", config_map=config_map)
template.template.spec = client.V1PodSpec(containers=[container],
restart_policy='Never',
volumes=[volumes])
# And finaly we can create our V1JobSpec!
body.spec = client.V1JobSpec(ttl_seconds_after_finished=600,
template=template.template)
return body
def kube_create_configmap_object(name):
data = {'path.json': "{'test':1}"}
print(type(data))
metadata = V1ObjectMeta(name=name)
body = kubernetes.client.V1ConfigMap(data=data, metadata=metadata)
return body
if __name__ == '__main__':
# result = kube_create_job_object(name='kube-job',
# container_image="job-test:v0.1",
# container_name="kube-job")
config.load_kube_config(config_file='config')
configuration = kubernetes.client.Configuration()
# api_instance = kubernetes.client.BatchV1Api(
# kubernetes.client.ApiClient(configuration))
# api_response = api_instance.create_namespaced_job("default", result,
# pretty=True)
api_instance = kubernetes.client.CoreV1Api(
kubernetes.client.ApiClient(configuration))
body = kube_create_configmap_object(name="config-map-python-1")
api_response_config_map = api_instance.create_namespaced_config_map(
namespace='default', body=body, pretty=True)
print(api_response_config_map)
```
### Watch
try:
config.load_kube_config(config_file='config')
core_v1 = client.CoreV1Api()
core_w = watch.Watch()
for event in core_w.stream(core_v1.list_namespaced_pod,
namespace='default'):
print(event)
print(
"Event: %s %s" % (event['type'], event['object'].metadata.name))
except Exception as ex:
print(ex)
finally:
core_w.stop()
try:
config.load_kube_config(config_file='config')
batch_v1 = client.BatchV1Api()
batch_w = watch.Watch()
for event in batch_w.stream(batch_v1.list_namespaced_job,
namespace='default'):
# print(event)
print(
"Event: %s %s" % (event['type'], event['object'].metadata.name))
except Exception as ex:
print(ex)
finally:
batch_w.stop()