import json
from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import get_headers as headers
from ibmcloud_python_sdk.utils.common import query_wrapper as qw
from ibmcloud_python_sdk.resource import resource_instance
from ibmcloud_python_sdk.utils.common import resource_deleted
from ibmcloud_python_sdk.vpc import vpc
from ibmcloud_python_sdk.utils.common import check_args
class Dns():
    """Client for the "dns" service endpoints of a resource instance.

    Covers DNS zones, permitted networks and resource records. Methods take
    keyword arguments only, validate the required ones with ``check_args``
    and return either the ``data`` field of the API response or a dict
    carrying an ``errors`` list.
    """

    def __init__(self):
        self.cfg = params()
        self.resource_instance = resource_instance.ResourceInstance()
        self.vpc = vpc.Vpc()
        # resource_group_id and resource_plan_id for free dns instance
        self.resource_group_id = "aef66560191746fe804b9a66874f62b1"
        self.resource_plan_id = "dc1460a6-37bd-4e2b-8180-d0f86ff39baa"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _error_codes(result):
        """Return the ``code`` of every entry of ``result["errors"]``.

        :param result: Dict returned by one of the lookup methods
        :type result: dict
        :return: Error codes in order; empty when there is no error entry
        :rtype: list
        """
        return [err.get("code") for err in result.get("errors") or []]

    def _instance_guid(self, resource_instance):
        """Resolve a resource instance to its GUID.

        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: ``(guid, None)`` on success, ``(None, error_dict)`` when
            the lookup result contains ``errors``
        :rtype: tuple
        """
        instance = self.resource_instance.get_resource_instance(
            resource_instance)
        if "errors" in instance:
            return None, instance
        return instance["guid"], None

    def _find_dns_zone(self, field, value, resource_instance,
                       empty_message):
        """Return the first DNS zone whose ``field`` equals ``value``.

        :param field: Zone attribute to compare (``"name"`` or ``"id"``)
        :type field: str
        :param value: Value to look for
        :type value: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :param empty_message: Message used when the zone listing is empty
            or is itself an error
        :type empty_message: str
        :return: DNS zone information or a ``not_found`` error dict
        :rtype: dict
        """
        guid, instance_error = self._instance_guid(resource_instance)
        if instance_error is not None:
            return instance_error

        try:
            # Connect to api endpoint for dns zones
            path = "/v1/instances/{}/dnszones".format(guid)
            dns_zones = qw("dns", "GET", path, headers())["data"]

            # Emptiness is tested first so that a None payload does not
            # raise TypeError on the membership test.
            if not dns_zones or "errors" in dns_zones:
                return {"errors": [{"code": "not_found",
                                    "message": empty_message}]}

            # Find the existing zone matching the query
            for dns_zone in dns_zones['dnszones']:
                if dns_zone[field] == value:
                    return dns_zone

            # Return error message if no existing zone matches the query
            return {"errors": [{"code": "not_found",
                                "message": "No dns zone found."}]}
        except Exception as error:
            print("Error fetching dns zone. {}".format(error))
            raise

    # ------------------------------------------------------------------
    # DNS zones
    # ------------------------------------------------------------------
    def get_dns_zones(self, **kwargs):
        """Get all DNS zones hosted by a resource instance

        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: ``data`` field of the API response (other methods of this
            class read its ``dnszones`` key), or an error dict when the
            resource instance lookup fails
        :rtype: dict
        """
        # Required parameters
        check_args(['resource_instance'], **kwargs)

        guid, instance_error = self._instance_guid(
            kwargs.get('resource_instance'))
        if instance_error is not None:
            return instance_error

        try:
            # Connect to api endpoint for dns zones
            path = "/v1/instances/{}/dnszones".format(guid)
            return qw("dns", "GET", path, headers())["data"]
        except Exception as error:
            print("Error fetching dns zones. {}".format(error))
            raise

    def get_dns_zone(self, **kwargs):
        """Get a specific DNS zone hosted by a resource instance

        The zone is searched by name first; when that search reports
        ``not_found`` as its first error, it is searched again by ID.

        :param dns_zone: DNS zone name or ID to query
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: DNS zone information or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance'], **kwargs)

        dns_zone = kwargs.get('dns_zone')
        instance = kwargs.get('resource_instance')

        by_name = self.get_dns_zone_by_name(dns_zone=dns_zone,
                                            resource_instance=instance)
        if "errors" not in by_name:
            return by_name

        # Only the first error decides whether the ID lookup is attempted.
        if self._error_codes(by_name)[:1] == ["not_found"]:
            return self.get_dns_zone_by_id(dns_zone=dns_zone,
                                           resource_instance=instance)
        return by_name

    def get_dns_zone_by_name(self, **kwargs):
        """Get DNS zone by name

        :param dns_zone: DNS zone name to query
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: DNS zone information or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance'], **kwargs)

        return self._find_dns_zone("name", kwargs.get('dns_zone'),
                                   kwargs.get('resource_instance'),
                                   "No dns zones found.")

    def get_dns_zone_by_id(self, **kwargs):
        """Get DNS zone by id

        :param dns_zone: DNS zone ID to query
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: DNS zone information or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance'], **kwargs)

        return self._find_dns_zone("id", kwargs.get('dns_zone'),
                                   kwargs.get('resource_instance'),
                                   "No dns zone found.")

    def get_dns_zone_id(self, **kwargs):
        """Get the ID of a DNS zone

        :param dns_zone: DNS zone name or ID
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: Zone ID, or the error dict when the zone lookup fails
        :rtype: str or dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance'], **kwargs)

        zone = self.get_dns_zone(
            dns_zone=kwargs.get('dns_zone'),
            resource_instance=kwargs.get('resource_instance'))
        if "errors" in zone:
            return zone
        return zone['id']

    def create_zone(self, **kwargs):
        """Create a zone in a specified resource instance

        Nothing is created when the zone already exists (it is returned
        as is) or when the lookup fails with an error other than
        ``not_found`` (that error is returned).

        :param dns_zone: The user-defined name to create
        :type dns_zone: str
        :param description: A description for the domain
        :type description: str, optional
        :param label: A label for the domain
        :type label: str, optional
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: ``data`` field of the API response, the existing zone, or
            an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance'], **kwargs)

        # Optional fields default to an empty string
        payload = {
            'name': kwargs.get('dns_zone'),
            'description': kwargs.get('description') or "",
            'label': kwargs.get('label') or "",
        }

        guid, instance_error = self._instance_guid(
            kwargs.get('resource_instance'))
        if instance_error is not None:
            return instance_error

        zone = self.get_dns_zone(dns_zone=payload['name'],
                                 resource_instance=guid)
        if "not_found" not in self._error_codes(zone):
            return zone

        try:
            # Connect to api endpoint for dns zone
            path = "/v1/instances/{}/dnszones".format(guid)
            return qw("dns", "POST", path, headers(),
                      json.dumps(payload))["data"]
        except Exception as error:
            print("Error creating dns zone. {}".format(error))
            raise

    def delete_zone(self, **kwargs):
        """Delete a zone from a specified resource instance

        :param dns_zone: DNS zone name to delete
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: Confirmation message when the response has no data and
            status code 204, the raw query result otherwise, or an error
            dict when a lookup fails
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance'], **kwargs)

        instance = kwargs.get('resource_instance')

        guid, instance_error = self._instance_guid(instance)
        if instance_error is not None:
            return instance_error

        dns_zone = self.get_dns_zone(dns_zone=kwargs.get('dns_zone'),
                                     resource_instance=instance)
        if "errors" in dns_zone:
            return dns_zone

        try:
            # Connect to api endpoint for dns zone
            path = "/v1/instances/{}/dnszones/{}".format(
                guid, dns_zone['id'])
            result = qw("dns", "DELETE", path, headers())
            if (result["data"] is None
                    and result["response"].getcode() == 204):
                return {"message":
                        "deletion request successfully initiated"}
            return result
        except Exception as error:
            print("Error deleting dns zone. {}".format(error))
            raise

    # ------------------------------------------------------------------
    # Permitted networks
    # ------------------------------------------------------------------
    def add_permitted_network(self, **kwargs):
        """Add permitted network to DNS zone

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :param vpc: The allowed VPC name or ID
        :type vpc: str
        :return: ``data`` field of the API response or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance', 'vpc'], **kwargs)

        guid, instance_error = self._instance_guid(
            kwargs.get('resource_instance'))
        if instance_error is not None:
            return instance_error

        # Get vpc crn
        vpc_info = self.vpc.get_vpc(kwargs.get('vpc'))
        if "errors" in vpc_info:
            return vpc_info

        # Get zone ID
        zone = self.get_dns_zone(dns_zone=kwargs.get('dns_zone'),
                                 resource_instance=guid)
        if "errors" in zone:
            return zone

        payload = {
            "type": "vpc",
            "permitted_network": {"vpc_crn": vpc_info["crn"]},
        }

        try:
            # Connect to api endpoint for permitted network
            path = "/v1/instances/{}/dnszones/{}/permitted_networks".format(
                guid, zone['id'])
            return qw("dns", "POST", path, headers(),
                      json.dumps(payload))["data"]
        except Exception as error:
            print("Error adding permitted network. {}".format(error))
            raise

    def delete_permitted_network(self, **kwargs):
        """Delete permitted network from DNS zone

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :param vpc: The allowed VPC name or ID
        :type vpc: str
        :return: Confirmation message when the response has no data and
            status code 204, the raw query result otherwise, or an error
            dict when a lookup fails
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance', 'vpc'], **kwargs)

        guid, instance_error = self._instance_guid(
            kwargs.get('resource_instance'))
        if instance_error is not None:
            return instance_error

        # Get vpc id; it is used as the last path segment below
        vpc_info = self.vpc.get_vpc(kwargs.get('vpc'))
        if "errors" in vpc_info:
            return vpc_info

        # Get zone ID
        zone = self.get_dns_zone(dns_zone=kwargs.get('dns_zone'),
                                 resource_instance=guid)
        if "errors" in zone:
            return zone

        try:
            # Connect to api endpoint for permitted network
            path = (
                "/v1/instances/{}/dnszones/{}/permitted_networks/{}").format(
                    guid, zone['id'], vpc_info["id"])
            result = qw("dns", "DELETE", path, headers())
            if (result["data"] is None
                    and result["response"].getcode() == 204):
                return {"message":
                        "deletion request successfully initiated"}
            return result
        except Exception as error:
            print("Error removing permitted network. {}".format(error))
            raise

    # ------------------------------------------------------------------
    # Resource records
    # ------------------------------------------------------------------
    def create_resource_record(self, **kwargs):
        """Add record in a specified zone

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :param record: The record to add in the zone, example:
            '{"name": "testB", "type": "A", "rdata": {"ip": "4.5.6.7"}}'
        :type record: dict
        :return: ``data`` field of the API response or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance', 'record'], **kwargs)

        instance = kwargs.get('resource_instance')

        guid, instance_error = self._instance_guid(instance)
        if instance_error is not None:
            return instance_error

        # Get zone ID; a failed lookup is reported like everywhere else
        # in this class instead of surfacing as a KeyError on 'id'.
        zone = self.get_dns_zone(dns_zone=kwargs.get('dns_zone'),
                                 resource_instance=instance)
        if "errors" in zone:
            return zone

        try:
            # Connect to api endpoint for resource records
            path = "/v1/instances/{}/dnszones/{}/resource_records".format(
                guid, zone['id'])
            return qw("dns", "POST", path, headers(),
                      json.dumps(kwargs.get('record')))["data"]
        except Exception as error:
            print("Error adding resource record. {}".format(error))
            raise

    def get_resource_records(self, **kwargs):
        """Get record list for a specified DNS zone

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: ``data`` field of the API response (other methods of this
            class read its ``resource_records`` key) or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'resource_instance'], **kwargs)

        instance = kwargs.get('resource_instance')

        guid, instance_error = self._instance_guid(instance)
        if instance_error is not None:
            return instance_error

        # Get zone ID
        zone = self.get_dns_zone(dns_zone=kwargs.get('dns_zone'),
                                 resource_instance=instance)
        if "errors" in zone:
            return zone

        try:
            # Connect to api endpoint for resource records
            path = "/v1/instances/{}/dnszones/{}/resource_records".format(
                guid, zone['id'])
            return qw("dns", "GET", path, headers())["data"]
        except Exception as error:
            print("Error getting resource records. {}".format(error))
            raise

    def delete_resource_record(self, **kwargs):
        """Delete record in a specified zone

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param record: Record name or ID to delete
        :type record: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: Result of ``resource_deleted()`` when the response has no
            data, the response data otherwise, or an error dict when a
            lookup fails
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'record', 'resource_instance'], **kwargs)

        dns_zone = kwargs.get('dns_zone')
        instance = kwargs.get('resource_instance')

        guid, instance_error = self._instance_guid(instance)
        if instance_error is not None:
            return instance_error

        # Get zone ID
        zone = self.get_dns_zone(dns_zone=dns_zone,
                                 resource_instance=instance)
        if "errors" in zone:
            return zone

        # Get record ID
        record = self.get_resource_record(dns_zone=dns_zone,
                                          resource_instance=instance,
                                          record=kwargs.get('record'))
        if "errors" in record:
            return record

        try:
            # Connect to api endpoint for resource records
            path = "/v1/instances/{}/dnszones/{}/resource_records/{}".format(
                guid, zone['id'], record["id"])
            result = qw("dns", "DELETE", path, headers())["data"]
            if result is None:
                return resource_deleted()
            return result
        except Exception as error:
            print("Error deleting resource record. {}".format(error))
            raise

    def get_resource_record(self, **kwargs):
        """Get specific resource record from a DNS zone

        The record is searched by name first; when that search reports
        ``not_found`` as its first error, it is searched again by ID.

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param record: Record name or ID
        :type record: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: Record information or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'record', 'resource_instance'], **kwargs)

        dns_zone = kwargs.get('dns_zone')
        record = kwargs.get('record')

        guid, instance_error = self._instance_guid(
            kwargs.get('resource_instance'))
        if instance_error is not None:
            return instance_error

        by_name = self.get_resource_record_by_name(dns_zone=dns_zone,
                                                   resource_instance=guid,
                                                   record_name=record)
        if "errors" not in by_name:
            return by_name

        # Only the first error decides whether the ID lookup is attempted.
        if self._error_codes(by_name)[:1] == ["not_found"]:
            return self.get_resource_record_by_id(dns_zone=dns_zone,
                                                  resource_instance=guid,
                                                  record_id=record)
        return by_name

    def get_resource_record_by_name(self, **kwargs):
        """Get record by name

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param record_name: Record name
        :type record_name: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: Record information or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'record_name', 'resource_instance'],
                   **kwargs)

        # Get list of records; lookup errors are passed through untouched
        records = self.get_resource_records(
            dns_zone=kwargs.get('dns_zone'),
            resource_instance=kwargs.get('resource_instance'))
        if "errors" in records:
            return records

        for record in records['resource_records']:
            if record['name'] == kwargs.get('record_name'):
                return record

        return {"errors": [{"code": "not_found",
                            "message": "No record found"}]}

    def get_resource_record_by_id(self, **kwargs):
        """Get record by ID

        :param dns_zone: DNS zone name
        :type dns_zone: str
        :param record_id: Record ID
        :type record_id: str
        :param resource_instance: Name or GUID of the resource instance
        :type resource_instance: str
        :return: Record information or an error dict
        :rtype: dict
        """
        # Required parameters
        check_args(['dns_zone', 'record_id', 'resource_instance'],
                   **kwargs)

        # Get list of records; lookup errors are passed through untouched
        records = self.get_resource_records(
            dns_zone=kwargs.get('dns_zone'),
            resource_instance=kwargs.get('resource_instance'))
        if "errors" in records:
            return records

        for record in records['resource_records']:
            if record['id'] == kwargs.get('record_id'):
                return record

        return {"errors": [{"code": "not_found",
                            "message": "No record found"}]}