Source code for ibmcloud_python_sdk.cis.storage.client

import ibm_boto3
import re

from ibmcloud_python_sdk.utils.object_regions import endpoints
from ibmcloud_python_sdk.resource import resource_instance
from ibmcloud_python_sdk.utils.common import resource_not_found
from ibmcloud_python_sdk.config import params
from ibmcloud_python_sdk.auth import decode_token
from botocore.client import Config


def _get_endpoint(**kwargs):
    """Get endpoint storage URL from lookup based on mode and region

    :param mode: Access mode
    :type mode: str
    :param region: Region where to host the bucket
    :type region: str
    :return URL endpoint
    :rtype: str
    """
    args = {
        'mode': kwargs.get('mode', 'regional'),
        'region': kwargs.get('region', 'us-south'),
    }

    try:
        url = "https://{}".format(endpoints[args["mode"]][args["region"]])
        return url

    except Exception:
        return resource_not_found()


[docs]def cos_client(**kwargs): """Create Cloud Object Storage client :param mode: Access mode :type mode: str :param location: Region where to host the bucket :type location: str :param service_instance: Resource instance name or ID :type service_instance: str :return: Cloud Object Storage client :rtype: dict """ cfg = params() # Build dict of argument and assign default value when needed args = { 'mode': kwargs.get('mode'), 'location': kwargs.get('location'), 'service_instance': kwargs.get('service_instance'), 'account': kwargs.get('account', decode_token()['account']['bss']), } ri = resource_instance.ResourceInstance() try: # Check if endpoint exists endpoint = _get_endpoint(mode=args["mode"], location=args["location"]) if "errors" in endpoint: return endpoint ri_info = None if args['service_instance']: # Check if resource instance exists and retrieve information ri_info = ri.get_resource_instance(args["service_instance"]) if "errors" in ri_info: return ri_info else: # Automatically detect if cloud-object-storage service exists. # If multiple resource instance exist then the last one to match # the regex will be used. service = "cloud-object-storage" regex = "crn:v1:bluemix:public:{}:global:a/{}".format( service, args['account']) data = ri.get_resource_instances() for instance in data['resources']: if re.search(regex, instance['id']): ri_info = instance client = ibm_boto3.client( 's3', ibm_api_key_id=cfg["key"], ibm_service_instance_id=ri_info['id'], config=Config(signature_version='oauth'), endpoint_url=endpoint ) return client except Exception as error: print("Error creating Cloud Object Storage client. {}".format(error))