Source code for ibmcloud_python_sdk.power.task

from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.utils.common import resource_deleted
from ibmcloud_python_sdk.power import get_power_headers as headers


class Task:
    """Client for the ``/pcloud/v1/tasks`` endpoints of the "power" service.

    Both methods report failures by printing a message and returning
    ``None`` rather than raising, so callers must be prepared for a ``None``
    result.
    """

    def __init__(self):
        # SDK configuration as returned by ``params()``.
        self.cfg = params()

    def get_task(self, task):
        """Retrieve specific task

        :param task: Task ID
        :type task: str
        :return: The ``"data"`` entry of the query result, or ``None`` when
            the request raised an exception (the error is printed).
        :rtype: dict
        """
        try:
            # Connect to api endpoint for tasks
            path = "/pcloud/v1/tasks/{}".format(task)

            # Only the payload is handed back; the rest of the query
            # result is discarded.
            return qw("power", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching task {}. {}".format(task, error))
            return None

    def delete_task(self, task):
        """Delete task

        :param task: Task ID
        :type task: str
        :return: Deletion status. The lookup payload is returned unchanged
            when it contains ``"errors"``; the response payload is returned
            when the DELETE status is not 200; ``None`` is returned when the
            lookup or the deletion raised an exception (the error is
            printed).
        :rtype: dict
        """
        # Check if task exists and get information
        task_info = self.get_task(task)

        # get_task() returns None after printing its own error. Without this
        # guard the membership test below would raise an uncaught TypeError.
        if task_info is None:
            return None

        if "errors" in task_info:
            return task_info

        try:
            # Connect to api endpoint for tasks, addressing the task by the
            # ID reported by the lookup rather than the caller-supplied one.
            path = "/pcloud/v1/tasks/{}".format(task_info["taskID"])

            data = qw("power", "DELETE", path, headers())

            # Any status other than 200 is surfaced as the response payload.
            if data["response"].status != 200:
                return data["data"]

            # Return status
            return resource_deleted()

        except Exception as error:
            print("Error deleting task {}. {}".format(task, error))
            return None