import os import json import time import uuid import kubernetes.client from kubernetes.client.rest import ApiException MINIKUBE_URI = 'https://127.0.0.1:8443' MINIKUBE_HOME = os.environ.get('MINIKUBE_HOME', os.environ['HOME']) # TODO: create a default pod spec of BDB BDB_POD_SPEC = {} class Node(): def __init__(self, namespace='itest-setup', name=None, spec=BDB_POD_SPEC): config = kubernetes.client.Configuration() config.host = MINIKUBE_URI config.key_file = os.path.join(MINIKUBE_HOME, '.minikube/client.key') config.cert_file = os.path.join(MINIKUBE_HOME, '.minikube/client.crt') config.verify_ssl = False self.api_instance = kubernetes.client.CoreV1Api(kubernetes.client.ApiClient(config)) self.namespace = namespace self.name = name or uuid.uuid4().hex pod = kubernetes.client.V1Pod() pod.version = 'v1' pod.kind = 'Pod' pod.metadata = {"name": self.name} pod.spec = {"containers": [{"name": "myapp-container", "image": "busybox", "command": ["sh", "-c", "echo Hello Kubernetes! && sleep 3600"]}]} self.pod = pod def start(self, return_after_running=True): """ Start node.""" try: self.api_instance.create_namespaced_pod(self.namespace, self.pod) if return_after_running: for i in range(1, 20): if self.is_running: break time.sleep(1) except ApiException as e: raise e def stop(self, return_after_stopping=True): """ Stop node.""" body = kubernetes.client.V1DeleteOptions() body.api_version = 'v1' body.grace_period_seconds = 0 self.api_instance.delete_namespaced_pod(self.name, self.namespace, body) if return_after_stopping: for i in range(1, 20): if not self.is_running: break time.sleep(1) @property def is_running(self): """Get the current status of node""" try: resp = self.api_instance.read_namespaced_pod(self.name, self.namespace, exact=True) if resp.status.phase == 'Running': return True else: return False except ApiException as e: return False @property def uri(self): if self.is_running: resp = self.api_instance.read_namespaced_pod(self.name, self.namespace, exact=True) return resp.status.pod_ip else: return False def _create_namespace(self, namespace): namespace_spec = kubernetes.client.V1Namespace() namespace_spec.api_version = 'v1' namespace_spec.metadata = {'name': namespace} try: self.api_instance.create_namespace(namespace_spec) return True except ApiException as e: resp_body = json.loads(e.body) if resp_body.reason == 'AlreadyExists': return True else: raise e