import json
from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import get_headers as headers
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.vpc import subnet
from ibmcloud_python_sdk.utils.common import resource_not_found
from ibmcloud_python_sdk.utils.common import resource_deleted
from ibmcloud_python_sdk.utils.common import resource_found
from ibmcloud_python_sdk.utils.common import resource_created
from ibmcloud_python_sdk.utils.common import check_args
from ibmcloud_python_sdk.resource import resource_group
[docs]class Vpn():
def __init__(self):
self.cfg = params()
self.subnet = subnet.Subnet()
self.rg = resource_group.ResourceGroup()
[docs] def get_ike_policies(self):
"""Retrieve IKE policy list
:return: List of IKE policies
:rtype: list
"""
try:
# Connect to api endpoint for ike_policies
path = ("/v1/ike_policies?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching IKE policies. {}".format(error))
raise
[docs] def get_ike_policy(self, policy):
"""Retrieve specific IKE policy
:param policy: Policy name or ID
:type policy: str
:return: IKE policy information
:rtype: dict
"""
by_name = self.get_ike_policy_by_name(policy)
if "errors" in by_name:
for key_name in by_name["errors"]:
if key_name["code"] == "not_found":
by_id = self.get_ike_policy_by_id(policy)
if "errors" in by_id:
return by_id
return by_id
else:
return by_name
else:
return by_name
[docs] def get_ike_policy_by_id(self, id):
"""Retrieve specific IKE policy by ID
:param id: IKE policy ID
:type id: str
:return: IKE policy information
:rtype: dict
"""
try:
# Connect to api endpoint for ike_policies
path = ("/v1/ike_policies/{}?version={}&generation={}".format(
id, self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching IKE policy with ID {}. {}".format(id, error))
raise
[docs] def get_ike_policy_by_name(self, name):
"""Retrieve specific IKE policy by name
:param name: IKE policy name
:type name: str
:return: IKE policy information
:rtype: dict
"""
try:
# Retrieve policies
data = self.get_ike_policies()
if "errors" in data:
return data
# Loop over policies until filter match
for policy in data["ike_policies"]:
if policy["name"] == name:
# Return data
return policy
# Return error if no IKE policy is found
return resource_not_found()
except Exception as error:
print("Error fetching IKE policy with name {}. {}".format(
name, error))
raise
[docs] def get_ike_policy_connections(self, policy):
"""Retrieve connections for an IKE policy
:param policy: IKE policy name or ID
:type policy: str
:return: Connections information
:rtype: dict
"""
# Retrieve policy information
policy_info = self.get_ike_policy(policy)
if "errors" in policy_info:
return policy_info
try:
# Connect to api endpoint for ike_policies
path = ("/v1/ike_policies/{}/connections?version={}"
"&generation={}".format(policy_info["id"],
self.cfg["version"],
self.cfg["generation"]))
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching connection for IKE policy {}. {}".format(
policy, error))
raise
[docs] def get_ipsec_policies(self):
"""Retrieve IPsec policy list
:return: List of IPSec policies
:rtype: list
"""
try:
# Connect to api endpoint for ipsec_policies
path = ("/v1/ipsec_policies?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching IPsec policies. {}".format(error))
raise
[docs] def get_ipsec_policy(self, policy):
"""Retrieve specific IPsec policy
:param policy: Policy name or ID
:type policy: str
:return: IPSec policy information
:rtype: dict
"""
by_name = self.get_ipsec_policy_by_name(policy)
if "errors" in by_name:
for key_name in by_name["errors"]:
if key_name["code"] == "not_found":
by_id = self.get_ike_policy_by_id(policy)
if "errors" in by_id:
return by_id
return by_id
else:
return by_name
else:
return by_name
[docs] def get_ipsec_policy_by_id(self, id):
"""Retrieve specific IPsec policy by ID
:param id: IPsec policy ID
:type id: str
:return: IPSec policy information
:rtype: dict
"""
try:
# Connect to api endpoint for ipsec_policies
path = ("/v1/ipsec_policies/{}?version={}&generation={}".format(
id, self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching IPsec policy with ID {}. {}".format(
id, error))
raise
[docs] def get_ipsec_policy_by_name(self, name):
"""Retrieve specific IPsec policy by name
:param name: IPsec policy name
:type name: str
:return: IPSec policy information
:rtype: dict
"""
try:
# Retrieve policies
data = self.get_ipsec_policies()
if "errors" in data:
return data
# Loop over policies until filter match
for policy in data["ipsec_policies"]:
if policy["name"] == name:
# Return data
return policy
# Return error if no IPsec policy is found
return resource_not_found()
except Exception as error:
print("Error fetching IPsec policy with name {}. {}".format(
name, error))
raise
[docs] def get_ipsec_policy_connections(self, policy):
"""Retrieve connections for an IPsec policy
:param policy: IPsec policy name or ID
:type policy: str
:return: Connections information
:rtype: dict
"""
try:
# Retrieve policy information by name to get the ID
policy_info = self.get_ipsec_policy(policy)
if "errors" in policy_info:
return policy_info
# Connect to api endpoint for ipsec_policies
path = ("/v1/ipsec_policies/{}/connections?version={}"
"&generation={}".format(policy_info["id"],
self.cfg["version"],
self.cfg["generation"]))
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching connection for IPsec policy {}. {}".format(
policy, error))
raise
[docs] def get_vpn_gateways(self):
"""Retrieve VPN gateway list
:return: List of gateways
:rtype: list
"""
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching VPN gateways. {}".format(error))
raise
[docs] def get_vpn_gateway(self, gateway):
"""Retrieve specific VPN gateway
:param gateway: VPN gateway name or ID
:type gateway: str
:return: Gateway information
:rtype: dict
"""
by_name = self.get_vpn_gateway_by_name(gateway)
if "errors" in by_name:
for key_name in by_name["errors"]:
if key_name["code"] == "not_found":
by_id = self.get_vpn_gateway_by_id(gateway)
if "errors" in by_id:
return by_id
return by_id
else:
return by_name
else:
return by_name
[docs] def get_vpn_gateway_by_id(self, id):
"""Retrieve specific VPN gateway by ID
:param id: VPN gateway ID
:type id: str
:return: Gateway information
:rtype: dict
"""
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}?version={}&generation={}".format(
id, self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching VPN gateway with ID {}. {}".format(
id, error))
raise
[docs] def get_vpn_gateway_by_name(self, name):
"""Retrieve specific VPN gateway by name
:param name: VPN gateway name
:type name: str
:return: Gateway information
:rtype: dict
"""
try:
# Retrieve gateways
data = self.get_vpn_gateways()
if "errors" in data:
return data
# Loop over gateways until filter match
for gateway in data["vpn_gateways"]:
if gateway["name"] == name:
# Return data
return gateway
# Return error if no VPN gateway is found
return resource_not_found()
except Exception as error:
print("Error fetching VPN gateway with name {}. {}".format(
name, error))
raise
[docs] def get_vpn_gateway_connections(self, gateway):
"""Retrieve connections for a VPN gateway
:param gateway: VPN gateway name or ID
:type gateway: str
:return: List of connections
:rtype: list
"""
# Retrieve gateway information by name to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections?version={}"
"&generation={}".format(gateway_info["id"],
self.cfg["version"],
self.cfg["generation"]))
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching connections for VPN gateway {}. {}".format(
gateway, error))
raise
[docs] def get_vpn_gateway_connection(self, gateway, connection):
"""Retrieve specific connection for a VPN gateway
:param gateway: VPN gateway name or ID
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:return: Connection information
:rtype: dict
"""
by_name = self.get_vpn_gateway_connection_by_name(gateway, connection)
if "errors" in by_name:
for key_name in by_name["errors"]:
if key_name["code"] == "not_found":
by_id = self.get_vpn_gateway_connection_by_id(gateway,
connection)
if "errors" in by_id:
return by_id
return by_id
else:
return by_name
else:
return by_name
[docs] def get_vpn_gateway_connection_by_id(self, gateway, id):
"""Retrieve specific connection for a VPN gateway by ID
:param gateway: VPN gateway name or ID
:type gateway: str
:param id: Connecton ID
:type id: str
:return: Connection information
:rtype: dict
"""
# Retrieve gateway information to get the ID
# (mostly useful if a name is provided)
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}?version={}"
"&generation={}".format(gateway_info["id"], id,
self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching connection with ID {} for VPN gateway with"
" ID {}. {}".format(gateway, id, error))
raise
[docs] def get_vpn_gateway_connection_by_name(self, gateway, name):
"""Retrieve specific connection for a VPN gateway by name
:param gateway: VPN gateway name
:type gateway: str
:param name: Connection name
:type name: str
:return: Connection information
:rtype: dict
"""
# Retrieve gateway information to get the ID
# (mostly useful if a name is provided)
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
try:
# Retrieve gateway connections
data = self.get_vpn_gateway_connections(gateway_info["id"])
if "errors" in data:
return data
# Loop over connections until filter match
for connection in data["connections"]:
if connection["name"] == name:
# Return data
return connection
# Return error if no VPN gateway connection is found
return resource_not_found()
except Exception as error:
print("Error fetching connection with name {} for VPN gateway"
" with name {}. {}".format(name, gateway, error))
raise
[docs] def get_vpn_gateway_local_cidrs(self, gateway, connection):
"""Retrieve local CIDR list on specific connection for a VPN gateway
:param gateway: VPN gateway name
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:return: List of local CIDRs
:rtype: list
"""
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
connection)
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}/local_cidrs"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
self.cfg["version"],
self.cfg["generation"]))
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching local CIDRs for connection {} in VPN gateway"
" {}. {}".format(connection, gateway, error))
raise
[docs] def check_vpn_gateway_local_cidr(self, gateway, connection, prefix_address,
prefix_length):
"""Check if local CIDR exists on specific connection for a VPN gateway
:param gateway: VPN gateway name
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:param prefix_address: The prefix address part of the CIDR
:type prefix_address: str
:param prefix_length: The prefix length part of the CIDR
:type prefix_length: int
:return: Local CIDR information
:rtype: dict
"""
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
connection)
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}/local_cidrs/{}/{}"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
prefix_address,
prefix_length,
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "GET", path, headers())
# Return data if error
if data["response"].status != 204:
return data["data"]
# Return custom JSON
payload = {"local_cidr": ("{}/{}".format(prefix_address,
prefix_length))}
return resource_found(payload)
except Exception as error:
print("Error fetching local CIDR {}/{} for connection {} in VPN"
" gateway {}. {}".format(prefix_address, prefix_length,
connection, gateway, error))
raise
[docs] def get_vpn_gateway_peer_cidrs(self, gateway, connection):
"""Retrieve peer CIDR list on specific connection for a VPN gateway
:param gateway: VPN gateway name
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:return: List of peer CIDRs
:rtype: list
"""
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
connection)
if "errors" in connection_info:
return connection_info
try:
path = ("/v1/vpn_gateways/{}/connections/{}/peer_cidrs"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
self.cfg["version"],
self.cfg["generation"]))
return qw("iaas", "GET", path, headers())["data"]
except Exception as error:
print("Error fetching peer CIDRs for connection {} in VPN gateway"
" {}. {}".format(connection, gateway, error))
raise
[docs] def check_vpn_gateway_peer_cidr(self, gateway, connection, prefix_address,
prefix_length):
"""Check if local CIDR exists on specific connection for a VPN gateway
:param gateway: VPN gateway name
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:param prefix_address: The prefix address part of the CIDR
:type prefix_address: str
:param prefix_length: The prefix length part of the CIDR
:type prefix_length: int
:return: Peer CIDR information
:rtype: dict
"""
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
connection)
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}/peer_cidrs/{}/{}"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
prefix_address,
prefix_length,
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "GET", path, headers())
# Return data if error
if data["response"].status != 204:
return data["data"]
# Return custom JSON
payload = {"peer_cidr": ("{}/{}".format(prefix_address,
prefix_length))}
return resource_found(payload)
except Exception as error:
print("Error fetching peer CIDR {}/{} for connection {} in VPN"
" gateway {}. {}".format(prefix_address, prefix_length,
connection, gateway, error))
raise
[docs] def create_ike_policy(self, **kwargs):
"""Create IKE policy
:param name: The user-defined name for this IKE policy
:type name: str, optional
:param resource_group: The resource group to use
:type resource_group: str, optional
:param authentication_algorithm: The authentication algorithm
:type authentication_algorithm: str
:param dh_group: The Diffie-Hellman group
:type dh_group: str
:param encryption_algorithm: The encryption algorithm
:type encryption_algorithm: str
:param ike_version: The IKE protocol version
:type ike_version: int
:param key_lifetime: The key lifetime in seconds
:type key_lifetime: int, optional
"""
args = ["authentication_algorithm", "dh_group",
"encryption_algorithm", "ike_version"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'name': kwargs.get('name'),
'resource_group': kwargs.get('resource_group'),
'authentication_algorithm': kwargs.get('authentication_algorithm'),
'dh_group': kwargs.get('dh_group'),
'encryption_algorithm': kwargs.get('encryption_algorithm'),
'ike_version': kwargs.get('ike_version'),
'key_lifetime': kwargs.get('key_lifetime'),
}
# Construct payload
payload = {}
for key, value in args.items():
if value is not None:
if key == "resource_group":
rg_info = self.rg.get_resource_group(
args["resource_group"])
if "errors" in rg_info:
return rg_info
payload["resource_group"] = {"id": rg_info["id"]}
else:
payload[key] = value
try:
# Connect to api endpoint for ike_policies
path = ("/v1/ike_policies?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "POST", path, headers(),
json.dumps(payload))["data"]
except Exception as error:
print("Error creating IKE policy. {}".format(error))
raise
[docs] def create_ipsec_policy(self, **kwargs):
"""Create IPsec policy
:param name: The user-defined name for this IPsec policy
:type name: str, optional
:param resource_group: The resource group to use
:type resource_group: str, optional
:param authentication_algorithm: The authentication algorithm
:type authentication_algorithm: str
:param pfs: Perfect Forward Secrecy
:type pfs: str
:param encryption_algorithm: The encryption algorithm
:type encryption_algorithm: str
:param key_lifetime: The key lifetime in seconds
:type key_lifetime: int, optional
"""
args = ["authentication_algorithm", "pfs",
"encryption_algorithm"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'name': kwargs.get('name'),
'resource_group': kwargs.get('resource_group'),
'authentication_algorithm': kwargs.get('authentication_algorithm'),
'pfs': kwargs.get('pfs'),
'encryption_algorithm': kwargs.get('encryption_algorithm'),
'key_lifetime': kwargs.get('key_lifetime'),
}
# Construct payload
payload = {}
for key, value in args.items():
if value is not None:
if key == "resource_group":
rg_info = self.rg.get_resource_group(
args["resource_group"])
if "errors" in rg_info:
return rg_info
payload["resource_group"] = {"id": rg_info["id"]}
else:
payload[key] = value
try:
# Connect to api endpoint for ipsec_policies
path = ("/v1/ipsec_policies?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "POST", path, headers(),
json.dumps(payload))["data"]
except Exception as error:
print("Error creating IPsec policy. {}".format(error))
raise
[docs] def create_gateway(self, **kwargs):
"""Create gateway
:param name: The user-defined name for this gateway
:type name: str, optional
:param resource_group: The resource group to use
:type resource_group: str, optional
:param subnet: Identifies a subnet by a unique property
:type subnet: str
"""
args = ["subnet"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'name': kwargs.get('name'),
'resource_group': kwargs.get('resource_group'),
'subnet': kwargs.get('subnet'),
}
# Retrieve subnet information to get the ID
subnet_info = self.subnet.get_subnet(args["subnet"])
if "errors" in subnet_info:
return subnet_info
# Construct payload
payload = {}
for key, value in args.items():
if value is not None:
if key == "resource_group":
rg_info = self.rg.get_resource_group(
args["resource_group"])
if "errors" in rg_info:
return rg_info
payload["resource_group"] = {"id": rg_info["id"]}
elif key == "subnet":
payload["subnet"] = {"id": subnet_info["id"]}
else:
payload[key] = value
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways?version={}&generation={}".format(
self.cfg["version"], self.cfg["generation"]))
# Return data
return qw("iaas", "POST", path, headers(),
json.dumps(payload))["data"]
except Exception as error:
print("Error creating gateway. {}".format(error))
raise
[docs] def create_connection(self, **kwargs):
"""Create connection
:param gateway: The VPN gateway name or ID
:type gateway: str
:param name: The user-defined name for this connection
:type name: str, gateway
:param peer_address: The IP address of the peer VPN gateway
:type peer_address: str
:param local_cidrs: A collection of local CIDRs
:type local_cidrs: list, optional
:param peer_cidrs: A collection of peer CIDRs
:type peer_cidrs: list, optional
:param psk: The preshared key
:type psk: str
:param admin_state_up: VPN connection shutdown if false, defaults to
`True`
:type admin_state_up: bool, optional
:param interval: Dead Peer Detection interval in seconds
:type interval: int, optional
:param timeout: Dead Peer Detection timeout in seconds
:type timeout: int, optional
:param action: Dead Peer Detection actions
:type action: str, optional
:param encryption_algorithm: The encryption algorithm
:type encryption_algorithm: str, optional
:param key_lifetime: The key lifetime in seconds
:type key_lifetime: int, optional
:param ike_policy: The absence of a policy indicates autonegotiation
:type ike_policy: str, optional
:param ipsec_policy: The absence of a policy indicates autonegotiation
:type ipsec_policy: str, optional
"""
args = ["gateway", "peer_address", "psk"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'gateway': kwargs.get('gateway'),
'name': kwargs.get('name'),
'peer_address': kwargs.get('peer_address'),
'local_cidrs': kwargs.get('local_cidrs'),
'peer_cidrs': kwargs.get('peer_cidrs'),
'psk': kwargs.get('psk'),
'admin_state_up': kwargs.get('admin_state_up', True),
'dead_peer_detection': kwargs.get('dead_peer_detection'),
'encryption_algorithm': kwargs.get('encryption_algorithm'),
'key_lifetime': kwargs.get('key_lifetime'),
'ike_policy': kwargs.get('ike_policy'),
'ipsec_policy': kwargs.get('ipsec_policy'),
}
# Construct payload
payload = {}
for key, value in args.items():
if key != "gateway" and value is not None:
if key == "ike_policy":
ike_info = self.get_ike_policy(args["ike_policy"])
if "errors" in ike_info:
return ike_info
payload["ike_policy"] = {"id": ike_info["id"]}
elif key == "ipsec_policy":
ipsec_info = self.get_ipsec_policy(args["ipsec_policy"])
if "errors" in ipsec_info:
return ipsec_info
payload["ipsec_policy"] = {"id": ipsec_info["id"]}
else:
payload[key] = value
# Retrieve gateway information to get the ID
# (mostly useful if a name is provided)
gateway_info = self.get_vpn_gateway(args["gateway"])
if "errors" in gateway_info:
return gateway_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections?version={}"
"&generation={}".format(gateway_info["id"],
self.cfg["version"],
self.cfg["generation"]))
# Return data
return qw("iaas", "POST", path, headers(),
json.dumps(payload))["data"]
except Exception as error:
print("Error creating connection. {}".format(error))
raise
[docs] def add_local_cidr_connection(self, **kwargs):
"""Add local CIDR to a connection
:param gateway: The VPN gateway name or ID
:type gateway: str
:param connection: The connection name or ID
:type connection: str
:param prefix_address: The prefix address part of the CIDR
:type prefix_address: str
:param prefix_length: The prefix length part of the CIDR
:type prefix_length: int
"""
args = ["gateway", "connection", "prefix_address", "prefix_length"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'gateway': kwargs.get('gateway'),
'connection': kwargs.get('connection'),
'prefix_address': kwargs.get('prefix_address'),
'prefix_length': kwargs.get('prefix_length'),
}
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(args["gateway"])
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
args["connection"])
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}/local_cidrs/{}/{}"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
args["prefix_address"],
args["prefix_length"],
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "PUT", path, headers())
# Return data if error
if data["response"].status != 204:
return data["data"]
# Return custom JSON
payload = {"local_cidr": ("{}/{}".format(args["prefix_address"],
args["prefix_length"]))}
return resource_created(payload)
except Exception as error:
print("Error addind local CIDR {}/{} to connection {} on VPN"
" gateway {}. {}".format(args["prefix_address"],
args["prefix_length"],
args["connection"],
args["gateway"], error))
raise
[docs] def add_peer_cidr_connection(self, **kwargs):
"""Add peer CIDR to a connection
:param gateway: The VPN gateway name or ID
:type gateway: str
:param connection: The connection name or ID
:type connection: str
:param prefix_address: The prefix address part of the CIDR
:type prefix_address: str
:param prefix_length: The prefix length part of the CIDR
:type prefix_length: int
"""
args = ["gateway", "connection", "prefix_address", "prefix_length"]
check_args(args, **kwargs)
# Build dict of argument and assign default value when needed
args = {
'gateway': kwargs.get('gateway'),
'connection': kwargs.get('connection'),
'prefix_address': kwargs.get('prefix_address'),
'prefix_length': kwargs.get('prefix_length'),
}
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(args["gateway"])
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
args["connection"])
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}/peer_cidrs/{}/{}"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
args["prefix_address"],
args["prefix_length"],
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "PUT", path, headers())
# Return data if error
if data["response"].status != 204:
return data["data"]
# Return custom JSON
payload = {"peer_cidr": ("{}/{}".format(args["prefix_address"],
args["prefix_length"]))}
return resource_created(payload)
except Exception as error:
print("Error adding peer CIDR {}/{} to connection {} on VPN"
" gateway {}. {}".format(args["prefix_address"],
args["prefix_length"],
args["connection"],
args["gateway"], error))
raise
[docs] def delete_ike_policy(self, policy):
"""Delete IKE policy
:param policy: IKE policy name or ID
:type policy: str
:return: Delete status
:rtype: dict
"""
# Check if IKE policy exists
policy_info = self.get_ike_policy(policy)
if "errors" in policy_info:
return policy_info
try:
# Connect to api endpoint for ike_policies
path = ("/v1/ike_policies/{}?version={}&generation={}".format(
policy_info["id"], self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 204:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error deleting IKE policy {}. {}".format(policy, error))
raise
[docs] def delete_ipsec_policy(self, policy):
"""Delete IPsec policy
:param policy: IPsec policy name or ID
:type policy: str
:return: Delete status
:rtype: dict
"""
# Check if IPsec policy exists
policy_info = self.get_ipsec_policy(policy)
if "errors" in policy_info:
return policy_info
try:
# Connect to api endpoint for ipsec_policies
path = ("/v1/ipsec_policies/{}?version={}&generation={}".format(
policy_info["id"], self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 204:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error deleting IPsec policy {}. {}".format(policy, error))
raise
[docs] def delete_gateway(self, gateway):
"""Delete VPN gateway
:param gateway: VPN gateway name or ID
:type gateway: str
:return: Delete status
:rtype: dict
"""
# Check if gateway exists
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}?version={}&generation={}".format(
gateway_info["id"], self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 202:
return data
# Return status
return resource_deleted()
except Exception as error:
print("Error deleting VPN gateway {}. {}".format(gateway, error))
raise
[docs] def delete_connection(self, gateway, connection):
"""Delete connection
:param gateway: VPN gateway name or ID
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:return: Delete status
:rtype: dict
"""
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
connection)
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}?version={}"
"&generation={}".format(gateway_info["id"],
connection_info["id"],
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 202:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error deleting connection {} from VPN gateway {}."
" {}".format(gateway, connection, error))
raise
[docs] def remove_local_cidr(self, gateway, connection, prefix_address,
prefix_length):
"""Remove local CIDR from a connection
:param gateway: VPN gateway name or ID
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:param prefix_address: The prefix address part of the CIDR
:type prefix_address: str
:param prefix_length: The prefix length part of the CIDR
:type prefix_length: int
:return: Delete status
:rtype: dict
"""
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
connection)
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}/local_cidrs/{}/{}"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
prefix_address,
prefix_length,
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 204:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error removing local CIDR {}/{} in connection {} from VPN"
" gateway {}. {}".format(prefix_address, prefix_length,
connection,
gateway, error))
raise
# Remove peer CIDR
[docs] def remove_peer_cidr(self, gateway, connection, prefix_address,
prefix_length):
"""Remove peer CIDR from a connection
:param gateway: VPN gateway name or ID
:type gateway: str
:param connection: Connection name or ID
:type connection: str
:param prefix_address: The prefix address part of the CIDR
:type prefix_address: str
:param prefix_length: The prefix length part of the CIDR
:type prefix_length: int
:return: Delete status
:rtype: dict
"""
# Retrieve gateway information to get the ID
gateway_info = self.get_vpn_gateway(gateway)
if "errors" in gateway_info:
return gateway_info
# Retrieve connection information to get the ID
connection_info = self.get_vpn_gateway_connection(gateway_info["id"],
connection)
if "errors" in connection_info:
return connection_info
try:
# Connect to api endpoint for vpn_gateways
path = ("/v1/vpn_gateways/{}/connections/{}/peer_cidrs/{}/{}"
"?version={}&generation={}".format(gateway_info["id"],
connection_info["id"],
prefix_address,
prefix_length,
self.cfg["version"],
self.cfg["generation"]))
data = qw("iaas", "DELETE", path, headers())
# Return data
if data["response"].status != 204:
return data["data"]
# Return status
return resource_deleted()
except Exception as error:
print("Error removing peer CIDR {}/{} in connection {} from VPN"
" gateway {}. {}".format(prefix_address, prefix_length,
connection,
gateway, error))
raise