"""Core Swimlane client class"""
import logging
import jwt
import pendulum
import requests
from pyuri import URI
from requests.compat import json
from requests.packages import urllib3
from requests.structures import CaseInsensitiveDict
from six.moves.urllib.parse import urljoin
from swimlane.core.adapters import GroupAdapter, UserAdapter, AppAdapter, HelperAdapter
from swimlane.core.cache import ResourcesCache
from swimlane.core.resolver import SwimlaneResolver
from swimlane.core.resources.usergroup import User
from swimlane.exceptions import SwimlaneHTTP400Error, InvalidSwimlaneProductVersion
from swimlane.utils.version import get_package_version, compare_versions
from swimlane.core.wrappedsession import WrappedSession
# Disable insecure request warnings
urllib3.disable_warnings()
logger = logging.getLogger(__name__)
# pylint: disable=invalid-name
_lib_full_version = get_package_version()
_lib_major_version, _lib_minor_version = _lib_full_version.split('.')[0:2]
[docs]class Swimlane(object):
"""Swimlane API client
Core class used throughout library for all API requests and server interactions
Args:
host (str): Full RFC-1738 URL pointing to Swimlane host. Defaults will be provided for all parts
username (str): Authentication username
password (str): Authentication password
verify_ssl (bool): Verify SSL (ignored on HTTP). Disable to use self-signed certificates
default_timeout (int): Default request connect and read timeout in seconds for all requests
verify_server_version (bool): Verify server version has same major version as client package. May require
additional requests, set False to disable check
resource_cache_size (int): Maximum number of each resource type to keep in memory cache. Set 0 to disable
caching. Disabled by default
access_token (str): Authentication token, used in lieu of a username and password
write_to_read_only (bool): Enable the ability to write to Read-only fields
Attributes:
host (pyuri.URI): Full RFC-1738 URL pointing to Swimlane host
apps (AppAdapter): :class:`~swimlane.core.adapters.app.AppAdapter` configured for current Swimlane instance
users (UserAdapter): :class:`~swimlane.core.adapters.usergroup.UserAdapter` configured for current
Swimlane instance
groups (GroupAdapter): :class:`~swimlane.core.adapters.usergroup.GroupAdapter` configured for current
Swimlane instance
resources_cache (ResourcesCache): Cache checked by all supported adapters for current Swimlane instance
Examples:
::
# Establish connection using username password
swimlane = Swimlane(
'https://192.168.1.1',
'username',
'password',
verify_ssl=False
)
# Or establish connection using personal access token
swimlane = Swimlane(
'https://192.168.1.1',
access_token='abcdefg',
verify_ssl=False
)
# Retrieve an app
app = swimlane.apps.get(name='Target App')
"""
_api_root = '/api/'
def __init__(
self,
host,
username=None,
password=None,
verify_ssl=True,
default_timeout=60,
verify_server_version=True,
resource_cache_size=0,
access_token=None,
write_to_read_only=False
):
self.__verify_auth_params(username, password, access_token)
self.host = URI(host)
self.host.scheme = (self.host.scheme or 'https').lower()
self.host.path = None
self.resources_cache = ResourcesCache(resource_cache_size)
self.__settings = None
self.__user = None
self._write_to_read_only = write_to_read_only
self._default_timeout = default_timeout
self._session = WrappedSession()
self._session.verify = verify_ssl
if username is not None and password is not None:
self._session.auth = SwimlaneJwtAuth(
self,
username,
password
)
else:
self._session.auth = SwimlaneTokenAuth(
self,
access_token
)
self.apps = AppAdapter(self)
self.users = UserAdapter(self)
self.groups = GroupAdapter(self)
self.helpers = HelperAdapter(self)
if verify_server_version:
self.__verify_server_version()
@staticmethod
def __verify_auth_params(username, password, access_token):
"""Verify that valid authentication parameters were passed to __init__"""
if all(v is not None for v in [username, password, access_token]):
raise ValueError('Cannot supply a username/password and a access token')
if (username is None or password is None) and access_token is None:
raise ValueError('Must supply a username/password or access token')
def __verify_server_version(self):
"""Verify connected to supported server product version
Notes:
Logs warning if connecting to a newer minor server version
Raises:
swimlane.exceptions.InvalidServerVersion: If server major version is higher than package major version
"""
if compare_versions('.'.join([_lib_major_version, _lib_minor_version]), self.product_version) > 0:
logger.warning('Client version {} connecting to server with newer minor release {}.'.format(
_lib_full_version,
self.product_version
))
if compare_versions(_lib_major_version, self.product_version) != 0:
raise InvalidSwimlaneProductVersion(
self,
'{}.0'.format(_lib_major_version),
'{}.0'.format(str(int(_lib_major_version) + 1))
)
def __repr__(self):
return '<{cls}: {user} @ {host} v{version}>'.format(
cls=self.__class__.__name__,
user=self.user,
host=self.host,
version=self.version
)
[docs] def request(self, method, api_endpoint, **kwargs):
"""Wrapper for underlying :class:`requests.Session`
Handles generating full API URL, session reuse and auth, request defaults, and invalid response status codes
Used throughout library as the core underlying request/response method for all interactions with server
Args:
method (str): Request method (get, post, put, etc.)
api_endpoint (str): Portion of URL matching API endpoint route as listed in platform /docs help page
**kwargs (dict): Remaining arguments passed through to actual request call
Notes:
All other provided kwargs are passed to underlying :meth:`requests.Session.request()` call
Raises:
swimlane.exceptions.SwimlaneHTTP400Error: On 400 responses with additional context about the exception
requests.HTTPError: Any other 4xx/5xx HTTP responses
Returns:
requests.Response: Successful response instances
Examples:
Request and parse server settings endpoint response
>>> server_settings = swimlane.request('get', 'settings').json()
"""
while api_endpoint.startswith('/'):
api_endpoint = api_endpoint[1:]
# Ensure a timeout is set
kwargs.setdefault('timeout', self._default_timeout)
# Manually grab and dump json data to have full control over serialization
# Emulate default requests behavior
json_data = kwargs.pop('json', None)
if json_data is not None:
headers = CaseInsensitiveDict(kwargs.get('headers', {}))
headers.setdefault('Content-Type', 'application/json')
kwargs['headers'] = headers
kwargs['data'] = json.dumps(json_data, sort_keys=True, separators=(',', ':'))
response = self._session.request(method, urljoin(str(self.host) + self._api_root, api_endpoint), **kwargs)
# Roll 400 errors up into SwimlaneHTTP400Errors with specific Swimlane error code support
try:
response.raise_for_status()
except requests.HTTPError as error:
if error.response.status_code == 400:
raise SwimlaneHTTP400Error(error)
else:
raise error
return response
@property
def settings(self):
"""Retrieve and cache settings from server"""
if not self.__settings:
self.__settings = self.request('get', 'settings').json()
return self.__settings
@property
def version(self):
"""Full Swimlane version, <product_version>+<build_version>+<build_number>"""
return self.settings['apiVersion']
@property
def product_version(self):
"""Swimlane product version"""
version_separator = '+'
if version_separator in self.version:
# Post product/build version separation
return self.version.split(version_separator)[0]
# Pre product/build version separation
return self.version.split('-')[0]
@property
def build_version(self):
"""Swimlane semantic build version
Falls back to product version in pre-2.18 releases
"""
version_separator = '+'
if version_separator in self.version:
# Post product/build version separation
# This will handle <product_version>+<build_version>+<build_number>
# or <build_version>+<build_number> formats of the version
return self.version.split(version_separator)[-2]
# Pre product/build version separation
return self.product_version
@property
def build_number(self):
"""Swimlane build number"""
version_separator = '+'
if version_separator in self.version:
# Post product/build version separation
return self.version.split(version_separator)[2]
# Pre product/build version separation
return self.version.split('-')[1]
@property
def user(self):
"""User record instance for authenticated user"""
return self._session.auth.user
[docs]class SwimlaneTokenAuth(SwimlaneResolver):
"""Handles token authentication for all requests
.. versionadded:: 4.1.0
"""
def __init__(self, swimlane, access_token):
super(SwimlaneTokenAuth, self).__init__(swimlane)
self._access_token = access_token
self.user = None
def __call__(self, request):
"""Attach necessary headers to all requests"""
headers = {
'Private-Token': self._access_token
}
request.headers.update(headers)
# Only make the call to user/authorize to get the user's profile if we haven't retrieved it
# already
if self.user is not None:
return request
# Temporarily remove auth from Swimlane session for auth request to avoid recursive loop during the request
self._swimlane._session.auth = None
resp = self._swimlane.request(
'get',
'user/authorize',
headers=headers
)
self._swimlane._session.auth = self
json_content = resp.json()
self.user = User(self._swimlane, _user_raw_from_login_content(json_content))
return request
[docs]class SwimlaneJwtAuth(SwimlaneResolver):
"""Handles authentication for all requests"""
_token_expiration_buffer = pendulum.Duration(minutes=5)
def __init__(self, swimlane, username, password):
super(SwimlaneJwtAuth, self).__init__(swimlane)
self._username = username
self._password = password
self.user = None
self._login_headers = {}
self._token_expiration = pendulum.now()
def __call__(self, request):
"""Attach necessary headers to all requests
Automatically reauthenticate before sending request when nearing token expiration
"""
# Refresh token if it expires soon
if pendulum.now() + self._token_expiration_buffer >= self._token_expiration:
self.authenticate()
request.headers.update(self._login_headers)
return request
[docs] def authenticate(self):
"""Send login request and update User instance, login headers, and token expiration"""
# Temporarily remove auth from Swimlane session for auth request to avoid recursive loop during login request
self._swimlane._session.auth = None
resp = self._swimlane.request(
'post',
'user/login',
json={
'userName': self._username,
'password': self._password
},
)
self._swimlane._session.auth = self
# Get JWT from response content
json_content = resp.json()
token = json_content.pop('token', None)
# Grab token expiration
token_data = jwt.decode(token, algorithms=[''], options={'verify_signature': False})
token_expiration = pendulum.from_timestamp(token_data['exp'])
headers = {
'Authorization': 'Bearer {}'.format(token)
}
# Create User instance for authenticating user from login response data
user = User(self._swimlane, _user_raw_from_login_content(json_content))
self._login_headers = headers
self.user = user
self._token_expiration = token_expiration
def _user_raw_from_login_content(login_content):
"""Returns a User instance with appropriate raw data parsed from login response content"""
matching_keys = [
'displayName',
'lastLogin',
'active',
'name',
'isMe',
'lastPasswordChangedDate',
'passwordResetRequired',
'groups',
'roles',
'email',
'isAdmin',
'createdDate',
'modifiedDate',
'createdByUser',
'modifiedByUser',
'userName',
'id',
'disabled'
]
raw_data = {
'$type': User._type,
}
for key in matching_keys:
if key in login_content:
raw_data[key] = login_content[key]
return raw_data