Source code for ibmcloud_python_sdk.vpc.acl

import json
from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import get_headers as headers
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.vpc import vpc
from ibmcloud_python_sdk.utils.common import resource_not_found
from ibmcloud_python_sdk.utils.common import resource_deleted
from ibmcloud_python_sdk.utils.common import check_args
from ibmcloud_python_sdk.resource import resource_group


class Acl():
    """Manage IBM Cloud VPC network ACLs and their rules.

    Every method goes through the ``/v1/network_acls`` endpoints using the
    query wrapper and returns the ``data`` entry of the wrapper's result.
    Failed lookups are reported as a dict carrying an ``errors`` list rather
    than as exceptions; unexpected failures are printed and re-raised.
    """

    def __init__(self):
        self.cfg = params()
        self.vpc = vpc.Vpc()
        self.rg = resource_group.ResourceGroup()

    def _endpoint(self, suffix=""):
        """Build a network ACL API path.

        :param suffix: Path fragment appended after ``/v1/network_acls``
        :type suffix: str, optional
        :return: Path including the configured version and generation
        :rtype: str
        """
        return "/v1/network_acls{}?version={}&generation={}".format(
            suffix, self.cfg["version"], self.cfg["generation"])

    @staticmethod
    def _is_not_found(result):
        """Tell whether a lookup result reports a ``not_found`` error.

        Only the first entry of the ``errors`` list is inspected. A result
        without errors, or with an empty ``errors`` list, is not considered
        "not found".

        :param result: Value returned by one of the lookup methods
        :type result: dict
        :return: True when the first reported error code is ``not_found``
        :rtype: bool
        """
        if "errors" not in result:
            return False
        errors = result["errors"]
        return bool(errors) and errors[0]["code"] == "not_found"

    def get_network_acls(self):
        """Retrieve network ACL list

        :return: List of network ACLs
        :rtype: list
        """
        try:
            # Connect to api endpoint for network_acls
            path = self._endpoint()

            # Return data
            return qw("iaas", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching network ACLs. {}".format(error))
            raise

    def get_network_acl(self, acl):
        """Retrieve specific network ACL

        The value is first treated as a name; only when that lookup reports
        ``not_found`` is it retried as an ID.

        :param acl: Network ACL name or ID
        :type acl: str
        :return: Network ACL information
        :rtype: dict
        """
        by_name = self.get_network_acl_by_name(acl)
        if self._is_not_found(by_name):
            return self.get_network_acl_by_id(acl)
        return by_name

    def get_network_acl_by_id(self, id):
        """Retrieve specific network ACL by ID

        :param id: Network ACL ID
        :type id: str
        :return: Network ACL information
        :rtype: dict
        """
        try:
            # Connect to api endpoint for network_acls
            path = self._endpoint("/{}".format(id))

            # Return data
            return qw("iaas", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching network ACL with ID {}. {}".format(
                id, error))
            raise

    def get_network_acl_by_name(self, name):
        """Retrieve specific network ACL by name

        :param name: Network ACL name
        :type name: str
        :return: Network ACL information
        :rtype: dict
        """
        try:
            # Retrieve network ACLs
            data = self.get_network_acls()
            if "errors" in data:
                return data

            # Loop over network ACLs until filter match
            for acl in data['network_acls']:
                if acl["name"] == name:
                    # Return data
                    return acl

            # Return error if no network ACL is found
            return resource_not_found()

        except Exception as error:
            print("Error fetching network ACL with name {}. {}".format(
                name, error))
            raise

    def get_network_acl_rules(self, acl):
        """Retrieve rules for a specific network ACL

        :param acl: Network ACL name or ID
        :type acl: str
        :return: Network ACL rules list
        :rtype: list
        """
        by_name = self.get_network_acl_rules_by_name(acl)
        if self._is_not_found(by_name):
            return self.get_network_acl_rules_by_id(acl)
        return by_name

    def get_network_acl_rules_by_id(self, id):
        """Retrieve rules for a specific network ACL by ID

        :param id: Network ACL ID
        :type id: str
        :return: Network ACL rules list
        :rtype: dict
        """
        try:
            # Connect to api endpoint for network_acls
            path = self._endpoint("/{}/rules".format(id))

            # Return data
            return qw("iaas", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching rules for network ACL with ID"
                  " {}. {}".format(id, error))
            raise

    def get_network_acl_rules_by_name(self, name):
        """Retrieve rules for a specific network ACL by name

        :param name: Network ACL name
        :type name: str
        :return: Network ACL rules list
        :rtype: list
        """
        # Retrieve network ACL information to get the ID
        # (mostly useful if a name is provided)
        acl_info = self.get_network_acl(name)
        if "errors" in acl_info:
            return acl_info

        # Once the ID is known the request is the same as the by-ID lookup,
        # so reuse it instead of building a second, slightly different path.
        return self.get_network_acl_rules_by_id(acl_info["id"])

    def get_network_acl_rule(self, acl, rule):
        """Retrieve specific rule for a specific network ACL

        :param acl: Network ACL name or ID
        :type acl: str
        :param rule: Rule name or ID
        :type rule: str
        :return: Network ACL rule information
        :rtype: dict
        """
        # Fail early with the ACL lookup error when the ACL is unknown
        acl_info = self.get_network_acl(acl)
        if "errors" in acl_info:
            return acl_info

        by_name = self.get_network_acl_rule_by_name(acl, rule)
        if self._is_not_found(by_name):
            return self.get_network_acl_rule_by_id(acl, rule)
        return by_name

    def get_network_acl_rule_by_id(self, acl, id):
        """Retrieve specific rule for a specific network ACL by ID

        :param acl: Network ACL name or ID
        :type acl: str
        :param id: Rule ID
        :type id: str
        :return: Network ACL rule information
        :rtype: dict
        """
        # Retrieve network ACL to get the ID
        # (mostly useful if a name is provided)
        acl_info = self.get_network_acl(acl)
        if "errors" in acl_info:
            return acl_info

        try:
            # Connect to api endpoint for network_acls
            path = self._endpoint("/{}/rules/{}".format(acl_info["id"], id))

            # Return data
            return qw("iaas", "GET", path, headers())["data"]

        except Exception as error:
            print("Error fetching rule with ID {} for network ACL"
                  " with ID {}. {}".format(id, acl_info["id"], error))
            raise

    def get_network_acl_rule_by_name(self, acl, name):
        """Retrieve specific rule for a specific network ACL by name

        :param acl: Network ACL name or ID
        :type acl: str
        :param name: Rule name
        :type name: str
        :return: Network ACL rule information
        :rtype: dict
        """
        # Retrieve network ACL to get the ID
        # (mostly useful if a name is provided)
        acl_info = self.get_network_acl(acl)
        if "errors" in acl_info:
            return acl_info

        try:
            # Retrieve the rules of the resolved network ACL
            data = self.get_network_acl_rules_by_id(acl_info["id"])
            if "errors" in data:
                return data

            # Loop over network ACL rules until filter match
            for rule in data['rules']:
                if rule["name"] == name:
                    # Return data
                    return rule

            # Return error if no rule is found
            return resource_not_found()

        except Exception as error:
            print("Error fetching rule with name {} for network ACL"
                  " with ID {}. {}".format(name, acl_info["id"], error))
            raise

    def create_network_acl(self, **kwargs):
        """Create network ACL

        :param name: The unique user-defined name for this network ACL
        :type name: str, optional
        :param resource_group: The resource group to use
        :type resource_group: str, optional
        :param vpc: The VPC the network ACL is to be a part of
        :type vpc: str
        :param rules: Array of prototype objects for rules to create along
            with this network ACL
        :type rules: list, optional
        :param source_network_acl: ID of the network ACL to copy rules from
            (sent as-is, no name lookup is performed)
        :type source_network_acl: str, optional
        :return: Network ACL information, or the lookup error of the VPC or
            resource group
        :rtype: dict
        """
        check_args(["vpc"], **kwargs)

        # Build dict of argument and assign default value when needed
        args = {
            'name': kwargs.get('name'),
            'resource_group': kwargs.get('resource_group'),
            'vpc': kwargs.get('vpc'),
            'rules': kwargs.get('rules'),
            'source_network_acl': kwargs.get('source_network_acl'),
        }

        # Construct payload, skipping every argument left unset
        payload = {}
        for key, value in args.items():
            if value is None:
                continue

            if key == "vpc":
                vpc_info = self.vpc.get_vpc(value)
                if "errors" in vpc_info:
                    return vpc_info
                payload["vpc"] = {"id": vpc_info["id"]}
            elif key == "resource_group":
                rg_info = self.rg.get_resource_group(value)
                if "errors" in rg_info:
                    return rg_info
                payload["resource_group"] = {"id": rg_info["id"]}
            elif key == "rules":
                payload["rules"] = list(value)
            elif key == "source_network_acl":
                payload["source_network_acl"] = {"id": value}
            else:
                payload[key] = value

        try:
            # Connect to api endpoint for network_acls
            path = self._endpoint()

            # Return data
            return qw("iaas", "POST", path, headers(),
                      json.dumps(payload))["data"]

        except Exception as error:
            print("Error creating network ACL. {}".format(error))
            raise

    def create_network_acl_rule(self, **kwargs):
        """Create network ACL rule

        :param acl: Network ACL name or ID the rule is added to
        :type acl: str
        :param name: The user-defined name for this rule
        :type name: str, optional
        :param action: Action sent in the rule payload
        :type action: str
        :param destination: Destination sent in the rule payload
        :type destination: str
        :param direction: Direction sent in the rule payload
        :type direction: str
        :param source: Source sent in the rule payload
        :type source: str
        :param before: Name or ID of the existing rule this rule should be
            inserted before
        :type before: str, optional
        :param protocol: Protocol sent in the rule payload
        :type protocol: str, optional
        :param destination_port_max: Upper bound of the destination ports
        :type destination_port_max: int, optional
        :param destination_port_min: Lower bound of the destination ports
        :type destination_port_min: int, optional
        :param source_port_max: Upper bound of the source ports
        :type source_port_max: int, optional
        :param source_port_min: Lower bound of the source ports
        :type source_port_min: int, optional
        :return: Network ACL rule information, or the lookup error of the
            network ACL or of the ``before`` rule
        :rtype: dict
        """
        check_args(["acl", "action", "destination", "direction", "source"],
                   **kwargs)

        # Build dict of argument and assign default value when needed
        args = {
            "acl": kwargs.get('acl'),
            'name': kwargs.get('name'),
            'action': kwargs.get('action'),
            'destination': kwargs.get('destination'),
            'direction': kwargs.get('direction'),
            'source': kwargs.get('source'),
            'before': kwargs.get('before'),
            'protocol': kwargs.get('protocol'),
            'destination_port_max': kwargs.get('destination_port_max'),
            'destination_port_min': kwargs.get('destination_port_min'),
            'source_port_max': kwargs.get('source_port_max'),
            'source_port_min': kwargs.get('source_port_min'),
        }

        # Retrieve network ACL information to get the ID
        # (mostly useful if a name is provided). This happens before the
        # payload is built because resolving "before" needs the ACL ID.
        acl_info = self.get_network_acl(args["acl"])
        if "errors" in acl_info:
            return acl_info

        # Construct payload
        payload = {}
        for key, value in args.items():
            # acl argument should not be in the payload
            if key == "acl" or value is None:
                continue

            if key == "before":
                # The API expects a rule identity, so resolve name -> ID
                rule_info = self.get_network_acl_rule(acl_info["id"], value)
                if "errors" in rule_info:
                    return rule_info
                payload["before"] = {"id": rule_info["id"]}
            else:
                payload[key] = value

        try:
            # Connect to api endpoint for network_acls
            path = self._endpoint("/{}/rules".format(acl_info["id"]))

            # Return data
            return qw("iaas", "POST", path, headers(),
                      json.dumps(payload))["data"]

        except Exception as error:
            print("Error creating network ACL rule. {}".format(error))
            raise

    def delete_network_acl(self, acl):
        """Delete network ACL

        :param acl: Network ACL name or ID
        :type acl: str
        :return: Delete status
        :rtype: dict
        """
        try:
            # Check if network ACL exists
            acl_info = self.get_network_acl(acl)
            if "errors" in acl_info:
                return acl_info

            # Connect to api endpoint for network_acls
            path = self._endpoint("/{}".format(acl_info["id"]))

            data = qw("iaas", "DELETE", path, headers())

            # Anything but 204 carries a payload worth returning
            if data["response"].status != 204:
                return data["data"]

            # Return status
            return resource_deleted()

        except Exception as error:
            print("Error deleting network ACL with {}. {}".format(acl, error))
            raise

    def delete_network_acl_rule(self, acl, rule):
        """Delete network ACL rule

        :param acl: Network ACL name or ID
        :type acl: str
        :param rule: Rule name or ID
        :type rule: str
        :return: Delete status
        :rtype: dict
        """
        try:
            # Check if network ACL and network ACL rule exist
            acl_info = self.get_network_acl(acl)
            if "errors" in acl_info:
                return acl_info

            rule_info = self.get_network_acl_rule(acl_info["id"], rule)
            if "errors" in rule_info:
                return rule_info

            # Connect to api endpoint for network_acls
            path = self._endpoint("/{}/rules/{}".format(
                acl_info["id"], rule_info["id"]))

            data = qw("iaas", "DELETE", path, headers())

            # Anything but 204 carries a payload worth returning
            if data["response"].status != 204:
                return data["data"]

            # Return status
            return resource_deleted()

        except Exception as error:
            print("Error deleting network ACL rule {} for network"
                  " ACL {}. {}".format(rule, acl, error))
            raise