Source code for ibmcloud_python_sdk.vpc.subnet

import json
from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import get_headers as headers
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.vpc import gateway as gw
from ibmcloud_python_sdk.vpc import vpc
from ibmcloud_python_sdk.vpc import acl
from ibmcloud_python_sdk.utils.common import resource_not_found
from ibmcloud_python_sdk.utils.common import resource_deleted
from ibmcloud_python_sdk.utils.common import check_args
from ibmcloud_python_sdk.resource import resource_group


[docs]class Subnet(): def __init__(self): self.cfg = params() self.vpc = vpc.Vpc() self.gateway = gw.Gateway() self.acl = acl.Acl() self.rg = resource_group.ResourceGroup()
[docs] def get_subnets(self): """Retrieve subnet list :return: List of subnets :rtype: list """ try: # Connect to api endpoint for subnets path = ("/v1/subnets?version={}&generation={}".format( self.cfg["version"], self.cfg["generation"])) # Return data return qw("iaas", "GET", path, headers())["data"] except Exception as error: print("Error fetching subnets. {}".format(error)) raise
[docs] def get_subnet(self, subnet): """Retrieve specific subnet :param subnet: Subnet name or ID :type subnet: str :return: Subnet information :rtype: dict """ by_name = self.get_subnet_by_name(subnet) if "errors" in by_name: for key_name in by_name["errors"]: if key_name["code"] == "not_found": by_id = self.get_subnet_by_id(subnet) if "errors" in by_id: return by_id return by_id else: return by_name else: return by_name
[docs] def get_subnet_by_id(self, id): """Retrieve specific subnet by ID :param id: Subnet ID :type id: str :return: Subnet information :rtype: dict """ try: # Connect to api endpoint for subnets path = ("/v1/subnets/{}?version={}&generation={}".format( id, self.cfg["version"], self.cfg["generation"])) # Return data return qw("iaas", "GET", path, headers())["data"] except Exception as error: print("Error fetching subnet with ID {}. {}".format(id, error)) raise
[docs] def get_subnet_by_name(self, name): """Retrieve specific subnet by name :param name: Subnet name :type name: str :return: Subnet information :rtype: dict """ try: # Retrieve subnets data = self.get_subnets() if "errors" in data: return data # Loop over subnets until filter match for subnet in data["subnets"]: if subnet["name"] == name: # Return data return subnet # Return error if no subnet is found return resource_not_found() except Exception as error: print("Error fetching subnet with name {}. {}".format(name, error)) raise
[docs] def get_subnet_network_acl(self, subnet): """Retrieve network ACL for a specific subnet :param subnet: Subnet name or ID :type subnet: str :return: Network ACL information :rtype: dict """ try: # Check if subnet exists and get information subnet_info = self.get_subnet(subnet) if "errors" in subnet_info: return subnet_info # Connect to api endpoint for subnets path = ("/v1/subnets/{}/network_acl?version={}" "&generation={}".format(subnet_info["id"], self.cfg["version"], self.cfg["generation"])) return qw("iaas", "GET", path, headers())["data"] except Exception as error: print("Error fetching network ACL for subnet {}. {}".format( subnet, error)) raise
[docs] def get_subnet_public_gateway(self, subnet): """Retrieve public gateway for a specific subnet :param subnet: Subnet name or ID :type subnet: str :return: Public gateway information :rtype: dict """ try: # Check if subnet exists and get information subnet_info = self.get_subnet(subnet) if "errors" in subnet_info: return subnet_info # Connect to api endpoint for subnets path = ("/v1/subnets/{}/public_gateway?version={}" "&generation={}".format(subnet_info["id"], self.cfg["version"], self.cfg["generation"])) return qw("iaas", "GET", path, headers())["data"] except Exception as error: print("Error fetching public gateway for subnet {}. {}".format( subnet, error)) raise
[docs] def create_subnet(self, **kwargs): """Create subnet :param name: The unique user-defined name for this subnet :type name: str, optional :param resource_group: The resource group to use :type resource_group: str, optional :param ipv4_cidr_block: The IPv4 range of the subnet, expressed in CIDR format :type ipv4_cidr_block: str :param vpc: The VPC the subnet is to be a part of :type vpc: str :param zone: The zone the subnet is to reside in :type zone: str :param ip_version: The IP version(s) supported :type ip_version: str, optional :param network_acl: The network ACL to use for this subnet :type network_acl: str, optional :param public_gateway: The public gateway to handle internet bound traffic for this subnet :type public_gateway: str, optional :param routing_table: The routing table for this subnet :type routing_table: str, optional :param total_ipv4_address_count: The total number of IPv4 addresses required :type total_ipv4_address_count: int, optional """ args = ["vpc"] check_args(args, **kwargs) # Build dict of argument and assign default value when needed args = { 'name': kwargs.get('name'), 'resource_group': kwargs.get('resource_group'), 'ipv4_cidr_block': kwargs.get('ipv4_cidr_block'), 'vpc': kwargs.get('vpc'), 'zone': kwargs.get('zone'), 'ip_version': kwargs.get('ip_version', 'ipv4'), 'network_acl': kwargs.get('network_acl'), 'public_gateway': kwargs.get('public_gateway'), 'routing_table': kwargs.get('routing_table'), 'total_ipv4_address_count': kwargs.get('total_ipv4_address_count'), } # Construct payload payload = {} for key, value in args.items(): if value is not None: if key == "resource_group": rg_info = self.rg.get_resource_group( args["resource_group"]) if "errors" in rg_info: return rg_info payload["resource_group"] = {"id": rg_info["id"]} elif key == "network_acl": acl_info = self.get_subnet_network_acl(args["network_acl"]) if "errors" in acl_info: return acl_info payload["network_acl"] = {"id": acl_info["id"]} elif key == "public_gateway": gateway_info = self.get_subnet_public_gateway( args["public_gateway"]) if "errors" in gateway_info: return gateway_info payload["public_gateway"] = {"id": gateway_info["id"]} elif key == "routing_table": payload["routing_table"] = {"id": args["routing_table"]} elif key == "vpc": vpc_info = self.vpc.get_vpc(args["vpc"]) if "errors" in vpc_info: return vpc_info payload["vpc"] = {"id": vpc_info["id"]} elif key == "zone": payload["zone"] = {"name": args["zone"]} else: payload[key] = value try: # Connect to api endpoint for subnets path = ("/v1/subnets?version={}&generation={}".format( self.cfg["version"], self.cfg["generation"])) # Return data return qw("iaas", "POST", path, headers(), json.dumps(payload))["data"] except Exception as error: print("Error creating subnet. {}".format(error)) raise
# Attach network ACL to subnet
[docs] def attach_network_acl(self, **kwargs): """Attach network ACL to a subnet :param subnet: Subnet name or ID :type subnet: str :param network_acl: Network ACL name or ID :type network_acl: str """ args = ["subnet", "network_acl"] check_args(args, **kwargs) # Build dict of argument and assign default value when needed args = { 'subnet': kwargs.get('subnet'), 'network_acl': kwargs.get('network_acl'), } # Retrieve subnet and network ACL information to get the ID # (mostly useful if a name is provided) subnet_info = self.get_subnet(args["subnet"]) if "errors" in subnet_info: return subnet_info acl_info = self.acl.get_network_acl(args["network_acl"]) if "errors" in acl_info: return acl_info # Construct payload payload = {} payload["id"] = acl_info["id"] try: # Connect to api endpoint for subnets path = ("/v1/subnets/{}/network_acl?version={}" "&generation={}".format(subnet_info["id"], self.cfg["version"], self.cfg["generation"])) # Return data return qw("iaas", "PUT", path, headers(), json.dumps(payload))["data"] except Exception as error: print("Error attaching network ACL {} to subnet" "{}. {}".format(args["network_acl"], args["subnet"], error)) raise
[docs] def attach_public_gateway(self, **kwargs): """Attach public gateway to a subnet :param subnet: Subnet name or ID :type subnet: str :param public_gateway: Public gateway name or ID :type public_gateway: str """ args = ["subnet", "public_gateway"] check_args(args, **kwargs) # Build dict of argument and assign default value when needed args = { 'subnet': kwargs.get('subnet'), 'public_gateway': kwargs.get('public_gateway'), } # Retrieve subnet and public gateway information to get the ID # (mostly useful if a name is provided) subnet_info = self.get_subnet(args["subnet"]) gateway_info = self.gateway.get_public_gateway(args["public_gateway"]) if "errors" in subnet_info: return subnet_info elif "errors" in gateway_info: return gateway_info # Construct payload payload = {} payload["id"] = gateway_info["id"] try: # Connect to api endpoint for subnets path = ("/v1/subnets/{}/public_gateway?version={}" "&generation={}".format(subnet_info["id"], self.cfg["version"], self.cfg["generation"])) # Return data return qw("iaas", "PUT", path, headers(), json.dumps(payload))["data"] except Exception as error: print("Error attaching public gateway {} to subnet {}. {}".format( args["public_gateway"], args["subnet"], error)) raise
[docs] def detach_public_gateway(self, subnet): """Detach public gateway from a subnet :param subnet: Subnet name or ID :type subnet: str :return: Detach status :rtype: resource_deleted() """ # Retrieve subnet and public gateway information to get the ID # (mostly useful if a name is provided) subnet_info = self.get_subnet(subnet) if "errors" in subnet_info: return subnet_info try: # Connect to api endpoint for subnets path = ("/v1/subnets/{}/public_gateway?version={}" "&generation={}".format(subnet_info["id"], self.cfg["version"], self.cfg["generation"])) data = qw("iaas", "DELETE", path, headers()) # Return data if data["response"].status != 204: return data["data"] # Return status return resource_deleted() except Exception as error: print("Error detaching subnet's public gateway for" "subnet {}. {}".format(subnet, error)) raise
[docs] def delete_subnet(self, subnet): """Delete subnet :param subnet: Subnet name or ID :type subnet: str :return: Delete status :rtype: resource_deleted() """ try: # Check if subnet exists subnet_info = self.get_subnet(subnet) if "errors" in subnet_info: return subnet_info # Connect to api endpoint for subnets path = ("/v1/subnets/{}?version={}&generation={}".format( subnet_info["id"], self.cfg["version"], self.cfg["generation"])) data = qw("iaas", "DELETE", path, headers()) # Return data if data["response"].status != 204: return data["data"] # Return status return resource_deleted() except Exception as error: print("Error deleting subnet with name {}. {}".format(subnet, error)) raise