Source code for gdgps_apps.apps

from .client import (
    PortalClient,
    ProcessorClient
)
from .defines import *
from .exceptions import (
    InvalidAuthenticationCredentials,
    InvalidConfiguration,
    NoAvailableProcessors,
    DataNotFound,
    AlertNotFound,
    InvalidOperation,
    InvalidIdentifier,
    UnauthorizedOperation,
    ArtifactNotFound,
    FlagNotFound
)
from requests.exceptions import HTTPError
import errno
import os
import json
import logging
import re
import sys
from uuid import UUID


def frmt_resp(response):
    return ': ' + response.text if len(response.text) > 0 else '.'


uuid_regex = re.compile('[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$')


class APPS(object):
    """
    Establish a session with the |APPS| servers. The APPS class is the main interface for
    interaction with |APPS|. It greatly simplifies usage of |APPS| by automatically handling
    tasks such as data caching, which speeds up and reduces the amount of back-and-forth
    communication between your local machine and the |APPS| servers.

    |APPS| comprises more than a single server, which adds complexity to interfacing with it.
    The portal server is the webserver where your account settings and credentials are housed
    and accessed, and there are several |APPS| processing servers that data is uploaded to and
    downloaded from. This class abstracts away that complexity by automatically selecting the
    currently fastest |APPS| processing server to upload data to and by keeping track of where
    your data is located and how to retrieve it.

    To use the APPS class you will need your apps_settings file, which contains your
    authorization credential. The APPS library will sign all your API requests with this
    credential to keep your data private and secure. Be sure to keep your settings file in a
    safe, secret place! If at any time you think your account has been compromised you can
    `reset your credential`_.

    :param settings_file: Path to the settings file containing your authorization credentials.
    :param portal: URL of the APPS portal to use. Most likely you will never need to override
        the default. Supplying this parameter will override the parameter from your settings
        file.
    :param api_path: The path of the API to use. Supplying this parameter will override the
        parameter from your settings file.
    :param key: Manually supply your API credential's key. This will override the parameter
        from your settings file.
    :param secret: Manually supply your API credential's secret. This will override the
        parameter from your settings file.
    :param download_directory: The directory that any files downloaded from APPS will be
        stored in. By default this is the current working directory.
    :param trust_env: Trust environment settings for proxy configuration, default
        authentication and similar.
    :param log_level: Set the logging level; OFF is silent and DEBUG is the highest verbosity.
        Available levels are {OFF,DEBUG,INFO,WARNING,ERROR,CRITICAL}. If log_level is None, no
        logging configuration will be performed. (see :ref:`logging_and_output` and
        :meth:`gdgps_apps.apps.APPS.set_log_level`).
    :exception InvalidConfiguration: If there is an unrecoverable problem with the supplied
        configuration.
    :exception InvalidAuthenticationCredentials: If the authentication credentials are not
        supplied or are not of the correct types.
    """

    __logger__ = logging.getLogger(__name__ + '.APPS')

    default_settings_path = '~/.apps_settings'

    profile_cache = None
    data_cache = {}
    name_to_id = {}
    processors = {}

    stream_chunk_size = 8128

    MUTABLE_CREDENTIALS = ['is_staff', 'is_superuser']
    is_staff_ = False
    is_superuser_ = False

    OPTIONS = {
        'gipsyxdata': {
            'name': {
                'type': str,
                'default': '<filename>',
                'help': (
                    'The name to use for the data item. By default the name of the file will be used. '
                    'The system enforces name uniqueness, so a numeric may be added to the end after '
                    'upload if there is a naming clash.'
                ),
                'short': 'n'
            },
            'processing_mode': {
                'type': GIPSYXData.normalize_processing_mode,
                'default': GIPSYXData.STATIC,
                'choices': GIPSYXData.PROCESSING_MODES,
                'help': GIPSYXData.PROCESSING_MODE_HELP,
                'short': 'm'
            },
            'model_pressure': {
                'type': bool,
                'default': False,
                'help': GIPSYXData.MODEL_PRESSURE_HELP,
            },
            'troposphere_model': {
                'type': GIPSYXData.normalize_troposphere_model,
                'default': False,
                'choices': GIPSYXData.TROPOSPHERE_MODELS,
                'help': GIPSYXData.TROPOSPHERE_MODEL_HELP,
            },
            'model_tides': {
                'type': bool,
                'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['model_tides'],
                'help': GIPSYXData.MODEL_TIDES_HELP,
            },
            'ocean_loading': {
                'type': bool,
                'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['ocean_loading'],
                'help': GIPSYXData.OCEAN_LOADING_HELP,
            },
            'product': {
                'type': GIPSYXData.normalize_product,
                'default': GIPSYXData.PRODUCT_DEFAULT,
                'choices': GIPSYData.PRODUCTS,
                'help': GIPSYData.PRODUCT_HELP,
                'short': 'p'
            },
            'elev_dep_weighting': {
                'type': GIPSYXData.normalize_elev_dep_weighting,
                'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['elev_dep_weighting'],
                'choices': GIPSYXData.ELEV_DEP_WEIGHTINGS,
                'help': GIPSYXData.ELEV_DEP_WEIGHTING_HELP
            },
            'elev_angle_cutoff': {
                'type': float,
                'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['elev_angle_cutoff'],
                'help': GIPSYXData.ELEV_ANGLE_CUTOFF_HELP
            },
            'solution_period': {
                'type': float,
                'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['solution_period'],
                'help': GIPSYXData.SOLUTION_PERIOD_HELP
            },
            'generate_quaternions': {
                'type': bool,
                'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['generate_quaternions'],
                'help': GIPSYXData.GENERATE_QUATERNIONS_HELP
            },
            'email_notify': {
                'type': bool,
                'default': Data.EMAIL_NOTIFY_DEFAULT,
                'help': Data.EMAIL_NOTIFY_HELP,
                'short': 'e'
            },
        },
        'profile': {
            'name': {
                'type': str,
                'help': 'Name of the primary account point-of-contact.'
            },
            'affiliation': {
                'type': str,
                'help': 'Company or organization this account is affiliated with.'
            },
            'address': {
                'type': str,
                'help': 'The mailing address associated with the account.'
            },
            'compression_preference': {
                'type': str,
                'choices': UserProfile.COMPRESSION_PREFERENCES,
                'help': UserProfile.COMPRESSION_PREFERENCE_HELP
            },
            'archive_preference': {
                'type': str,
                'choices': UserProfile.ARCHIVE_PREFERENCES,
                'help': UserProfile.ARCHIVE_PREFERENCE_HELP
            },
            'keep_source_files': {
                'type': bool,
                'help': UserProfile.KEEP_SOURCE_FILES_HELP
            },
            'no_prompt_processing': {
                'type': bool,
                'help': UserProfile.NO_PROMPT_PROCESSING_HELP
            },
            'email_notify': {
                'type': bool,
                'help': UserProfile.EMAIL_NOTIFY_HELP
            },
            'timezone': {
                'type': str,
                'help': UserProfile.TIMEZONE_HELP
            }
        }
    }

    @staticmethod
    def read_settings(settings_file='~/.apps_settings'):
        if isinstance(settings_file, str):
            if '~' in settings_file:
                settings_file = os.path.expanduser(settings_file)
            if os.path.exists(settings_file):
                with open(settings_file) as f:
                    try:
                        settings = json.load(f)
                    except Exception as e:
                        raise InvalidConfiguration(str(e))
                return settings
        raise InvalidConfiguration(
            'Settings file %s does not exist! You can download yours from %s' % (
                settings_file,
                SETTINGS_FILE_LINK
            )
        )

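    # Illustrative sketch (not part of the library): read_settings() above simply loads a JSON
    # file, so a settings file is expected to be a JSON object whose keys match those consumed
    # in __init__ below ('portal', 'api_path', 'key', 'secret', 'label', ...). The values shown
    # here are placeholders, not real credentials or URLs.
    #
    #   {
    #       "portal": "https://<portal-host>",
    #       "api_path": "<api-path>",
    #       "label": "<portal-label>",
    #       "key": "<your-api-key>",
    #       "secret": "<your-api-secret>"
    #   }
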
    def __init__(
            self,
            settings_file='~/.apps_settings',
            portal=PORTAL_URL,
            api_path=DEFAULT_USER_API_PATH,
            label=None,
            key=None,
            secret=None,
            download_directory=None,
            trust_env=False,
            log_level=None
    ):
        self.download_directory = download_directory
        self.trust_env = trust_env
        self.set_log_level(log_level)
        self.download_directory_ = None
        self.api_ = None
        self.portal_url_ = None
        self.label_ = None  # label of the `portal` node
        self.settings_file_ = settings_file
        self.api_key_ = None
        self.api_secret_ = None

        if settings_file:
            if '~' in settings_file:
                self.settings_file_ = os.path.expanduser(settings_file)
            self.__logger__.debug('Loading settings from %s', self.settings_file_)
            settings = self.read_settings(settings_file)
            self.portal_url_ = settings.get('portal', self.portal_url_)
            self.api_ = settings.get('api_path', self.api_)
            self.api_key_ = settings.get('key', self.api_key_)
            self.api_secret_ = settings.get('secret', self.api_secret_)
            self.label_ = settings.get('label', self.label_)
            self.is_staff_ = settings.get('is_staff', self.is_staff_)
            self.is_superuser_ = settings.get('is_superuser', self.is_superuser_)

        # passed in parameters may override settings file values
        if (portal is not None and portal != PORTAL_URL) or self.portal_url_ is None:
            self.portal_url_ = portal
        if (api_path is not None and api_path != DEFAULT_USER_API_PATH) or self.api_ is None:
            self.api_ = api_path
        if key is not None:
            self.api_key_ = key
        if secret is not None:
            self.api_secret_ = secret
        if label is not None:
            self.label_ = label

        if self.api_key_ is None or self.api_secret_ is None:
            raise InvalidAuthenticationCredentials(
                '''APPS must be configured with authentication credentials, '''
                '''by either supplying a path to a credential file, or by '''
                '''explicitly providing your issued key and secret as a tuple '''
                '''(key, secret) to the auth parameter. You can download your settings file at %s'''
                % SETTINGS_FILE_LINK
            )

        if self.portal_url_ is None:
            raise InvalidConfiguration('APPS must be configured with a portal url.')

        self.__logger__.info('Establishing client interface to %s/%s', self.portal_url_, self.api_)
        self.portal_ = PortalClient(
            portal_url=self.portal_url_,
            label=self.label_,
            api=self.api_,
            api_key=self.api_key_,
            api_secret=self.api_secret_,
            trust_env=self.trust_env
        )

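    # Illustrative sketch (not part of the library): typical construction, assuming a settings
    # file downloaded from the portal sits at the default ~/.apps_settings location. The
    # download directory and log level shown here are arbitrary example values.
    #
    #   from gdgps_apps.apps import APPS
    #
    #   apps = APPS(
    #       settings_file='~/.apps_settings',
    #       download_directory='./apps_downloads',
    #       log_level='INFO'
    #   )
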
    def update_credential(self):
        """
        The user credential houses some information that might change. To avoid having users
        re-download it, for certain updates we can simply flush the updated values from their
        profile to disk.
        """
        write = False
        settings = self.read_settings(self.settings_file_)
        if self.profile_cache is None:
            self.profile()
        for attr in self.MUTABLE_CREDENTIALS:
            value = self.profile_cache.get(attr, getattr(self, attr, None))
            if value != settings.get(attr, getattr(self, attr, None)):
                write = True
                settings[attr] = value
        if write:
            with open(self.settings_file_, 'w') as settings_out:
                json.dump(settings, settings_out, indent=4)

    @property
    def is_staff(self):
        return self.is_staff_

    @property
    def is_superuser(self):
        return self.is_superuser_

    @staticmethod
    def set_log_level(level, **kwargs):
        """
        Set up a basic logging config while setting the root logging level to the given level.
        (see :ref:`logging_and_output`)

        :param level: The root logging level. This argument can be either an integer logger
            level, i.e. logging.DEBUG, or a string representation of the log level, i.e.
            'DEBUG'. This function accepts one additional logger level above what is provided
            by the Python logging framework, 'OFF', which is equivalent to
            logging.CRITICAL + 1. If level is None no logging configuration will be done.
        :param kwargs: Additional arguments to pass to `logging.basicConfig
            <https://docs.python.org/3/library/logging.html#logging.basicConfig>`_
        :return: None
        """
        if level is None:
            return
        if isinstance(level, int):
            numeric_level = level
        else:
            if level == 'OFF':
                numeric_level = logging.CRITICAL + 1
            else:
                numeric_level = getattr(logging, level, None)
            if not isinstance(numeric_level, int):
                raise ValueError('Invalid log level: %s' % level)
        logging.basicConfig(level=numeric_level, **kwargs)

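    # Illustrative sketch (not part of the library): the two accepted spellings of a level.
    # Both calls configure the root logger the same way; 'OFF' suppresses all output. The
    # format string below is an arbitrary example.
    #
    #   APPS.set_log_level('DEBUG')
    #   APPS.set_log_level(logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s')
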
    @staticmethod
    def is_state(state):
        """
        Is the given string a valid data state representation. This method will recognize
        short and long forms and is case insensitive.

        :param state: a string that might represent a state
        :return: The normalized short form state string, or None if the input string was not a
            valid Data state
        """
        try:
            return Data.normalize_state(state)
        except (AttributeError, KeyError):
            return None

    @staticmethod
    def is_id(uuid):
        """
        Is the given string a valid data UUID_. This method is case insensitive, but
        canonically in |APPS| UUID_s are lower case.

        :param uuid: a string that might represent a UUID_
        :return: The normalized lower case form of the UUID_ or None if it is not a valid
            UUID.
        """
        try:
            if uuid_regex.match(uuid) is not None:
                return str(UUID(uuid)).lower()
            return None
        except ValueError:
            return None

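    # Illustrative sketch (not part of the library): is_id() normalizes case and returns None
    # for anything that is not a UUID. The UUID below is an arbitrary example value.
    #
    #   APPS.is_id('123E4567-E89B-12D3-A456-426614174000')
    #   # -> '123e4567-e89b-12d3-a456-426614174000'
    #   APPS.is_id('not-a-uuid')
    #   # -> None
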
    @staticmethod
    def is_alert_level(lvl):
        """
        Is the given string a valid |APPS| Alert level. This method will recognize short and
        long forms and is case insensitive.

        :param lvl: a string that might represent an Alert level
        :return: The normalized form Alert level string or None if it is not a valid Alert
            level.
        """
        try:
            return UserAlert.normalize_level(lvl)
        except (AttributeError, KeyError):
            return None

    @staticmethod
    def is_flag_level(lvl):
        """
        Is the given string a valid |APPS| DataFlag level. This method will recognize short
        and long forms and is case insensitive.

        :param lvl: a string that might represent a DataFlag level
        :return: The normalized form DataFlag level string or None if it is not a valid
            DataFlag level.
        """
        try:
            return DataFlag.normalize_level(lvl)
        except (AttributeError, KeyError):
            return None

    @staticmethod
    def base_url(url):
        """
        Parse out the root path of any given URL.

        :param url: a valid url string
        :return: The root url path, or None if the input was not able to be parsed
        """
        try:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            return parsed.scheme + '://' + parsed.netloc
        except ImportError:
            try:
                from urlparse import urlparse
                parsed = urlparse(url)
                return parsed.scheme + '://' + parsed.netloc
            except ImportError:
                return None

    @property
    def download_directory(self):
        """
        The str containing the path of the directory where files downloaded from |APPS| will
        be stored. If the path does not exist, the directories will be recursively created. If
        set to None, the current working directory will be used.

        :exception InvalidConfiguration: if the path does not exist and was not able to be
            created, or exists but is not a directory
        """
        if self.download_directory_ is None:
            return os.getcwd()
        return self.download_directory_

    @download_directory.setter
    def download_directory(self, dr):
        if dr is not None:
            if not os.path.exists(dr):
                try:
                    os.makedirs(dr)
                except OSError as e:
                    if e.errno != errno.EEXIST:
                        raise InvalidConfiguration(
                            'Error, cannot create download directory %s: %s' % (dr, str(e))
                        )
            elif not os.path.isdir(dr):
                raise InvalidConfiguration('Error, download directory is not a directory: %s' % dr)
        self.download_directory_ = dr

    @staticmethod
    def format_size(byts):
        """
        Given a number of bytes, return the most compact representation of it in SI units. The
        scale factor used is 1024, not 1000. For example:

        .. code-block::

            >>> from gdgps_apps.apps import APPS
            >>> APPS.format_size(12312312)
            '11.742MB'

        :param byts: The number of bytes to format, any type convertible to a float is
            accepted
        :return: the scaled str representation of the bytes with the appropriate SI symbol
        :exception ValueError: If input is not a float or convertible to a float
        """
        if not isinstance(byts, float):
            byts = float(byts)
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']:
            if abs(byts) < 1024.0:
                return "%3.3f%s" % (byts, unit)
            byts /= 1024.0
        return "%.3f%s" % (byts, 'YB')

    def id_for_name(self, name):
        """
        Fetch the UUID_ of data that has the given name. This method will optimistically
        search the local cache first and if that fails it will query |APPS|.

        :param name: A str containing the name of the data to search for.
        :return: The UUID_ of the data with the given name as a str, or None if data of the
            given name does not exist.
        """
        if name not in self.name_to_id:
            self._refresh_data_cache(query={'filter{name}': name})
        return self.name_to_id.get(name, None)

    def exists(self, identifier):
        """
        Check if data of a given name or UUID_ exists.

        :param identifier: Either a name or UUID_ of a given Data item
        :return: True if data of the given name or UUID_ exists, False otherwise
        """
        if identifier in self.name_to_id:
            return True
        if identifier.lower() in self.data_cache:
            return True
        self._refresh_data_cache()
        if identifier in self.name_to_id:
            return True
        if identifier.lower() in self.data_cache:
            return True
        return False

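    # Illustrative sketch (not part of the library): resolving a submission by name. The data
    # name below is an arbitrary example value.
    #
    #   if apps.exists('my_station_2021_001.rnx'):
    #       dataid = apps.id_for_name('my_station_2021_001.rnx')
    #       print(apps.detail(dataid)['state'])
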
    def flags(self, dataid=None, query=None):
        if dataid is not None:
            return self.detail(dataid, query=query)['flags']
        return self.portal_.flags(query=query)

    def delete_flag(self, uuid):
        uuid = self._verify_id(uuid)
        try:
            return self.portal_.delete_flag(uuid).ok
        except HTTPError as e:
            if e.response.status_code == 404:
                raise FlagNotFound('Flag %s was not found!' % (uuid,), uuid=uuid)
            elif e.response.status_code == 403:
                raise InvalidOperation(e.response.text, uuid=uuid)
            raise e

    def data_in_states(self, states):
        if isinstance(states, str):
            states = [Data.normalize_state(states)]
        else:
            states = [Data.normalize_state(state) for state in states]
        return [uuid for uuid, detail in self.data_cache.items() if detail['state'] in states]

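    # Illustrative sketch (not part of the library): data_in_states() filters the local cache,
    # so list_data() is called first to populate it. The state string is an example and is
    # normalized by Data.normalize_state(); consult the Data defines for the valid values.
    #
    #   apps.list_data()
    #   available = apps.data_in_states('Available')
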
    def profile(self, query=None):
        self.profile_cache = self.portal_.profile(query=query)
        return self.profile_cache

    def sources(self, query=None):
        return self.portal_.sources(query=query)

    def get_source(self, sid, query=None):
        """
        Fetch the data source object given its UUID.

        :param sid: The UUID of the data submission's source object.
        :param query: Additional query parameters (if supported)
        :return: A dictionary representation of the data source. TODO see webservice API doc
        """
        return self.portal_.get_source(sid, query=query)

    def upload_source(self, dataid, file, source_type=None, stream=False, progress_hook=None, **kwargs):
        """
        Upload a file to the specified data submission as an additional source file. Note the
        data must be in a mutable state for this operation to work (i.e. pre-processed and not
        currently verifying).

        All upload calls provide hooks that enable feedback to users regarding upload
        progress. TQDM_ is a popular python library for providing in-terminal progress bars.
        For example, one might provide a progress bar using TQDM_ and this function like so:

        .. code-block::

            from tqdm import tqdm
            with tqdm(
                total=os.path.getsize(pressurefile),
                ncols=80,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                postfix={'file': os.path.basename(pressurefile)}
            ) as progress_bar:
                self.apps.upload_source(
                    data_uuid,
                    pressurefile,
                    source_type=defines.GIPSYXData.PRESSURE_FILE,
                    stream=True,
                    progress_hook=lambda bytes_read: progress_bar.update(bytes_read)
                )

        :param dataid: The UUID of the data submission to attach this source file to
        :param file: The path to the source file
        :param source_type: Optionally tell APPS what type of source file this is.
        :param stream: If true, stream the upload. This avoids loading the entire file into
            memory.
        :param progress_hook: The progress hook can be any callable that takes the number of
            bytes read. Its primary use is to enable status updates for the upload. Note,
            passing a progress_hook will force the upload into streaming mode.
        :param kwargs: Additional parameters to pass to APPS for source creation. See the
            webservices API.
        :return:
        """
        client = self.portal_
        if dataid in self.cached_data():
            client = self.get_processor_of_data(dataid)
        return client.upload_source(
            dataid,
            file,
            source_type=source_type,
            stream=stream,
            progress_hook=progress_hook,
            **kwargs
        )

    def delete_source(self, selector):
        """
        Delete the specified source file from the APPS servers.

        :param selector: Either a source identifier, or a 2-tuple where the first element is
            the data identifier and the second element is either None (for all sources), the
            source type or the name of the source.
        :return: A list containing the sources that were deleted, if any.
        """
        if not isinstance(selector, str) and hasattr(selector, '__iter__'):
            assert(len(selector) == 2)
            assert(self.is_id(selector[0]))
            typ = None
            name = None
            if selector[1]:
                assert(isinstance(selector[1], str))
                try:
                    typ = GIPSYXData.normalize_source_type(selector[1])
                except KeyError:
                    name = selector[1].lower()
            to_delete = []
            for src in self.get_data_parameter(selector[0], 'source_types', force_refresh=True):
                src['data'] = selector[0]
                if typ is None and name is None:
                    to_delete.append(src)
                elif typ is not None and src['source_type'] == typ:
                    to_delete.append(src)
                elif src['name'].lower() == name:
                    to_delete.append(src)
            deleted = []
            for src in to_delete:
                if self.portal_.delete_source(src['id']).status_code < 300:
                    deleted.append(src)
            return deleted
        else:
            assert(self.is_id(selector))
            src = self.portal_.get_source(selector)
            if self.portal_.delete_source(selector).status_code < 300:
                return [src]
            return []

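    # Illustrative sketch (not part of the library): the two selector forms accepted above.
    # The UUIDs are arbitrary example values; the source type constant mirrors the one used in
    # the upload_source docstring.
    #
    #   # delete a single source by its own identifier
    #   apps.delete_source('123e4567-e89b-12d3-a456-426614174000')
    #   # delete every pressure-type source attached to a data submission
    #   apps.delete_source(('9f1c2ab4-aaaa-bbbb-cccc-000000000000', defines.GIPSYXData.PRESSURE_FILE))
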
    def update_profile(self, **kwargs):
        if self.profile_cache is None:
            self.profile()
        try:
            return self.portal_.update_profile(userid=self.profile_cache['id'], **kwargs).ok
        except HTTPError as e:
            if e.response.status_code == 404:
                raise ArtifactNotFound(
                    'Profile %s was not found%s' % (
                        self.profile_cache['id'],
                        ': ' + e.response.text if len(e.response.text) > 0 else '.'
                    ),
                    uuid=self.profile_cache['id']
                )
            elif e.response.status_code == 403:
                raise UnauthorizedOperation(
                    'Insufficient permissions to update profile parameters %s. %s' % (
                        dict(**kwargs),
                        frmt_resp(e.response)
                    )
                )
            raise e

    def alerts(self, query=None):
        return self.portal_.alerts(query=query)

    def delete_alert(self, uuid):
        uuid = self._verify_id(uuid)
        try:
            return self.portal_.delete_alert(uuid).ok
        except HTTPError as e:
            if e.response.status_code == 404:
                raise AlertNotFound('Alert %s was not found!' % (uuid,), uuid=uuid)
            elif e.response.status_code == 403:
                raise InvalidOperation(e.response.text, uuid=uuid)
            raise e

    def approve(self, dataid):
        """
        Approve the given data item for processing. This will only have an effect for data in
        the Verified state.

        :param dataid: The UUID of the data item to approve
        :return:
        """
        dataid = self._verify_id(dataid)
        try:
            return self.portal_.approve(dataid).ok
        except HTTPError as e:
            if e.response.status_code == 404:
                raise DataNotFound('Data %s was not found!' % (dataid,), uuid=dataid)
            elif e.response.status_code == 403:
                raise InvalidOperation(
                    'Unable to approve processing of data %s. %s' % (
                        dataid,
                        e.response.text
                    ),
                    uuid=dataid
                )
            raise e

    def list_processors(self, query=None):
        """
        List the processors that are currently online and available to the user, as well as
        their respective loads.

        :param query: Additional query parameters
        :return: A list containing dictionaries for each available processor. For example:

        .. code-block::

            {
                'label': 'pppx1la',  # the label of the node
                'load': ,
                'active': True,
                'products':
            }
        """
        self._resolve_processors(query=query)
        return [{
            'label': lbl,
            'load': p['load'],
            'active': p['active'],
            'products': p['products'],
            'error': str(p['error'])
        } for lbl, p in self.processors.items()]

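    # Illustrative sketch (not part of the library): printing current processor loads. The
    # field names follow the dictionaries returned above.
    #
    #   for proc in apps.list_processors():
    #       print('%s active=%s load=%s' % (proc['label'], proc['active'], proc['load']))
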
    def upload_gipsyx(
            self,
            file,
            antCal=None,
            pressure=None,
            attitude=None,
            stream=False,
            processor=None,
            progress_hook=None,
            **kwargs
    ):
        """
        Upload GNSS data for processing by the GipsyX software suite. It is recommended that
        you compress your data before upload. Even so, some data is quite large and a progress
        hook can be supplied to provide progress information to the user. For instance, TQDM_
        is a popular terminal based progress bar library. To call this function with a TQDM
        progress bar you could do something like:

        .. code-block::

            from tqdm import tqdm
            with tqdm(
                total=os.path.getsize(file),
                ncols=80,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                postfix={'file': os.path.basename(file)}
            ) as progress_bar:
                ret = apps.upload_gipsyx(
                    file,
                    antCal=antCal,
                    pressure=pressure,
                    attitude=attitude,
                    stream=True,
                    processor=processor,
                    progress_hook=lambda bytes_read: progress_bar.update(bytes_read),
                )

        .. note::

            If a progress_hook is supplied the call will automatically set stream to True

        If interactive user feedback is not desired you may simply upload data like so:

        .. code-block::

            from pprint import pprint
            response = apps.upload_gipsyx(
                rinex_file_path,
                processing_mode=apps.defines.GIPSYXData.KINEMATIC,
                model_pressure=True,
            )
            upload_id = response['id']
            pprint(apps.detail(upload_id))

        The above code would upload a RINEX file for kinematic processing and use NCEP data to
        model atmospheric pressure. It then pulls the data UUID from the response json and
        uses it to fetch and pretty print complete details about the submission.

        :param file: A RINEX file, or a tar file containing a RINEX file and any ancillary
            source files, such as pressure files or attitude quaternion files. Files may be
            compressed using bz2, gzip, zip, lzma or Unix compression.
        :param antCal: The path to the antenna calibration file to upload, if there is one.
        :param pressure: The path to the pressure file to upload, if there is one.
        :param attitude: The path to the attitude quaternion file, if there is one. Only
            helpful for kinematic processing.
        :param stream: True if the upload should be streamed, False otherwise. If streaming is
            enabled the return value of this function will be a requests response object,
            otherwise the json response of the server will be returned.
        :param processor: The label of the processor to use. You should normally leave this
            blank; APPS will automatically select the least loaded processor to send your data
            to.
        :param progress_hook: A callable that accepts one argument, the number of bytes read
            since the last call.
        :param kwargs: Any additional parameters accepted by the APPS webservice for
            configuring the processing of GNSS data. For instance, to set processing mode to
            Kinematic you would supply processing_mode=apps.defines.GIPSYXData.KINEMATIC
        :return: If not streaming, a json response body from the APPS webservice; if
            streaming, a requests library response object.
        """
        self._resolve_processors()
        processor = self._select_processor(label=processor)
        if processor is None:
            raise NoAvailableProcessors(
                '''No processors were reachable for upload. If this condition persists please notify '''
                '''the administrators. The following processors were tried: %s''' % (
                    {lbl: str(p['error']) for lbl, p in self.processors.items()},
                )
            )

        # be sure to apply use case defaults here, APPS can't do this at the server, we have
        # to do it here because the user is allowed to provide non-default values for a use
        # case
        pmode = kwargs.get('processing_mode', None)
        if pmode:
            for param, default in GIPSYXData.PROCESSING_MODE_SETTINGS[pmode].items():
                if param not in kwargs:
                    kwargs[param] = default

        return processor.upload_gipsyx(
            file,
            antCal=antCal,
            pressure=pressure,
            attitude=attitude,
            stream=stream,
            progress_hook=progress_hook,
            **kwargs
        )

    def cached_data(self):
        return self.data_cache

    def list_data(self, query=None):
        return self._refresh_data_cache(query=query)

    def delete_data(self, uuid):
        uuid = self._verify_id(uuid)
        try:
            return self.portal_.delete_data(uuid).ok
        except HTTPError as e:
            if e.response.status_code == 404:
                raise DataNotFound('Data %s was not found!' % (uuid,), uuid=uuid)
            elif e.response.status_code == 403:
                raise InvalidOperation(e.response.text, uuid=uuid)
            raise e

    def detail(self, uuid, query=None):
        return self._refresh_data_cache(uuid=uuid, query=query)

    def update_data(self, uuid, **kwargs):
        uuid = self._verify_id(uuid)
        typ = self.data_cache[uuid]['type'] if uuid in self.data_cache else self.detail(uuid)['type']
        try:
            client = self.get_processor_of_data(uuid)
            if client is None:
                client = self.portal_
            pmode = kwargs.get('processing_mode', None)
            if pmode:
                for param, default in GIPSYXData.PROCESSING_MODE_SETTINGS[pmode].items():
                    if param not in kwargs:
                        kwargs[param] = default
            return client.update_data(uuid, endpoint=typ, **kwargs)
        except HTTPError as e:
            if e.response.status_code == 404:
                raise DataNotFound('Data %s was not found!' % (uuid,), uuid=uuid)
            elif e.response.status_code == 403:
                raise InvalidOperation(e.response.text, uuid=uuid)
            raise e

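    # Illustrative sketch (not part of the library): switching a pending submission to
    # kinematic processing; the loop above fills in the remaining use-case defaults. data_uuid
    # is a placeholder for a real submission identifier.
    #
    #   apps.update_data(data_uuid, processing_mode=defines.GIPSYXData.KINEMATIC)
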
    def download_run(
            self,
            uuid,
            stream=True,
            write_to_disk=True,
            dr=None,
            query=None
    ):
        uuid = self._verify_id(uuid)
        processor = self.get_processor_of_data(uuid)
        if processor is not None:
            return processor.download(
                '/'.join([processor.url.rstrip('/'), self.portal_.label, 'files', str(uuid), 'directory/run']),
                stream=stream,
                write_to_disk=write_to_disk,
                dr=dr if dr is not None else self.download_directory,
                query=query
            )

    def download_source(
            self,
            uuid,
            source_type=None,
            stream=True,
            write_to_disk=True,
            dr=None,
            query=None
    ):
        uuid = self._verify_id(uuid)
        client_link = self._get_src_link_and_client(uuid, source_type)
        if client_link is None:
            return None
        return client_link[0].download(
            client_link[1],
            stream=stream,
            write_to_disk=write_to_disk,
            dr=dr if dr is not None else self.download_directory,
            query=query
        )

    def download_result(
            self,
            uuid,
            result_type=None,
            stream=True,
            write_to_disk=True,
            dr=None,
            query=None,
    ):
        """
        Download the results of a given submission.

        :param uuid: The identifier of the data submission to download results for.
        :param result_type: Optional, the type of result to get. If None or unspecified all
            results will be downloaded as an archive.
        :param stream:
        :param write_to_disk:
        :param dr: The directory to write to, if different than the global APPS configured
            download directory.
        :param query:
        :return:
        """
        uuid = self._verify_id(uuid)
        client_link = self._get_res_link_and_client(uuid, result_type)
        if client_link is None:
            return None
        return client_link[0].download(
            client_link[1],
            stream=stream,
            write_to_disk=write_to_disk,
            dr=dr if dr is not None else self.download_directory,
            query=query
        )

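    # Illustrative sketch (not part of the library): a typical retrieval flow, assuming the
    # submission identified by data_uuid has finished processing (Available/Retrieved state).
    # data_uuid and the directories are placeholders.
    #
    #   apps.download_result(data_uuid, dr='./results')           # all results as an archive
    #   apps.download_source(data_uuid, dr='./original_sources')  # the uploaded source files
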
    def get_processor_of_data(self, uuid):
        """
        Get the client associated with the processor node that the given data resides on.

        :param uuid: The identifier of the data submission in question
        :return: The client associated with the data's processor, or None if one could not be
            found
        """
        return self.get_processor(self.get_data_parameter(uuid, 'location'))

    def get_processor(self, location, link=None):
        """
        Get the client associated with the processor at the specified location, or build one
        from the given link.

        :param location: The label of the processor
        :param link: A URL at which the processor can be found - will only be used if the
            label of the processor can't be found.
        :return: The client associated with the processor, or None if one could not be found
            or constructed from a link
        """
        if location not in self.processors:
            self._resolve_processors()
            if location not in self.processors:
                # this could happen if a processor has been removed from the list of
                # approved processors for a user, but the user still has some data on
                # the processor
                # DO NOT add this temporary client to the list of processors
                if link is not None:
                    return ProcessorClient(
                        processor_url=APPS.base_url(link),
                        api_key=self.api_key_,
                        api_secret=self.api_secret_,
                        trust_env=self.trust_env
                    )
                return None
            else:
                return self.processors[location]['client']
        return self.processors[location]['client']

    def ping(self):
        """
        Check connectivity to all |APPS| servers. This will also refresh the known loads of
        the |APPS| processing servers your account is allowed to use. It's not typically
        necessary to explicitly call this function; other functions that need to refresh this
        information will usually do so under the covers.

        :return: A tuple containing either (True, response) if your |APPS| portal was
            reachable, or (False, None) otherwise. The response will be the json string
            response from the portal containing load data
        """
        self._refresh_processors()
        return self.portal_.ping()

    def _get_res_link_and_client(self, uuid, result_type=None, verify=True):
        """
        Build the result download link for the given data identifier and result type.

        :param uuid: The identifier of the data submission
        :param result_type: The type of the result to get the URI for, or None if you want to
            fetch all results
        :param verify: If True, verify that the result exists before building the link and
            return None if it does not
        :return: A 2-tuple holding the processor client that should handle the download
            request and the URI link to download it, or if verify is specified and the
            requested result does not exist, None is returned
        """
        state = self.get_data_parameter(uuid, 'state') if verify else None
        if verify and not (state == Data.AVAILABLE or state == Data.RETRIEVED):
            return None
        processor = self.get_processor_of_data(uuid)
        if processor is not None:
            link = [processor.url.rstrip('/'), self.portal_.label, 'files', str(uuid), 'result']
            if result_type and not result_type == GIPSYXData.ARCHIVE_FILE:
                if verify:
                    found = False
                    for st in self.get_data_parameter(uuid, 'result_types'):
                        if st['result_type'] == result_type:
                            found = True
                            break
                    if not found:
                        return None
                link.append(result_type)
            return processor, '/'.join(link)
        return None

    def _get_src_link_and_client(self, uuid, source_type=None, verify=True):
        """
        Build the source download link for the given data identifier and source type.

        :param uuid: The identifier of the data submission
        :param source_type: The type of the source to get the URI for, or None if you want to
            fetch all sources
        :param verify: If True, verify that the source exists before building the link and
            return None if it does not
        :return: A 2-tuple holding the processor client that should handle the download
            request and the URI link to download it, or if verify is specified and the
            requested source does not exist, None is returned
        """
        src_available = self.get_data_parameter(uuid, 'src_available') if verify else None
        if verify and not src_available:
            return None
        processor = self.get_processor_of_data(uuid)
        if processor is not None:
            link = [processor.url.rstrip('/'), self.portal_.label, 'files', str(uuid), 'source']
            if source_type:
                if verify:
                    found = False
                    for st in self.get_data_parameter(uuid, 'source_types'):
                        if st['source_type'] == source_type:
                            found = True
                            break
                    if not found:
                        return None
                link.append(source_type)
            return processor, '/'.join(link)
        return None

    def get_data_parameter(self, uuid, param, force_refresh=False):
        if force_refresh:
            self._refresh_data_cache(uuid)
        data = self.data_cache.get(uuid, None)
        if data is None:
            data = self._refresh_data_cache(uuid)  # , query={ 'include[]': param } )
        return data.get(param, None)

    def _refresh_data_cache(self, uuid=None, query=None):
        if uuid is not None:
            uuid = self._verify_id(uuid)
            try:
                if uuid not in self.data_cache:
                    data = self.portal_.detail(uuid, dtype='data', query=query)
                else:
                    data = self.data_cache[uuid]
                    data = self.portal_.detail(uuid, dtype=data['type'], query=query)
                if uuid in self.data_cache:
                    self.data_cache[uuid].update(data)
                else:
                    self.data_cache[uuid] = data
                self.name_to_id[data['name']] = uuid
                return data
            except HTTPError as e:
                if e.response.status_code == 404:
                    # todo search processor nodes for data - might not have replicated to portal yet
                    raise DataNotFound(
                        'Data %s was not found%s' % (
                            uuid,
                            ': ' + e.response.text if len(e.response.text) > 0 else '.'
                        ),
                        uuid=uuid
                    )
                elif e.response.status_code == 403:
                    raise UnauthorizedOperation(
                        'Insufficient permissions to fetch details for data %s. %s' % (
                            uuid,
                            frmt_resp(e.response)
                        )
                    )
                raise e
        else:
            try:
                data_lst = self.portal_.list_data(query=query)
                for d in data_lst:
                    if 'id' in d:
                        if d['id'] in self.data_cache:
                            oldname = self.data_cache[d['id']].get('name', None)
                            if oldname is not None:
                                del self.name_to_id[oldname]
                            self.data_cache[d['id']].update(d)
                        else:
                            self.data_cache[d['id']] = d
                        if 'name' in d:
                            self.name_to_id[d['name']] = d['id']
            except HTTPError as e:
                if e.response.status_code == 403:
                    raise UnauthorizedOperation('Insufficient permissions to list data%s' % (frmt_resp(e.response),))
                raise e
            return data_lst

    def _verify_id(self, uuid):
        uuid = uuid.strip()
        if not self.is_id(uuid):
            raise InvalidIdentifier('%s is not a valid identifier!' % (uuid,), uuid=uuid)
        return uuid

    def _resolve_processors(self, query=None):
        procs = self.portal_.processors(query=query)
        self.processors = {
            p['label']: {
                'client': ProcessorClient(
                    processor_url=p['url'],
                    api_key=self.api_key_,
                    api_secret=self.api_secret_,
                    api=p['user_api'],
                    label=p['label'],
                ),
                'active': True,
                'load': p['load'],
                'products': p['products']
            } for p in procs
        }
        self._refresh_processors()

    def _refresh_processors(self):
        for lbl, p in self.processors.items():
            (p['active'], meta) = p['client'].ping()
            if p['active']:
                p['load'] = meta['load']
                p['error'] = None
            else:
                p['load'] = None
                p['error'] = meta

    def _select_processor(self, label=None):
        self._refresh_processors()
        m = sys.maxsize
        processor = None
        if label:
            processor = self.processors.get(label, {'client': None})['client']
        else:
            for lbl, p in self.processors.items():
                if p['active']:
                    if p['load'] is not None and p['load'] < m:
                        m = p['load']
                        processor = p['client']
        return processor