Source code for letterboxd.api

import hashlib
import hmac
import json
import logging
import os
import time
import uuid

import requests

# Make sure a logger named after this module is registered. The returned
# logger object is intentionally not kept.
logging.getLogger(__name__)

# Settings for routing requests through a local Charles proxy.
# Both environment-driven values are None when the variable is unset.
CHARLES_PROXY = "http://localhost:8888/"
CHARLES_CERTIFICATE = os.getenv("CHARLES_CERTIFICATE")
CHARLES = os.getenv("CHARLES")


class APIKeyMissingError(Exception):
    """Raised when an API object is created without an API key."""


class APISecretMissingError(Exception):
    """Raised when an API object is created without an API shared secret."""


class API:
    """
    Communication methods for the Letterboxd API
    """

    def __init__(self, api_base, api_key, api_secret):
        """
        Validate the credentials and start the shared requests session for
        the Letterboxd API.

        The key and secret must be passed in explicitly; this initializer does
        not look them up anywhere else.

        :param api_base: str - the base URL of the API endpoints,
            including version number
        :param api_key: str - API key provided by Letterboxd
        :param api_secret: str - API shared secret provided by Letterboxd
        :raises APIKeyMissingError: if api_key is empty or None
        :raises APISecretMissingError: if api_secret is empty or None
        """
        self.api_base = api_base
        self.api_key = api_key
        self.api_secret = api_secret
        self.user = None

        # Fail fast: every request is signed with both values, so a missing
        # (empty or None) credential can never produce a valid call.
        if not self.api_key:
            raise APIKeyMissingError(
                "All methods require an API key. See "
                "https://letterboxd.com/api-coming-soon/ "
                "for more information"
            )
        if not self.api_secret:
            raise APISecretMissingError(
                "All methods require an API secret. See "
                "https://letterboxd.com/api-coming-soon/ "
                "for more information"
            )

        # Start the shared requests session
        self.session = requests.Session()
        self.session.params = {}

    def api_call(self, path, params=None, form=None, headers=None, method="get"):
        """
        The workhorse method of calls to the Letterboxd API

        :param path: str - URL endpoint path for the desired service
        :param params: dict - request parameters (sent as the JSON body for
            post/put/patch/delete, as query parameters otherwise)
        :param form: str - form information, likely from the auth.py call
        :param headers: dict - request headers
        :param method: str - HTTP methods, [get, post, put, patch, delete]
        :return: requests.Response object
        :raises requests.exceptions.HTTPError: if the response status is an
            error status (via Response.raise_for_status())
        """
        # Work on private copies. The headers are modified below, and a
        # shared default (or the caller's own dict) would otherwise carry
        # Authorization / Content-Type values over into unrelated calls.
        params = {} if params is None else params
        headers = dict(headers) if headers else {}

        # If we have an oAuth token
        if self.user:
            headers["Authorization"] = f"Bearer {self.user.token}"

        url = f"{self.api_base}/{path}"

        logging.debug(
            f"\n"
            f"url: {url}\n"
            f"params: {params}\n"
            f"form: {form}\n"
            f"headers: {headers}\n"
            f"method: {method}\n"
            f"-------------------------"
        )

        if form:
            # `form` seems to only be used in an oAuth call?
            # should some of this code be in there instead?
            logging.debug("API.api_call() if form")
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            prepared_dict = self.__prepare_request(
                url, body=form, headers=headers, method=method
            )
            prepared_request = prepared_dict["prepared_request"]
            signature = prepared_dict["signature"]
            # Form calls carry the signature in the Authorization header
            prepared_request.headers["Authorization"] = f"Signature {signature}"
        elif method.lower() in ("post", "put", "patch", "delete"):
            logging.debug(
                "API.api_call() elif method.lower() in "
                '["post", "put", "patch", "delete"]:'
            )
            params = self.__remove_empty_from_dict(params)
            # JSON-encode the body
            body = json.dumps(params)
            headers["Content-Type"] = "application/json"
            prepared_dict = self.__prepare_request(
                url, body=body, headers=headers, method=method
            )
            prepared_request = prepared_dict["prepared_request"]
            signature = prepared_dict["signature"]
            # Attach the signature as the final query parameter
            prepared_request.prepare_url(prepared_request.url, {"signature": signature})
        else:
            logging.debug("API.api_call() else:")
            # It's a GET
            prepared_dict = self.__prepare_request(
                url, params=params, headers=headers, method=method
            )
            prepared_request = prepared_dict["prepared_request"]
            signature = prepared_dict["signature"]
            logging.debug(prepared_request.url)
            # Add the signature to the end of the params in the url
            prepared_request.prepare_url(prepared_request.url, {"signature": signature})

        logging.debug(
            f"API.api_call() prepared_request\n"
            f"method: {prepared_request.method}\n"
            f"url: {prepared_request.url}\n"
            f"headers: {prepared_request.headers}\n"
            f"body: {prepared_request.body}"
        )

        # Only go through the Charles proxy when it was requested via the
        # environment AND debug logging is on AND a certificate is configured.
        # In every other case send directly, so `response` is always bound.
        use_charles = (
            CHARLES == "True"
            and logging.getLogger().isEnabledFor(logging.DEBUG)
            and bool(CHARLES_CERTIFICATE)
        )

        try:
            if use_charles:
                logging.debug("Send prepared_request through Charles")
                proxies = {"http": CHARLES_PROXY, "https": CHARLES_PROXY}
                self.session.verify = CHARLES_CERTIFICATE
                # send the request through the proxy
                response = self.session.send(prepared_request, proxies=proxies)
            else:
                # send the request normally
                logging.debug("Send prepared_request")
                response = self.session.send(prepared_request)
        except (requests.exceptions.ConnectionError, ConnectionError) as error:
            # requests raises its own ConnectionError type, which is not a
            # subclass of the builtin one, so both are listed here.
            logging.error(error)
            raise

        logging.debug(f"api_call() response.status_code: {response.status_code}")
        # Raises for error statuses; a successful response falls through.
        response.raise_for_status()
        return response

    # -------------------------
    # Private methods

    def __prepare_request(
        self, url, params=None, body=None, headers=None, method="get", form=False
    ):
        """
        Prepare the request and sign it

        :param url: string
        :param params: dict - query parameters; not modified by this method
        :param body: str - request body, if any
        :param headers: dict
        :param method: string - get, post, put, patch, delete
        :param form: bool - accepted for compatibility; not used here
        :return: dict - {'prepared_request', 'signature'}
        """
        # Add the request params required for uniquely identifying the request
        params = self.__add_unique_params(params)
        body = [] if body is None else body
        headers = {} if headers is None else headers

        # Prepare the request and add it to the current requests session
        request = requests.Request(
            method.upper(), url, params=params, data=body, headers=headers
        )
        prepared_request = self.session.prepare_request(request)
        logging.debug(f"prepared url: {prepared_request.url}")

        # Hash the request signature
        signature = self.__sign(
            method=prepared_request.method,
            url=prepared_request.url,
            body=prepared_request.body,
        )

        return {"prepared_request": prepared_request, "signature": signature}

    def __remove_empty_from_dict(self, dirty_dict):
        """
        Takes a dictionary and recursively removes all None and "" values

        :param dirty_dict: dict
        :return: dict - a new dictionary; the input is left untouched
        """
        logging.debug(f"params: {dirty_dict}")
        cleaned_dict = {}
        for key, value in dirty_dict.items():
            logging.debug(f"key: {key}, value: {value}")
            # Compare by equality: identity against a string literal is
            # implementation-dependent.
            if value is None or value == "":
                logging.debug("Toss the value!")
            elif isinstance(value, dict):
                cleaned_dict[key] = self.__remove_empty_from_dict(value)
            elif isinstance(value, (tuple, list)):
                cleaned_dict[key] = self.__remove_empty_from_list(value)
            else:
                cleaned_dict[key] = value
            logging.debug("-------------------------")
        logging.debug(f"result: {cleaned_dict}")
        return cleaned_dict

    def __remove_empty_from_list(self, dirty_list):
        """
        Takes a tuple or list and recursively removes all None and "" values

        :param dirty_list: tuple or list
        :return: list - always a list, even when a tuple was passed in
        """
        cleaned_list = []
        for item in dirty_list:
            logging.debug(item)
            if item is None or item == "":
                logging.debug(f"item {item} is None")
            elif isinstance(item, dict):
                logging.debug(f"item {item} is dict")
                cleaned_list.append(self.__remove_empty_from_dict(item))
            elif isinstance(item, (tuple, list)):
                logging.debug(f"item {item} is tuple or list")
                cleaned_list.append(self.__remove_empty_from_list(item))
            else:
                logging.debug(f"item {item} is else")
                cleaned_list.append(item)
        return cleaned_list

    def __add_unique_params(self, params):
        """
        Adds the metabody params required for signing the request

        :param params: dict or None - existing query parameters
        :return: dict - a copy of params with apikey, nonce and timestamp set
        """
        # Copy so the caller's dict never picks up the per-request values
        unique_params = dict(params or {})
        unique_params["apikey"] = self.api_key
        # nonce: UUID string, must be unique for each API request
        unique_params["nonce"] = str(uuid.uuid4())
        # timestamp: number of seconds since epoch, Jan 1, 1970 (UTC)
        unique_params["timestamp"] = int(time.time())
        return unique_params

    def __sign(self, method, url, body=""):
        r"""
        Create a salted string as bytes, of the form
        [METHOD]\x00[URL]\x00[BODY], where [METHOD] is GET, POST, etc.,
        [URL] is the fully-qualified request URL including the apikey, nonce,
        timestamp and any other method parameters, and [BODY] is a
        JSON-encoded string (for POST, PATCH and DELETE requests) or empty
        (for GET requests).

        Next, create a [SIGNATURE] from the salted string by applying a
        lower-case HMAC/SHA-256 transformation, using your API Secret, and
        append it to your API request URL as the final query parameter:
        …&signature=[SIGNATURE]

        Notes: you must specify a Content-Type: application/json request
        header if [BODY] is JSON-encoded. The apikey parameter is your
        supplied API Key. The nonce parameter should be a UUID string and must
        be unique for each API request. The timestamp parameter is the number
        of seconds since Jan 1, 1970 (UTC), also known as "UNIX Epoch time."

        :param method: str - get, post, put, patch, delete
        :param url: str
        :param body: str, bytes or None - JSON-encoded; None counts as empty
        :return: str - lower-case hex digest of the HMAC/SHA-256 signature
        """
        # Create the salted bytestring
        if body is None:
            body = ""
        # A prepared body may already be bytes; only encode text
        body_bytes = body if isinstance(body, bytes) else str.encode(body)
        signing_bytestring = b"\x00".join(
            [str.encode(method), str.encode(url), body_bytes]
        )
        logging.debug(f"signing_bytestring: {signing_bytestring}")
        # applying an HMAC/SHA-256 transformation, using our API Secret
        signature = hmac.new(
            str.encode(self.api_secret), signing_bytestring, digestmod=hashlib.sha256
        )
        # get the string representation of the hash
        return signature.hexdigest()