from .client import (
PortalClient,
ProcessorClient
)
from .defines import *
from .exceptions import (
InvalidAuthenticationCredentials,
InvalidConfiguration,
NoAvailableProcessors,
DataNotFound,
AlertNotFound,
InvalidOperation,
InvalidIdentifier,
UnauthorizedOperation,
ArtifactNotFound,
FlagNotFound
)
from requests.exceptions import HTTPError
import errno
import os
import json
import logging
import re
import sys
from uuid import UUID
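# Helper to append a response body (if any) to log and exception messages.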
def frmt_resp(response):
return ': ' + response.text if len(response.text) > 0 else '.'
uuid_regex = re.compile('[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$')
class APPS(object):
"""
Establish a session with the |APPS| servers.
The APPS class is the main interface for interacting with |APPS|. It greatly simplifies usage of |APPS| by
automatically handling tasks such as data caching, which speeds things up and reduces the amount of back-and-forth
communication between your local machine and the |APPS| servers. |APPS| comprises more than a single server, which
adds complexity to interfacing with it: the portal server is the webserver where your account settings and
credentials are housed and accessed, and there are several |APPS| processing servers that data is uploaded to and
downloaded from. This class abstracts away that complexity by automatically selecting the fastest currently
available |APPS| processing server to upload data to and by keeping track of where your data is located and how to
retrieve it (see the usage example below).
To use the APPS class you will need your apps_settings file which contains your authorization credential. The APPS
library will sign all your API requests with this credential to keep your data private and secure. Be sure to keep
your settings file in a safe, secret place! If at any time you think your account has been compromised you can
`reset your credential`_.
:param settings_file: Path to the settings file containing your authorization credentials.
:param portal: URL of the APPS portal to use. Most likely you will never need to override the default. Supplying
this parameter will override the parameter from your settings file.
:param api_path: The path of the API to use. Supplying this parameter will override the parameter from your
settings file.
:param label: The label of the portal node to use. Supplying this parameter will override the parameter from your
settings file.
:param key: Manually supply your API credential's key. This will override the parameter from your settings file.
:param secret: Manually supply your API credential's secret. This will override the parameter from your settings
file.
:param download_directory: The directory that any files downloaded from |APPS| will be stored in. By default this
is the current working directory.
:param trust_env: Trust environment settings for proxy configuration, default authentication and similar.
:param log_level: Set the logging level, OFF is silent and DEBUG is the highest verbosity. Available levels are
{OFF,DEBUG,INFO,WARNING,ERROR,CRITICAL}. If log_level is None, no logging configuration will be performed.
(see :ref:`logging_and_output` and :meth:`gdgps_apps.apps.APPS.set_log_level`).
:exception InvalidConfiguration: If there is an unrecoverable problem with the supplied configuration.
:exception InvalidAuthenticationCredentials: If authentication credentials are not supplied or are not of the
correct types.
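A minimal usage sketch (the file name below is a hypothetical placeholder):
.. code-block::
from gdgps_apps.apps import APPS
# establish a session using the default ~/.apps_settings credential file
apps = APPS(download_directory='downloads')
# upload a RINEX file for processing and fetch its details
response = apps.upload_gipsyx('my_station.21o')
print(apps.detail(response['id']))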
"""
__logger__ = logging.getLogger(__name__ + '.APPS')
default_settings_path = '~/.apps_settings'
profile_cache = None
data_cache = {}
name_to_id = {}
processors = {}
stream_chunk_size = 8128
MUTABLE_CREDENTIALS = ['is_staff', 'is_superuser']
is_staff_ = False
is_superuser_ = False
OPTIONS = {
'gipsyxdata': {
'name': {
'type': str,
'default': '<filename>',
'help': (
'The name to use for the data item. By default the name of the file will be used. The system '
'enforces name uniqueness, so a numeric may be added to the end after upload if there is a naming '
'clash.'),
'short': 'n'
},
'processing_mode': {
'type': GIPSYXData.normalize_processing_mode,
'default': GIPSYXData.STATIC,
'choices': GIPSYXData.PROCESSING_MODES,
'help': GIPSYXData.PROCESSING_MODE_HELP,
'short': 'm'
},
'model_pressure': {
'type': bool,
'default': False,
'help': GIPSYXData.MODEL_PRESSURE_HELP,
},
'troposphere_model': {
'type': GIPSYXData.normalize_troposphere_model,
'default': False,
'choices': GIPSYXData.TROPOSPHERE_MODELS,
'help': GIPSYXData.TROPOSPHERE_MODEL_HELP,
},
'model_tides': {
'type': bool,
'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['model_tides'],
'help': GIPSYXData.MODEL_TIDES_HELP,
},
'ocean_loading': {
'type': bool,
'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['ocean_loading'],
'help': GIPSYXData.OCEAN_LOADING_HELP,
},
'product': {
'type': GIPSYXData.normalize_product,
'default': GIPSYXData.PRODUCT_DEFAULT,
'choices': GIPSYData.PRODUCTS,
'help': GIPSYData.PRODUCT_HELP,
'short': 'p'
},
'elev_dep_weighting': {
'type': GIPSYXData.normalize_elev_dep_weighting,
'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['elev_dep_weighting'],
'choices': GIPSYXData.ELEV_DEP_WEIGHTINGS,
'help': GIPSYXData.ELEV_DEP_WEIGHTING_HELP
},
'elev_angle_cutoff': {
'type': float,
'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['elev_angle_cutoff'],
'help': GIPSYXData.ELEV_ANGLE_CUTOFF_HELP
},
'solution_period': {
'type': float,
'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['solution_period'],
'help': GIPSYXData.SOLUTION_PERIOD_HELP
},
'generate_quaternions': {
'type': bool,
'default': GIPSYXData.PROCESSING_MODE_SETTINGS[GIPSYXData.STATIC]['generate_quaternions'],
'help': GIPSYXData.GENERATE_QUATERNIONS_HELP
},
'email_notify': {
'type': bool,
'default': Data.EMAIL_NOTIFY_DEFAULT,
'help': Data.EMAIL_NOTIFY_HELP,
'short': 'e'
},
},
'profile': {
'name': {
'type': str,
'help': 'Name of the primary account point-of-contact.'
},
'affiliation': {
'type': str,
'help': 'Company or organization this account is affiliated with.'
},
'address': {
'type': str,
'help': 'The mailing address associated with the account.'
},
'compression_preference': {
'type': str,
'choices': UserProfile.COMPRESSION_PREFERENCES,
'help': UserProfile.COMPRESSION_PREFERENCE_HELP
},
'archive_preference': {
'type': str,
'choices': UserProfile.ARCHIVE_PREFERENCES,
'help': UserProfile.ARCHIVE_PREFERENCE_HELP
},
'keep_source_files': {
'type': bool,
'help': UserProfile.KEEP_SOURCE_FILES_HELP
},
'no_prompt_processing': {
'type': bool,
'help': UserProfile.NO_PROMPT_PROCESSING_HELP
},
'email_notify': {
'type': bool,
'help': UserProfile.EMAIL_NOTIFY_HELP
},
'timezone': {
'type': str,
'help': UserProfile.TIMEZONE_HELP
}
}
}
@staticmethod
def read_settings(settings_file='~/.apps_settings'):
if isinstance(settings_file, str):
if '~' in settings_file:
settings_file = os.path.expanduser(settings_file)
if os.path.exists(settings_file):
with open(settings_file) as f:
try:
settings = json.load(f)
except Exception as e:
raise InvalidConfiguration(str(e))
return settings
raise InvalidConfiguration(
'Settings file %s does not exist! You can download yours from %s' % (
settings_file,
SETTINGS_FILE_LINK
)
)
def __init__(
self,
settings_file='~/.apps_settings',
portal=PORTAL_URL,
api_path=DEFAULT_USER_API_PATH,
label=None,
key=None,
secret=None,
download_directory=None,
trust_env=False,
log_level=None
):
self.download_directory = download_directory
self.trust_env = trust_env
self.set_log_level(log_level)
self.download_directory_ = None
self.api_ = None
self.portal_url_ = None
self.label_ = None # label of the `portal` node
self.settings_file_ = settings_file
self.api_key_ = None
self.api_secret_ = None
if settings_file:
if '~' in settings_file:
self.settings_file_ = os.path.expanduser(settings_file)
self.__logger__.debug('Loading settings from %s', self.settings_file_)
settings = self.read_settings(settings_file)
self.portal_url_ = settings.get('portal', self.portal_url_)
self.api_ = settings.get('api_path', self.api_)
self.api_key_ = settings.get('key', self.api_key_)
self.api_secret_ = settings.get('secret', self.api_secret_)
self.label_ = settings.get('label', self.label_)
self.is_staff_ = settings.get('is_staff', self.is_staff_)
self.is_superuser_ = settings.get('is_superuser', self.is_superuser_)
# passed in parameters may override settings file values
if portal is not None and portal != PORTAL_URL or self.portal_url_ is None:
self.portal_url_ = portal
if api_path is not None and api_path != DEFAULT_USER_API_PATH or self.api_ is None:
self.api_ = api_path
if key is not None:
self.api_key_ = key
if secret is not None:
self.api_secret_ = secret
if label is not None:
self.label_ = label
if self.api_key_ is None or self.api_secret_ is None:
raise InvalidAuthenticationCredentials(
'''APPS must be configured with authentication credentials, '''
'''by either supplying a path to a credential file, or by '''
'''explicitly providing your issued key and secret as a tuple '''
'''(key, secret) to the auth parameter. You can download your settings file at %s''' %
SETTINGS_FILE_LINK
)
if self.portal_url_ is None:
raise InvalidConfiguration('APPS must be configured with a portal url.')
self.__logger__.info('Establishing client interface to %s/%s', self.portal_url_, self.api_)
self.portal_ = PortalClient(
portal_url=self.portal_url_,
label=self.label_,
api=self.api_,
api_key=self.api_key_,
api_secret=self.api_secret_,
trust_env=self.trust_env
)
def update_credential(self):
"""
The user credential houses some information that might change. To avoid having users re-download it, for certain
updates we can simply flush the changes to disk from their updated profile.
"""
write = False
settings = self.read_settings(self.settings_file_)
if self.profile_cache is None:
self.profile()
for attr in self.MUTABLE_CREDENTIALS:
value = self.profile_cache.get(attr, getattr(self, attr, None))
if value != settings.get(attr, getattr(self, attr, None)):
write = True
settings[attr] = value
if write:
with open(self.settings_file_, 'w') as settings_out:
json.dump(settings, settings_out, indent=4)
@property
def is_staff(self):
return self.is_staff_
@property
def is_superuser(self):
return self.is_superuser_
@staticmethod
def set_log_level(level, **kwargs):
"""
This will set up a basic logging config while setting the root logging level to the given level.
(see :ref:`logging_and_output`)
:param level: The root logging level. This argument can be either an integer logger level, e.g. logging.DEBUG,
or a string representation of the log level, e.g. 'DEBUG'. This function accepts one additional logger level
above what is provided through the Python logging framework, 'OFF', which is equivalent to
logging.CRITICAL + 1. If level is None no logging configuration will be done.
:param kwargs: Additional arguments to pass to
`logging.basicConfig <https://docs.python.org/3/library/logging.html#logging.basicConfig>`_
:return: None
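For example:
.. code-block::
import logging
from gdgps_apps.apps import APPS
APPS.set_log_level('DEBUG')          # maximum verbosity
APPS.set_log_level(logging.WARNING)  # integer levels also work
APPS.set_log_level('OFF')            # silence all logging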
"""
if level is None:
return
if isinstance(level, int):
numeric_level = level
else:
if level == 'OFF':
numeric_level = logging.CRITICAL + 1
else:
numeric_level = getattr(logging, level, None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % level)
logging.basicConfig(level=numeric_level, **kwargs)
@staticmethod
def is_state(state):
"""
Is the given string a valid data state representation. This method will recognize short and long forms and is
case insensitive.
:param state: a string that might represent a state
:return: The normalized short-form state string, or None if the input string was not a valid Data
state
"""
try:
return Data.normalize_state(state)
except (AttributeError, KeyError):
return None
@staticmethod
def is_id(uuid):
"""
Is the given string a valid data UUID_. This method is case insensitive, but canonically in |APPS| UUID_s are
lower case.
:param uuid: a string that might represent a UUID_
:return: The normalized lower case form of the UUID_ or None if it is not a valid UUID.
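For example (the UUID below is an arbitrary illustrative value):
.. code-block::
APPS.is_id('0F8FAD5B-D9CB-469F-A165-70867728950E')  # -> '0f8fad5b-d9cb-469f-a165-70867728950e'
APPS.is_id('not-a-uuid')                             # -> None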
"""
try:
if uuid_regex.match(uuid) is not None:
return str(UUID(uuid)).lower()
return None
except ValueError:
return None
@staticmethod
def is_alert_level(lvl):
"""
Is the given string a valid |APPS| Alert level. This method will recognize short and long forms and is
case insensitive.
:param lvl: a string that might represent an Alert level
:return: The normalized form Alert level string or None if it is not a valid Alert level.
"""
try:
return UserAlert.normalize_level(lvl)
except (AttributeError, KeyError):
return None
@staticmethod
def is_flag_level(lvl):
"""
Is the given string a valid |APPS| DataFlag level. This method will recognize short and long forms and is
case insensitive.
:param lvl: a string that might represent a DataFlag level
:return: The normalized form DataFlag level string or None if it is not a valid DataFlag level.
"""
try:
return DataFlag.normalize_level(lvl)
except (AttributeError, KeyError):
return None
@staticmethod
def base_url(url):
"""
Parse out the root path of any given URL.
:param url: a valid url string
:return: The root url path, or None if the input could not be parsed
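For example:
.. code-block::
APPS.base_url('https://example.com/api/v1/data')  # -> 'https://example.com'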
"""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
return parsed.scheme + '://' + parsed.netloc
except ImportError:
try:
from urlparse import urlparse
parsed = urlparse(url)
return parsed.scheme + '://' + parsed.netloc
except ImportError:
return None
@property
def download_directory(self):
"""
The str containing the path of the directory where files downloaded from |APPS| will be stored. If the path does
not exist, the directories will be recursively created. If set to None, the current working directory will be
used.
:exception InvalidConfiguration: if the path does not exist and was not able to be created or exists but is not
a directory
"""
if self.download_directory_ is None:
return os.getcwd()
return self.download_directory_
@download_directory.setter
def download_directory(self, dr):
if dr is not None:
if not os.path.exists(dr):
try:
os.makedirs(dr)
except OSError as e:
if e.errno != errno.EEXIST:
raise InvalidConfiguration('Error, cannot create download directory %s: %s' % (dr, str(e)))
elif not os.path.isdir(dr):
raise InvalidConfiguration('Error, download directory is not a directory: %s' % dr)
self.download_directory_ = dr
def id_for_name(self, name):
"""
Fetch the UUID_ of data that has the given name. This method will optimistically search the local cache first
and if that fails it will query |APPS|.
:param name: A str containing the name of the data to search for.
:return: The UUID_ of the data with the given name as a str, or None if data of the given name does not exist.
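A short sketch of resolving a name to its UUID_ and then fetching details (the data name is hypothetical):
.. code-block::
data_id = apps.id_for_name('my_station.21o')
details = apps.detail(data_id) if data_id is not None else None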
"""
if name not in self.name_to_id:
self._refresh_data_cache(query={'filter{name}': name})
return self.name_to_id.get(name, None)
def exists(self, identifier):
"""
Check if data of a given name or UUID_ exists.
:param identifier: Either a name or UUID_ of a given Data item
:return: True if data of the given name or UUID_ exists, False otherwise
"""
if identifier in self.name_to_id:
return True
if identifier.lower() in self.data_cache:
return True
self._refresh_data_cache()
if identifier in self.name_to_id:
return True
if identifier.lower() in self.data_cache:
return True
return False
def flags(self, dataid=None, query=None):
if dataid is not None:
return self.detail(dataid, query=query)['flags']
return self.portal_.flags(query=query)
def delete_flag(self, uuid):
uuid = self._verify_id(uuid)
try:
return self.portal_.delete_flag(uuid).ok
except HTTPError as e:
if e.response.status_code == 404:
raise FlagNotFound('Flag %s was not found!' % (uuid,), uuid=uuid)
elif e.response.status_code == 403:
raise InvalidOperation(e.response.text, uuid=uuid)
raise e
def data_in_states(self, states):
if isinstance(states, str):
states = [Data.normalize_state(states)]
else:
states = [Data.normalize_state(state) for state in states]
return [uuid for uuid, detail in self.data_cache.items() if detail['state'] in states]
def profile(self, query=None):
self.profile_cache = self.portal_.profile(query=query)
return self.profile_cache
def sources(self, query=None):
return self.portal_.sources(query=query)
def get_source(self, sid, query=None):
"""
Fetch the data source object given its UUID.
:param sid: The UUID of the data submission's source object.
:param query: Additional query parameters (if supported)
:return: A dictionary representation of the data source. TODO see webservice API doc
"""
return self.portal_.get_source(sid, query=query)
def upload_source(self, dataid, file, source_type=None, stream=False, progress_hook=None, **kwargs):
"""
Upload a file to the specified data submission as an additional source file. Note that the data must be in a
mutable state for this operation to work (i.e. pre-processed and not currently verifying). All upload calls provide
hooks that enable feedback to users regarding upload progress. TQDM_ is a popular Python library for providing
in-terminal progress bars. For example, one might provide a progress bar using TQDM_ and this function like so:
.. code-block::
from tqdm import tqdm
with tqdm(
total=os.path.getsize(pressurefile),
ncols=80,
unit='B',
unit_scale=True,
unit_divisor=1024,
postfix={'file': os.path.basename(pressurefile)}
) as progress_bar:
self.apps.upload_source(
data_uuid,
pressurefile,
source_type=defines.GIPSYXData.PRESSURE_FILE,
stream=True,
progress_hook=lambda bytes_read: progress_bar.update(bytes_read)
)
:param dataid: The UUID of the data submission to attach this source file to
:param file: The path to the source file
:param source_type: Optionally tell APPS what type of source file this is.
:param stream: If true, stream the upload. This avoids loading the entire file into memory.
:param progress_hook: The progress hook can be any callable that takes the number of bytes read. Its primary use
is to enable status updates for the upload. Note, passing a progress_hook will force the upload into
streaming mode.
:param kwargs: Additional parameters to pass to APPS for source creation. See the webservices API.
:return:
"""
client = self.portal_
if dataid in self.cached_data():
client = self.get_processor_of_data(dataid)
return client.upload_source(
dataid,
file,
source_type=source_type,
stream=stream,
progress_hook=progress_hook,
**kwargs
)
def delete_source(self, selector):
"""
Delete the specified source file from the APPS servers.
:param selector: Either a source identifier, or a 2-tuple where the first element is the data identifier and the
second element is either None (for all sources), the source type or the name of the source.
:return: A list containing the sources that were deleted, if any.
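For example (the identifiers are hypothetical placeholders, and ``defines`` is assumed to refer to
``gdgps_apps.defines``):
.. code-block::
# delete every source file attached to a data submission
apps.delete_source(('0f8fad5b-d9cb-469f-a165-70867728950e', None))
# delete only the pressure source file of that submission
apps.delete_source(('0f8fad5b-d9cb-469f-a165-70867728950e', defines.GIPSYXData.PRESSURE_FILE))
# delete a single source directly by its own identifier
apps.delete_source('7c9e6679-7425-40de-944b-e07fc1f90ae7')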
"""
if not isinstance(selector, str) and hasattr(selector, '__iter__'):
assert(len(selector) == 2)
assert(self.is_id(selector[0]))
typ = None
name = None
if selector[1]:
assert(isinstance(selector[1], str))
if selector[1]:
try:
typ = GIPSYXData.normalize_source_type(selector[1])
except KeyError:
name = selector[1].lower()
to_delete = []
for src in self.get_data_parameter(selector[0], 'source_types', force_refresh=True):
src['data'] = selector[0]
if typ is None and name is None:
to_delete.append(src)
elif typ is not None and src['source_type'] == typ:
to_delete.append(src)
elif src['name'].lower() == name:
to_delete.append(src)
deleted = []
for src in to_delete:
if self.portal_.delete_source(src['id']).status_code < 300:
deleted.append(src)
return deleted
else:
assert(self.is_id(selector))
src = self.portal_.get_source(selector)
if self.portal_.delete_source(selector).status_code < 300:
return [src]
return []
def update_profile(self, **kwargs):
if self.profile_cache is None:
self.profile()
try:
return self.portal_.update_profile(userid=self.profile_cache['id'], **kwargs).ok
except HTTPError as e:
if e.response.status_code == 404:
raise ArtifactNotFound(
'Profile %s was not found%s' % (
self.profile_cache['id'],
': ' + e.response.text if len(e.response.text) > 0 else '.'
),
uuid=self.profile_cache['id']
)
elif e.response.status_code == 403:
raise UnauthorizedOperation(
'Insufficient permissions to update profile parameters %s. %s' % (
dict(**kwargs), frmt_resp(e.response)
)
)
raise e
def alerts(self, query=None):
return self.portal_.alerts(query=query)
def delete_alert(self, uuid):
uuid = self._verify_id(uuid)
try:
return self.portal_.delete_alert(uuid).ok
except HTTPError as e:
if e.response.status_code == 404:
raise AlertNotFound('Alert %s was not found!' % (uuid,), uuid=uuid)
elif e.response.status_code == 403:
raise InvalidOperation(e.response.text, uuid=uuid)
raise e
def approve(self, dataid):
"""
Approve the given data item for processing. This will only have an effect for data in the Verified state.
:param dataid: The UUID of the data item to approve
:return:
"""
dataid = self._verify_id(dataid)
try:
return self.portal_.approve(dataid).ok
except HTTPError as e:
if e.response.status_code == 404:
raise DataNotFound('Data %s was not found!' % (dataid,), uuid=dataid)
elif e.response.status_code == 403:
raise InvalidOperation(
'Unable to approve processing of data %s. %s' % (
dataid,
e.response.text
),
uuid=dataid
)
raise e
def list_processors(self, query=None):
"""
List the processors that are currently online and available to the user, as well as their respective loads.
:param query: Additional query parameters
:return: A list containing dictionaries for each available processor. For example:
.. code-block::
{
'label': 'pppx1la', # the label of the node
'load': ...,       # the node's current load
'active': True,
'products': ...,   # the products available on the node
'error': None      # error message if the node could not be reached, else None
}
"""
self._resolve_processors(query=query)
return [{
'label': lbl,
'load': p['load'],
'active': p['active'],
'products': p['products'],
'error': str(p['error'])
} for lbl, p in self.processors.items()]
def upload_gipsyx(
self,
file,
antCal=None,
pressure=None,
attitude=None,
stream=False,
processor=None,
progress_hook=None,
**kwargs
):
"""
Upload GNSS data for processing by the GipsyX software suite. It is recommended that you compress your data
before upload. Even so, some data is quite large and a progress hook can be supplied to facilitate progression
information to the user.
For instance, TQDM_ is a popular terminal-based progress bar library. To call this function with
a TQDM progress bar you could do something like:
.. code-block::
from tqdm import tqdm
with tqdm(
total=os.path.getsize(file),
ncols=80,
unit='B',
unit_scale=True,
unit_divisor=1024,
postfix= {'file': os.path.basename(file)}
) as progress_bar:
ret = apps.upload_gipsyx(
file,
antCal=antCal,
pressure=pressure,
attitude=attitude,
stream=True,
processor=processor,
progress_hook=lambda bytes_read: progress_bar.update(bytes_read),
)
.. note:: If a progress_hook is supplied the call will automatically set stream to True
If interactive user feedback is not desired you may simply upload data like so:
.. code-block::
from pprint import pprint
response = apps.upload_gipsyx(
rinex_file_path,
processing_mode=apps.defines.GIPSYXData.KINEMATIC,
model_pressure=True,
)
upload_id = response['id']
pprint(apps.detail(upload_id))
The above code would upload a RINEX file for kinematic processing and use NCEP data to model atmospheric
pressure. It then pulls the data UUID from the response json and uses it to fetch and pretty print complete
details about the submission.
:param file: A RINEX file, or a tar file containing a RINEX file and any ancillary source files, such as
pressure files or attitude quaternion files. Files may be compressed using bz2, gzip, zip, lzma or Unix
compression.
:param antCal: The path to the antenna calibration file to upload, if there is one.
:param pressure: The path to the pressure file to upload, if there is one.
:param attitude: The path to the attitude quaternion file, if there is one. Only helpful for kinematic
processing.
:param stream: True if the upload should be streamed, False otherwise. If streaming is enabled the return value
of this function will be a requests response object, otherwise the json response of the server will be
returned.
:param processor: The label of the processor to use. You should normally leave this blank, APPS will
automatically select the least loaded processor to send your data to.
:param progress_hook: A callable that accepts one argument, which is the number of bytes read since the last call.
:param kwargs: Any additional parameters accepted by the APPS webservice for configuring the processing of GNSS
data. For instance, to set processing mode to Kinematic you would supply
processing_mode=apps.defines.GIPSYXData.KINEMATIC
:return: If not streaming, the json response body from the APPS webservice; if streaming, a requests library
response object.
"""
self._resolve_processors()
processor = self._select_processor(label=processor)
if processor is None:
raise NoAvailableProcessors(
'''No processors were reachable for upload. If this condition persists please notify '''
'''the administrators. The following processors were tried: %s''' %
({lbl: str(p['error']) for lbl, p in self.processors.items()},))
# Be sure to apply use-case defaults here. APPS can't do this on the server side because the user is allowed to
# provide non-default values for a use case, so we have to apply them here.
pmode = kwargs.get('processing_mode', None)
if pmode:
for param, default in GIPSYXData.PROCESSING_MODE_SETTINGS[pmode].items():
if param not in kwargs:
kwargs[param] = default
return processor.upload_gipsyx(
file,
antCal=antCal,
pressure=pressure,
attitude=attitude,
stream=stream,
progress_hook=progress_hook,
**kwargs
)
def cached_data(self):
return self.data_cache
def list_data(self, query=None):
return self._refresh_data_cache(query=query)
def delete_data(self, uuid):
uuid = self._verify_id(uuid)
try:
return self.portal_.delete_data(uuid).ok
except HTTPError as e:
if e.response.status_code == 404:
raise DataNotFound('Data %s was not found!' % (uuid,), uuid=uuid)
elif e.response.status_code == 403:
raise InvalidOperation(e.response.text, uuid=uuid)
raise e
def detail(self, uuid, query=None):
return self._refresh_data_cache(uuid=uuid, query=query)
def update_data(self, uuid, **kwargs):
uuid = self._verify_id(uuid)
typ = self.data_cache[uuid]['type'] if uuid in self.data_cache else self.detail(uuid)['type']
try:
client = self.get_processor_of_data(uuid)
if client is None:
client = self.portal_
pmode = kwargs.get('processing_mode', None)
if pmode:
for param, default in GIPSYXData.PROCESSING_MODE_SETTINGS[pmode].items():
if param not in kwargs:
kwargs[param] = default
return client.update_data(uuid, endpoint=typ, **kwargs)
except HTTPError as e:
if e.response.status_code == 404:
raise DataNotFound('Data %s was not found!' % (uuid,), uuid=uuid)
elif e.response.status_code == 403:
raise InvalidOperation(e.response.text, uuid=uuid)
raise e
def download_run(
self,
uuid,
stream=True,
write_to_disk=True,
dr=None,
query=None
):
uuid = self._verify_id(uuid)
processor = self.get_processor_of_data(uuid)
if processor is not None:
return processor.download(
'/'.join([processor.url.rstrip('/'), self.portal_.label, 'files', str(uuid), 'directory/run']),
stream=stream,
write_to_disk=write_to_disk,
dr=dr if dr is not None else self.download_directory,
query=query
)
def download_source(
self,
uuid,
source_type=None,
stream=True,
write_to_disk=True,
dr=None,
query=None
):
uuid = self._verify_id(uuid)
client_link = self._get_src_link_and_client(uuid, source_type)
if client_link is None:
return None
return client_link[0].download(
client_link[1],
stream=stream,
write_to_disk=write_to_disk,
dr=dr if dr is not None else self.download_directory,
query=query
)
def download_result(
self,
uuid,
result_type=None,
stream=True,
write_to_disk=True,
dr=None,
query=None,
):
"""
Download the results of a given submission.
:param uuid: The identifier of the data submission to download results for.
:param result_type: Optional, the type of result to get. If None or unspecified, all results will be downloaded
as an archive.
:param stream:
:param write_to_disk:
:param dr: The directory to write to, if different from the globally configured APPS download directory.
:param query:
:return:
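A short sketch (the identifier is a hypothetical placeholder):
.. code-block::
# download the results archive for a completed submission into the configured download directory
apps.download_result('0f8fad5b-d9cb-469f-a165-70867728950e')
# or write it to a specific directory instead
apps.download_result('0f8fad5b-d9cb-469f-a165-70867728950e', dr='results')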
"""
uuid = self._verify_id(uuid)
client_link = self._get_res_link_and_client(uuid, result_type)
if client_link is None:
return None
return client_link[0].download(
client_link[1],
stream=stream,
write_to_disk=write_to_disk,
dr=dr if dr is not None else self.download_directory,
query=query
)
def get_processor_of_data(self, uuid):
"""
Get the client associated with the processor node that the given data resides on.
:param uuid: The identifier of the data submission in question
:return: The client associated with the data's processor or None if one could not be found
"""
return self.get_processor(self.get_data_parameter(uuid, 'location'))
def get_processor(self, location, link=None):
"""
Get the client associated with the processor at the specified location or build one from the given link.
:param location: The label of the processor
:param link: A URL at which the processor can be found - will only be used if the label of the processor cannot
be found.
:return: The client associated with the processor or None if one could not be found or constructed from a link
"""
if location not in self.processors:
self._resolve_processors()
if location not in self.processors:
# this could happen if a processor has been removed from the list of
# approved processors for a user, but the user still has some data on
# the processor
# DO NOT add this temporary client to the list of processors
if link is not None:
return ProcessorClient(
processor_url=APPS.base_url(link),
api_key=self.api_key_,
api_secret=self.api_secret_,
trust_env=self.trust_env
)
return None
else:
return self.processors[location]['client']
return self.processors[location]['client']
def ping(self):
"""
Check connectivity to all |APPS| servers. This will also refresh the known loads of the |APPS| processing
servers your account is allowed to use. It is not typically necessary to explicitly call this function; other
functions that need to refresh this information will usually do so under the covers.
:return: A tuple containing either (True, response) if your |APPS| portal was reachable or (False, None) if it
was not. The response will be the json response from the portal containing load data.
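For example:
.. code-block::
reachable, load_info = apps.ping()
if reachable:
    print(load_info)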
"""
self._refresh_processors()
return self.portal_.ping()
def _get_res_link_and_client(self, uuid, result_type=None, verify=True):
"""
Build the result download link for the given data identifier and result type.
:param uuid: The identifier of the data submission
:param result_type: The type of the result to get the URI for, or None if you want to fetch all results
:param verify: If True, verify that the result exists before building the link and return None if it does not
:return: A 2-tuple holding the processor client that should handle the download request and the URI link to
download it, or if verify is specified and the requested result does not exist, None is returned
"""
state = self.get_data_parameter(uuid, 'state') if verify else None
if verify and not (state == Data.AVAILABLE or state == Data.RETRIEVED):
return None
processor = self.get_processor_of_data(uuid)
if processor is not None:
link = [processor.url.rstrip('/'), self.portal_.label, 'files', str(uuid), 'result']
if result_type and result_type != GIPSYXData.ARCHIVE_FILE:
if verify:
found = False
for st in self.get_data_parameter(uuid, 'result_types'):
if st['result_type'] == result_type:
found = True
break
if not found:
return None
link.append(result_type)
return processor, '/'.join(link)
return None
def _get_src_link_and_client(self, uuid, source_type=None, verify=True):
"""
Build the source download link for the given data identifier and source type.
:param uuid: The identifier of the data submission
:param source_type: The type of the source to get the URI for, or None if you want to fetch all sources
:param verify: If True, verify that the result exists before building the link and return None if it does not
:return: A 2-tuple holding the processor client that should handle the download request and the URI link to
download it, or if verify is specified and the requested result does not exist, None is returned
"""
src_available = self.get_data_parameter(uuid, 'src_available') if verify else None
if verify and not src_available:
return None
processor = self.get_processor_of_data(uuid)
if processor is not None:
link = [processor.url.rstrip('/'), self.portal_.label, 'files', str(uuid), 'source']
if source_type:
if verify:
found = False
for st in self.get_data_parameter(uuid, 'source_types'):
if st['source_type'] == source_type:
found = True
break
if not found:
return None
link.append(source_type)
return processor, '/'.join(link)
return None
def get_data_parameter(self, uuid, param, force_refresh=False):
if force_refresh:
self._refresh_data_cache(uuid)
data = self.data_cache.get(uuid, None)
if data is None:
data = self._refresh_data_cache(uuid) # , query={ 'include[]': param } )
return data.get(param, None)
def _refresh_data_cache(self, uuid=None, query=None):
if uuid is not None:
uuid = self._verify_id(uuid)
try:
if uuid not in self.data_cache:
data = self.portal_.detail(uuid, dtype='data', query=query)
else:
data = self.data_cache[uuid]
data = self.portal_.detail(uuid, dtype=data['type'], query=query)
if uuid in self.data_cache:
self.data_cache[uuid].update(data)
else:
self.data_cache[uuid] = data
self.name_to_id[data['name']] = uuid
return data
except HTTPError as e:
if e.response.status_code == 404:
# todo search processor nodes for data - might not have replicated to portal yet
raise DataNotFound(
'Data %s was not found%s' % (
uuid,
': ' + e.response.text if len(e.response.text) > 0 else '.'
),
uuid=uuid
)
elif e.response.status_code == 403:
raise UnauthorizedOperation(
'Insufficient permissions to fetch details for data %s. %s' % (
uuid, frmt_resp(e.response)
)
)
raise e
else:
try:
data_lst = self.portal_.list_data(query=query)
for d in data_lst:
if 'id' in d:
if d['id'] in self.data_cache:
oldname = self.data_cache[d['id']].get('name', None)
if oldname is not None:
del self.name_to_id[oldname]
self.data_cache[d['id']].update(d)
else:
self.data_cache[d['id']] = d
if 'name' in d:
self.name_to_id[d['name']] = d['id']
except HTTPError as e:
if e.response.status_code == 403:
raise UnauthorizedOperation('Insufficient permissions to list data%s' % (frmt_resp(e.response),))
raise e
return data_lst
def _verify_id(self, uuid):
uuid = uuid.strip()
if not self.is_id(uuid):
raise InvalidIdentifier('%s is not a valid identifier!' % (uuid,), uuid=uuid)
return uuid
def _resolve_processors(self, query=None):
procs = self.portal_.processors(query=query)
self.processors = {
p['label']: {
'client': ProcessorClient(
processor_url=p['url'],
api_key=self.api_key_,
api_secret=self.api_secret_,
api=p['user_api'],
label=p['label'],
),
'active': True,
'load': p['load'],
'products': p['products']
} for p in procs
}
self._refresh_processors()
def _refresh_processors(self):
for lbl, p in self.processors.items():
(p['active'], meta) = p['client'].ping()
if p['active']:
p['load'] = meta['load']
p['error'] = None
else:
p['load'] = None
p['error'] = meta
def _select_processor(self, label=None):
self._refresh_processors()
m = sys.maxsize
processor = None
if label:
processor = self.processors.get(label, {'client': None})['client']
else:
for lbl, p in self.processors.items():
if p['active']:
if p['load'] is not None and p['load'] < m:
m = p['load']
processor = p['client']
return processor