from galileo_sdk.compat import requests, urlencode, URLError
from datetime import datetime, timedelta
import os, urllib, json, webbrowser, time, tempfile

# wrapper class for Galileo sdk authentication. It automatically handles authentication for the user via the default webbrowser
[docs]class AuthSdk: """Helper class for user authentication."""
[docs] def __init__(self, client_id="oDmH6Nf4DN3oILcNk7cQqBchXUfv7fpD", audience=""): """ Constructor for AuthSdk class. :param client_id: string: Optional argument for tracking which integration is calling the sdk. :param mode: string: Optional argument, can be prod (default) or dev. :param audience: string: Optional argument :return: AuthSdk Example: >>> from galileo_sdk import GalileoSdk, AuthSdk >>> myauth = AuthSdk() >>> access_token, refresh_token, expiry_time = myauth.initialize() >>> galileo = GalileoSdk(auth_token=access_token, refresh_token=refresh_token) """ self.domain = "" self.headers_default = { "content-type": "application/x-www-form-urlencoded" } self.client_id = client_id # if a custom audience is supplied, use it if audience: self.audience = audience else: self.audience = ""
[docs] def initialize(self, refresh_token_path=os.path.join(os.path.expanduser("~"), ".galileo")): """ Automatically authenticate the user either through a webbrowser or auth-link printed to the terminal. :param refresh_token_path: string: An optional file path for where to store auth token information. If you don not want authentication info to be stored, pass an empty string, i.e. refresh_token_path=''. The default location is in you home directory in a file named .galileo. :return: access_token, refresh_token, expiration Example: >>> from galileo_sdk import GalileoSdk, AuthSdk >>> myauth = AuthSdk() >>> access_token, refresh_token, expiry_time = myauth.initialize() >>> galileo = GalileoSdk(auth_token=access_token, refresh_token=refresh_token) """ if os.path.exists(refresh_token_path): access_token, refresh_token, expires_in = self.refresh_token_file_flow( refresh_token_path) else: access_token, refresh_token, expires_in = self.device_flow( refresh_token_path) return access_token, refresh_token, expires_in
[docs] def device_flow(self, refresh_token_path=""): """ Attempts to bring up a webbrowser for the user to authenticate. Otherwise, it will print a URL and access code to stdout. :param refresh_token_path: string: An optional file path for where to store auth token information. If you do not provide a file path, the programmer is responsible for handling persistence. :return: access_token, refresh_token, expiration """ ( user_url, user_code, device_code, interval, user_url_complete, ) = self._request_device_authorization() # Try to open a window to the auth0 audience bowser_available = if bowser_available: # Then arrange code confirmation also in the webbrowser self.show_user_code(user_code) else: mystring = "Go to: " + user_url + "\nEnter the code: " + user_code print(mystring) access_token, refresh_token, expires_in = self._poll_for_tokens( interval, device_code) data = self._store_token_info(refresh_token, expires_in, refresh_token_path) return access_token, refresh_token, data["expires_in"]
def show_user_code(self, user_code): html = ("""<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="robots" content="noindex, nofollow"> <link rel="stylesheet" href=""> <style id="custom-styles-container"> body { background: #354962; font-family: ulp-font, -apple-system, BlinkMacSystemFont, Roboto, Helvetica, sans-serif; } .main-wrapper { background: #354962; } .ulp-alert.danger { background: #D00E17; } .ulp-alert.success { background: #0A8852; } @supports (mask-image: url('/static/img/branding-generic/copy-icon.svg')) { @supports not (-ms-ime-align: auto) { .input-container.error::before { background-color: #D00E17; } } } .input.ulp-input-error { border-color: #D00E17; } .error-cloud { background-color: #D00E17; } .error-fatal { background-color: #D00E17; } </style> <title>Connect to Galileo SDK/CLI</title> </head> <body> <main class="ulp-outer device-code-confirmation"> <section class="ulp-box no-badge"> <div class="ulp-box-inner _prompt-box"> <div class="ulp-main"> <header class="ulp-header _prompt-header"> <img class="header-logo _header-logo" id="prompt-logo-center" src="" alt="Device Confirmation"> <h1 class="header-title _header-title">Device Confirmation</h1> <div class="header-description _header-description"> <p class="text-simple _text ">Please confirm this code in the other tab:</p> </div> <div class="header-description _header-description"> <p class="text-simple _text " style="font-size:25px;">""" + str(user_code) + """</p> </div> </header> </div> </div> </body> </html> """) with tempfile.NamedTemporaryFile("w", delete=False, suffix=".html") as f: url = "file://" + f.write(html), new=1) def refresh_token_file_flow(self, refresh_token_file): """ Refreshes access token if given the location of a refresh token file. :param refresh_token_file: string: File path for where auth token information is stored. :return: access_token, refresh_token, expiration """ if not os.path.exists(refresh_token_file): print("No refresh token file") with open(refresh_token_file, "r") as fd: refresh_token = refresh_json = json.loads(refresh_token) refresh_json["expires_in"] = datetime.strptime( refresh_json["expires_in"], "%Y-%m-%d %H:%M:%S.%f") access_token = self._get_new_access_token( refresh_json["refresh_token"], refresh_token_file) return access_token, refresh_json["refresh_token"], refresh_json[ "expires_in"] def refresh_token_flow(self, refresh_token, expires_in): """ Refreshes access token if given a refresh token. :param refresh_token: string: String object representing the token. :param expires_in: string: String object representing the expiry date of the refresh token. :return: access_token, refresh_token, expiration """ access_token = self._get_new_access_token(refresh_token, "") return access_token, refresh_token, expires_in def datetime_converter(self, o): """ :param o: object: Object to convert to datetime. :return: datetime: str: String representation of the object. """ if isinstance(o, datetime): return o.__str__() def _store_token_info(self, refresh_token, expires_in, refresh_token_path): """ :param refresh_token: string: refresh token. :param expires_in: string: expiration date of the refresh token. :param refresh_token_path: string: file path to store the refresh token. :return: token_info: dict: Dictionary containing the refresh token, expiration date """ tmp = None data = { "refresh_token": refresh_token, "expires_in": + timedelta(0, expires_in), } if len(refresh_token_path) > 0: try: tmp = open(refresh_token_path, "w") tmp.write(json.dumps(data, default=self.datetime_converter)) finally: if tmp: tmp.close() return data def _request_device_authorization(self): """ :return: Tuple containing the user_url, user_code, device_code, interval, and complete_url. """ r = "{domain}/oauth/device/code".format(domain=self.domain), headers=self.headers_default, data=urlencode( { "client_id": self.client_id, "audience": self.audience, "scope": "email profile openid offline_access", }, doseq=True, ), ) r = r.json() user_url = r["verification_uri"] user_code = r["user_code"] device_code = r["device_code"] interval = r["interval"] user_url_complete = r["verification_uri_complete"] return user_url, user_code, device_code, interval, user_url_complete def _poll_for_tokens(self, interval, device_code): """ :param interval: string: time in seconds between polling for tokens. :param device_code: string: device code. :return: Tuple containing the access_token, refresh_token, and expiration. """ interval = interval * 2 url_str = urlencode( { "grant_type": "urn:ietf:params:oauth:grant-type:device_code", "device_code": device_code, "client_id": self.client_id, }, doseq=True, ) while True: try: r = "{domain}/oauth/token".format(domain=self.domain), headers=self.headers_default, data=urlencode( { "grant_type": "urn:ietf:params:oauth:grant-type:device_code", "device_code": device_code, "client_id": self.client_id, }, doseq=True, ), ) except Exception as e: if hasattr(e, "code") and e.code == 429: interval = interval * 2 continue if r.status_code == 200: break time.sleep(interval) r = r.json() return r["access_token"], r["refresh_token"], r["expires_in"] def _get_new_access_token(self, refresh_token, refresh_token_file): """ :param refresh_token: string: refresh token. :param refresh_token_file: string: file path for where the refresh token is stored. :return: access_token: string: access token. """ response = "{domain}/oauth/token".format(domain=self.domain), headers=self.headers_default, data=urlencode( { "client_id": self.client_id, "grant_type": "refresh_token", "refresh_token": refresh_token, }, doseq=True, ), ) response.raise_for_status() r = response.json() self._store_token_info(refresh_token, r["expires_in"], refresh_token_file) return r["access_token"]