Source code for ibmcloud_python_sdk.iam.policy

import json

from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import get_headers as headers
from ibmcloud_python_sdk.resource import resource_instance
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.utils.common import resource_deleted


[docs]class Policy(): def __init__(self): self.cfg = params() self.ri = resource_instance.ResourceInstance()
[docs] def get_policies(self, account): """Retrieve policy list per account :param account: Account ID :type account: str :return: List of policies :rtype: list """ try: # Connect to api endpoint for policies path = ("/v1/policies?account_id={}".format(account)) # Return data return qw("auth", "GET", path, headers())["data"] except Exception as error: print("Error fetching policies for account {}. {}".format( account, error))
[docs] def get_policy(self, policy): """Retrieve specific policy :param account: Account ID :type account: str :param policy: Policy ID :type policy: str :return: Policy information :rtype: dict """ try: # Connect to api endpoint for policies path = ("/v1/policies/{}".format(policy)) # Return data return qw("auth", "GET", path, headers())["data"] except Exception as error: print("Error fetching policy {}. {}".format(policy, error))
[docs] def get_authorizations(self, account): """Retrieve authorization policy list per account :param account: Account ID :type account: str :return: List of authorizations :rtype: list """ try: # Connect to api endpoint for policies path = ("/v1/policies?account_id={}&type=authorization".format( account)) # Return data return qw("auth", "GET", path, headers())["data"] except Exception as error: print("Error fetching authorizations for account {}. {}".format( account, error))
[docs] def get_authorization(self, account, policy): """Retrieve specific authorization policy per account :param account: Account ID :type account: str :param policy: Policy ID :type policy: str :return: Authorization policy information :rtype: dict """ try: # Connect to api endpoint for policies path = ("/v1/policies/{}?account_id={}&type=authorization".format( policy, account)) # Return data return qw("auth", "GET", path, headers())["data"] except Exception as error: print("Error fetching authorization policy {} for account {}." " {}".format(policy, account, error))
[docs] def get_accesses(self, account): """Retrieve access policy list per account :param account: Account ID :type account: str :return: List of accesses :rtype: list """ try: # Connect to api endpoint for policies path = ("/v1/policies?account_id={}&type=access".format( account)) # Return data return qw("auth", "GET", path, headers())["data"] except Exception as error: print("Error fetching accesses for account {}. {}".format( account, error))
[docs] def get_access(self, account, policy): """Retrieve specific access policy list per account :param account: Account ID :type account: str :param policy: Policy ID :type policy: str :return: Access policy information :rtype: dict """ try: # Connect to api endpoint for policies path = ("/v1/policies/{}?account_id={}&type=access".format( policy, account)) # Return data return qw("auth", "GET", path, headers())["data"] except Exception as error: print("Error fetching access policy {} for account {}. {}".format( policy, account, error))
[docs] def create_policy(self, **kwargs): """Create policy :param type: The policy type; either 'access' or 'authorization' :type type: str :param subjects: The subject attribute values that must match in order for this policy to apply in a permission decision :typr subjects: dict :param roles: A set of role cloud resource names (CRNs) granted by the policy :type roles: dict :param resources: The attributes of the resource. Note that only one resource is allowed in a policy :type resources: dict :return: Policy response :rtype: dict """ # Build dict of argument and assign default value when needed args = { 'type': kwargs.get('type'), 'subjects': kwargs.get('subjects'), 'roles': kwargs.get('roles'), 'resources': kwargs.get('resources'), } # Construct payload payload = {} for key, value in args.items(): if value is not None: if key == "subjects": ri_info = None for subject in args['subjects']: for attribute in subject["attributes"]: if attribute.get("name") == "serviceInstance": ri_info = self.ri.get_resource_instance( attribute.get("value")) attribute["value"] = ri_info["guid"] payload["subjects"] = args['subjects'] elif key == "roles": ro = [] for role in args["roles"]: tmp_r = {} tmp_r["role_id"] = role ro.append(tmp_r) payload["roles"] = ro elif key == "resources": ri_info = None for resource in args['resources']: for attribute in resource["attributes"]: if attribute.get("name") == "serviceInstance": ri_info = self.ri.get_resource_instance( attribute.get("value")) attribute["value"] = ri_info["guid"] payload["resources"] = args['resources'] else: payload[key] = value try: # Connect to api endpoint for policies path = "/v1/policies" # Return data return qw("auth", "POST", path, headers(), json.dumps(payload))["data"] except Exception as error: print("Error creating policy. {}".format(error))
[docs] def delete_policy(self, policy): """Delete policy :param policy: Policy ID :type policy: str :return: Deletion status :rtype: dict """ # Check if policy exists and get information policy_info = self.get_policy(policy) if "errors" in policy_info: return policy_info try: # Connect to api endpoint for policies path = ("/v1/policies/{}".format(policy_info["id"])) data = qw("auth", "DELETE", path, headers()) # Return data if data["response"].status != 204: return data["data"] # Return status return resource_deleted() except Exception as error: print("Error deleting policy {}. {}".format(policy, error))