2020-11-22 10:52:48 +01:00
|
|
|
"""
|
2020-06-06 01:04:40 +02:00
|
|
|
Simple command to share one-time files
|
2020-11-22 10:52:48 +01:00
|
|
|
"""
|
2020-06-06 01:04:40 +02:00
|
|
|
|
|
|
|
|
import os
|
2020-06-08 23:33:18 +02:00
|
|
|
import base64
|
2020-06-11 15:50:51 +02:00
|
|
|
import configparser
|
2020-06-08 23:33:18 +02:00
|
|
|
import hashlib
|
|
|
|
|
import hmac
|
2020-06-06 01:04:40 +02:00
|
|
|
import json
|
|
|
|
|
import time
|
2020-06-08 23:33:18 +02:00
|
|
|
from datetime import datetime
|
|
|
|
|
from urllib.parse import quote_plus, urljoin
|
2020-06-06 01:04:40 +02:00
|
|
|
|
|
|
|
|
import click
|
|
|
|
|
import requests
|
|
|
|
|
from pygments import highlight, lexers, formatters
|
|
|
|
|
|
|
|
|
|
|
2020-11-22 10:52:48 +01:00
|
|
|
# Location of the configuration file. The ONCE_CONFIG_FILE environment
# variable wins; otherwise ``~/.once`` in the user's home directory is used.
ONCE_CONFIG_FILE = os.environ.get("ONCE_CONFIG_FILE", os.path.expanduser("~/.once"))

# Name of the HTTP header in which the request signature is sent.
ONCE_SIGNATURE_HEADER = "x-once-signature"

# strftime pattern for request timestamps: year down to microseconds,
# with no separators.
ONCE_TIMESTAMP_FORMAT = "%Y%m%d%H%M%S%f"
|
2020-06-06 01:04:40 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def highlight_json(obj):
    """Return *obj* rendered as colourised JSON text.

    The object is serialized with sorted keys and a four-space indent, then
    passed through Pygments' JSON lexer and terminal formatter.
    """
    return highlight(
        json.dumps(obj, indent=4, sort_keys=True),
        lexers.JsonLexer(),
        formatters.TerminalFormatter(),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def echo_obj(obj):
    """Write *obj* as highlighted JSON using ``click.echo``."""
    rendered = highlight_json(obj)
    click.echo(rendered)
|
|
|
|
|
|
|
|
|
|
|
2020-06-11 15:50:51 +02:00
|
|
|
def get_config(config_file: str = ONCE_CONFIG_FILE) -> configparser.ConfigParser:
    """Load and return the configuration stored in *config_file*.

    Args:
        config_file: Path of the configuration file to parse. Defaults to
            ``ONCE_CONFIG_FILE``.

    Returns:
        A ``ConfigParser`` populated from *config_file*.

    Raises:
        ValueError: If *config_file* does not exist.
    """
    if not os.path.exists(config_file):
        raise ValueError(f"Config file not found at {config_file}")

    config = configparser.ConfigParser()
    # Parse the file that was requested (and checked above), not
    # unconditionally the default location.
    config.read(config_file)
    return config
|
|
|
|
|
|
|
|
|
|
|
2020-06-06 01:04:40 +02:00
|
|
|
def _once_setting(config: configparser.ConfigParser, env_var: str, option: str) -> str:
    """Return a setting from the *env_var* environment variable or the config file."""
    value = os.getenv(env_var)
    if value is not None:
        return value
    # Only fall back to (and require) the config option when the environment
    # does not provide the value.
    if not config.has_option("once", option):
        raise ValueError(f"Configuration file at {ONCE_CONFIG_FILE} misses `{option}` option")
    return config["once"][option]


def api_req(method: str, url: str, verbose: bool = False, **kwargs):
    """Send a signed request to the once API and return the response.

    The base URL comes from the ``ONCE_API_URL`` environment variable or the
    ``base_url`` option of the ``[once]`` config section; the base64-encoded
    secret key comes from ``ONCE_SECRET_KEY`` or the ``secret_key`` option.
    The request's path and query string are signed with HMAC-SHA256 and the
    base64-encoded digest is sent in the ``ONCE_SIGNATURE_HEADER`` header.

    Args:
        method: HTTP method, case-insensitive; only GET and POST are accepted.
        url: URL joined onto the base URL.
        verbose: When true, print the request line, the response status and
            the response body.
        **kwargs: Forwarded unchanged to ``requests.Request``.

    Returns:
        The ``requests.Response`` received from the server.

    Raises:
        ValueError: If the config file is missing, a required setting is
            available from neither the environment nor the config file, or
            *method* is not supported.
    """
    config = get_config()
    base_url = _once_setting(config, "ONCE_API_URL", "base_url")
    secret_key = base64.b64decode(_once_setting(config, "ONCE_SECRET_KEY", "secret_key"))

    method = method.lower()
    if method not in ("get", "post"):
        raise ValueError(f'Unsupported HTTP method "{method}"')

    actual_url = urljoin(base_url, url)

    if verbose:
        print(f"{method.upper()} {actual_url}")

    req = requests.Request(method=method, url=actual_url, **kwargs).prepare()

    # Sign the path plus query string exactly as it will be sent.
    plain_text = req.path_url.encode("utf-8")
    hmac_obj = hmac.new(secret_key, msg=plain_text, digestmod=hashlib.sha256)
    req.headers[ONCE_SIGNATURE_HEADER] = base64.b64encode(hmac_obj.digest())

    # Close the session (and its connection pool) once the response is read.
    with requests.Session() as session:
        response = session.send(req)

    if verbose:
        print(f"Server response status: {response.status_code}")
        try:
            payload = response.json()
        except ValueError:
            # Non-JSON body (e.g. an HTML error page): show it raw instead of
            # crashing the diagnostic output.
            click.echo(response.text)
        else:
            echo_obj(payload)

    return response
|
|
|
|
|
|
|
|
|
|
|
2020-11-22 10:52:48 +01:00
|
|
|
@click.command("share")
@click.argument("file", type=click.File(mode="rb"), required=True)
@click.option("--verbose", "-v", is_flag=True, default=False, help="Enables verbose output.")
def share(file: click.File, verbose: bool):
    """Upload FILE and print a link from which it can be downloaded once."""
    # The docstring above doubles as the command's --help text, so
    # implementation notes live in comments.
    #
    # Step 1: ask the API for a one-time URL and a presigned upload form.
    # NOTE(review): the file name is quote_plus-encoded here and requests
    # percent-encodes query parameters again -- presumably the server expects
    # this double encoding; confirm before changing it.
    api_response = api_req(
        "GET",
        "/",
        params={
            "f": quote_plus(os.path.basename(file.name)),
            "t": datetime.utcnow().strftime(ONCE_TIMESTAMP_FORMAT),
        },
        verbose=verbose,
    )
    if not api_response.ok:
        raise click.ClickException(f"Share request failed with HTTP status {api_response.status_code}")
    entry = api_response.json()

    once_url = entry["once_url"]
    upload_data = entry["presigned_post"]
    files = {"file": file}

    # Step 2: upload the file with the presigned form fields. A monotonic
    # clock keeps the elapsed time immune to system clock adjustments.
    upload_started = time.perf_counter()
    response = requests.post(upload_data["url"], data=upload_data["fields"], files=files)
    upload_time = time.perf_counter() - upload_started

    # Do not advertise a download link for an upload that was rejected.
    if not response.ok:
        raise click.ClickException(f"File upload failed with HTTP status {response.status_code}")

    print(f"File uploaded in {upload_time}s")
    print(f"File can be downloaded once at: {once_url}")
|
|
|
|
|
|
|
|
|
|
|
2020-11-22 10:52:48 +01:00
|
|
|
if __name__ == "__main__":
    # Run the click command when the module is executed as a script.
    share()
|