# Source code for ibmcloud_python_sdk.iam.role

import json

from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import get_headers as headers
from ibmcloud_python_sdk.resource import resource_instance
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.utils.common import resource_not_found
from ibmcloud_python_sdk.utils.common import resource_deleted


class Role:
    """Client for IAM roles (system, service and custom roles).

    Every method that talks to the API follows the same convention: on
    success it returns the decoded ``data`` part of the response; when an
    exception is raised while performing the request, the error is printed
    and the method returns ``None``.
    """

    # Error codes treated as "nothing matched this name, try the ID instead".
    # NOTE(review): the by-name helpers report a miss through
    # ``resource_not_found()``, whose exact error code is not visible from
    # this file; "not_found" is assumed and the historical "role_not_found"
    # is kept for backward compatibility -- confirm against utils.common.
    _NOT_FOUND_CODES = ("not_found", "role_not_found")

    # Fields accepted by create_role(), in the order they are serialized.
    _CREATE_FIELDS = (
        "name",
        "account_id",
        "service_name",
        "display_name",
        "actions",
        "description",
    )

    def __init__(self):
        self.cfg = params()
        self.ri = resource_instance.ResourceInstance()

    @classmethod
    def _is_not_found(cls, result):
        """Tell whether ``result`` is an error payload meaning "not found".

        :param result: Value returned by one of the lookup methods
        :type result: dict or None
        :return: ``True`` when ``result`` carries a not-found error code
        :rtype: bool
        """
        if not isinstance(result, dict):
            return False
        return any(
            isinstance(err, dict) and err.get("code") in cls._NOT_FOUND_CODES
            for err in result.get("errors") or ()
        )

    @staticmethod
    def _match_role(roles, name):
        """Return the first role whose display name or name equals ``name``.

        :param roles: Role dictionaries to search
        :type roles: list
        :param name: Role name or display name
        :type name: str
        :return: The matching role, or ``None`` when nothing matches
        :rtype: dict or None
        """
        return next(
            (
                role for role in roles
                if role.get("display_name") == name
                or role.get("name", "") == name
            ),
            None,
        )

    def get_system_roles(self, account):
        """Retrieve system role list per account

        :param account: Account ID
        :type account: str
        :return: List of system roles
        :rtype: list
        """
        try:
            # Connect to api endpoint for roles
            path = ("/v2/roles?account_id={}".format(account))

            # Return data
            return qw("auth", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching system roles for account {}. {}".format(
                account, error))

    def get_system_role(self, account, role):
        """Retrieve specific system role by name or by ID

        The name lookup is tried first; the ID lookup is only attempted when
        the name lookup reports that nothing matched.

        :param account: Account ID
        :type account: str
        :param role: Role name or ID
        :type role: str
        :return: System role information, an error payload, or ``None``
            when the name lookup itself failed
        :rtype: dict
        """
        by_name = self.get_system_role_by_name(account, role)
        if self._is_not_found(by_name):
            return self.get_system_role_by_id(account, role)
        # Either a role, a non "not found" error payload, or None.
        return by_name

    def get_system_role_by_id(self, account, id):
        """Retrieve specific system role by ID

        :param account: Account ID
        :type account: str
        :param id: Role ID
        :type id: str
        :return: System role information
        :rtype: dict
        """
        try:
            # Connect to api endpoint for roles. Uses the same connection
            # type ("auth") and API version (v2) as the other role calls of
            # this class instead of the former "iaas" / v1 combination.
            path = ("/v2/roles/{}?account_id={}".format(id, account))

            # Return data
            return qw("auth", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching system role with ID {}. {}".format(
                id, error))

    def get_system_role_by_name(self, account, name):
        """Retrieve specific system role by name

        :param account: Account ID
        :type account: str
        :param name: Role name
        :type name: str
        :return: System role information
        :rtype: dict
        """
        try:
            # Retrieve roles
            data = self.get_system_roles(account)
            if data is None:
                # Listing failed and the error was already printed.
                return None
            if "errors" in data:
                return data

            # Look for the first system role matching the filter
            match = self._match_role(data['system_roles'], name)
            if match is not None:
                return match

            # Return error if no system role is found
            return resource_not_found()

        except Exception as error:
            print("Error fetching system role with name {}. {}".format(
                name, error))

    def get_service_roles(self, account, service):
        """Retrieve service role list per account

        :param account: Account ID
        :type account: str
        :param service: Service name
        :type service: str
        :return: List of service roles
        :rtype: list
        """
        try:
            # Connect to api endpoint for roles
            path = ("/v2/roles?account_id={}&format=display"
                    "&service_name={}".format(account, service))

            # Return data
            return qw("auth", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching roles from service {} for account {}."
                  " {}".format(service, account, error))

    def get_service_role(self, account, service, role):
        """Retrieve specific service role by name or by ID

        The name lookup is tried first; the ID lookup is only attempted when
        the name lookup reports that nothing matched.

        :param account: Account ID
        :type account: str
        :param service: Service name
        :type service: str
        :param role: Role name or ID
        :type role: str
        :return: Service role information, an error payload, or ``None``
            when the name lookup itself failed
        :rtype: dict
        """
        by_name = self.get_service_role_by_name(account, service, role)
        if self._is_not_found(by_name):
            return self.get_service_role_by_id(account, service, role)
        # Either a role, a non "not found" error payload, or None.
        return by_name

    def get_service_role_by_id(self, account, service, id):
        """Retrieve specific service role by ID

        :param account: Account ID
        :type account: str
        :param service: Service name
        :type service: str
        :param id: Role ID
        :type id: str
        :return: Service role information
        :rtype: dict
        """
        try:
            # Connect to api endpoint for roles. Uses the "auth" connection
            # type like the other role calls of this class (was "iaas").
            path = ("/v2/roles/{}?account_id={}&format=display"
                    "&service_name={}".format(id, account, service))

            # Return data
            return qw("auth", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching role from service {} with ID {}. {}".format(
                service, id, error))

    def get_service_role_by_name(self, account, service, name):
        """Retrieve specific service role by name

        :param account: Account ID
        :type account: str
        :param service: Service name
        :type service: str
        :param name: Role name
        :type name: str
        :return: Service role information
        :rtype: dict
        """
        try:
            # Retrieve service roles
            data = self.get_service_roles(account, service)
            if data is None:
                # Listing failed and the error was already printed.
                return None
            if "errors" in data:
                return data

            # Look for the first service role matching the filter
            match = self._match_role(data['service_roles'], name)
            if match is not None:
                return match

            # Return error if no service role is found
            return resource_not_found()

        except Exception as error:
            print("Error fetching role from service {} with name {}."
                  " {}".format(service, name, error))

    def get_role(self, role):
        """Retrieve a role by ID, without requiring an account

        :param role: Role ID
        :type role: str
        :return: Role information
        :rtype: dict
        """
        try:
            # Connect to api endpoint for roles
            path = ("/v2/roles/{}".format(role))

            # Return data
            return qw("auth", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching role with ID {}. {}".format(role, error))

    def create_role(self, **kwargs):
        """Create role

        :param name: The name of the role
        :type name: str
        :param account_id: The account GUID
        :type account_id: str
        :param service_name: The service name
        :type service_name: str
        :param display_name: The display name of the role
        :type display_name: str
        :param actions: The actions of the role
        :type actions: list
        :param description: The description of the role
        :type description: str
        :return: Role creation response
        :rtype: dict
        """
        # Construct payload from the supported arguments that were provided;
        # unknown keyword arguments and None values are left out.
        payload = {
            key: kwargs[key]
            for key in self._CREATE_FIELDS
            if kwargs.get(key) is not None
        }
        if "actions" in payload:
            # Send a plain list whatever iterable the caller supplied
            payload["actions"] = list(payload["actions"])

        try:
            # Connect to api endpoint for roles
            path = "/v2/roles"

            # Return data
            return qw("auth", "POST", path, headers(),
                      json.dumps(payload))["data"]

        except Exception as error:
            print("Error creating role. {}".format(error))

    def delete_role(self, role):
        """Delete role

        :param role: Role ID
        :type role: str
        :return: Deletion status
        :rtype: dict
        """
        # Check if role exists and get information
        role_info = self.get_role(role)
        if role_info is None or "errors" in role_info:
            # Lookup failed (None) or the API answered with an error payload
            return role_info

        try:
            # Connect to api endpoint for roles
            path = ("/v2/roles/{}".format(role_info["id"]))

            data = qw("auth", "DELETE", path, headers())

            # Anything other than "204 No Content" carries an error payload
            if data["response"].status != 204:
                return data["data"]

            # Return status
            return resource_deleted()

        except Exception as error:
            print("Error deleting role {}. {}".format(role, error))