From 50a4c09c3c040db3e69298a212f5b38a0daad545 Mon Sep 17 00:00:00 2001
From: Domenico Testa
Date: Wed, 15 Apr 2020 19:37:21 +0200
Subject: [PATCH] Replacing the placeholder API with a real one.

---
Notes (commentary only, not part of the commit message):

The API container (api/app.py) exposes these endpoints:
  GET  /                       service name and version
  GET  /jobs                   scan of the tracks table
  POST /jobs                   create a job record and return pre-signed
                               S3 upload data
  GET  /jobs/<job_id>          details of a single job
  POST /jobs/<job_id>/process  move an 'uploading' job to 'processing' and
                               send it to the SQS processing queue

Both job-level routes need the <job_id> URL variable: Flask passes it to
the view functions as their job_id argument.

 api/Dockerfile       |  14 ++++++
 api/app.py           | 102 +++++++++++++++++++++++++++++++++++++++++++
 api/requirements.txt |   2 +
 app.py               |  70 +++++++++++++++++++++++++----
 requirements.txt     |   1 +
 5 files changed, 180 insertions(+), 9 deletions(-)
 create mode 100644 api/Dockerfile
 create mode 100644 api/app.py
 create mode 100644 api/requirements.txt

diff --git a/api/Dockerfile b/api/Dockerfile
new file mode 100644
index 0000000..5f1d58f
--- /dev/null
+++ b/api/Dockerfile
@@ -0,0 +1,14 @@
+FROM python:3
+
+ENV PYTHONUNBUFFERED=1
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PORT 8080
+ENV FLASK_APP app.py
+
+WORKDIR /app
+COPY . /app
+
+RUN pip install --no-cache -r requirements.txt
+
+CMD ["flask", "run", "-h", "0.0.0.0", "-p", "8080"]
+
diff --git a/api/app.py b/api/app.py
new file mode 100644
index 0000000..bc6f0db
--- /dev/null
+++ b/api/app.py
@@ -0,0 +1,102 @@
+import os
+import json
+from uuid import uuid4
+
+import boto3
+from flask import Flask, request
+
+
+APP_NAME = 'karaoke-http-api'
+VERSION = '0.10'
+PROCESSING_QUEUE_URL = os.getenv('PROCESSING_QUEUE_URL')
+TRACKS_TABLE_NAME = os.getenv('TRACKS_TABLE_NAME')
+UPLOAD_BUCKET_NAME = os.getenv('UPLOAD_BUCKET_NAME')
+UPLOAD_AUTH_TIMEOUT = int(os.getenv('UPLOAD_AUTH_TIMEOUT', 120))
+app = Flask(__name__)
+
+
+def get_dynamodb_client():
+    return boto3.client('dynamodb')
+
+
+def get_s3_client():
+    return boto3.client('s3')
+
+
+def get_sqs_client():
+    return boto3.client('sqs')
+
+
+def strip_dynamodb_type_tags(obj):
+    return {key: list(value.values())[0]
+            for key, value in obj.items()}
+
+
+@app.route('/')
+def info():
+    return dict(name=APP_NAME, version=VERSION)
+
+
+@app.route('/jobs', methods=['GET', 'POST'])
+def jobs_resource():
+    dynamodb = get_dynamodb_client()
+    if request.method == 'POST':
+        job_id = str(uuid4())
+
+        s3_key = f'track_{job_id}'
+        s3_complete_url = f's3://{UPLOAD_BUCKET_NAME}/{s3_key}'
+
+        dynamodb.put_item(
+            TableName=TRACKS_TABLE_NAME,
+            Item={
+                'id': {'S': job_id},
+                'input_s3_url': {'S': s3_complete_url},
+                'status': {'S': 'uploading'}
+            })
+
+        # create the pre-signed upload URL
+        s3 = get_s3_client()
+        upload_data = s3.generate_presigned_post(UPLOAD_BUCKET_NAME, s3_key,
+                                                 Fields=None,
+                                                 Conditions=None,
+                                                 ExpiresIn=UPLOAD_AUTH_TIMEOUT)
+
+        return dict(job_id=job_id, upload_data=upload_data)
+
+    response = dynamodb.scan(TableName=TRACKS_TABLE_NAME)
+    return dict(items=[strip_dynamodb_type_tags(el) for el in response['Items']],
+                count=response['Count'],
+                scanned_count=response['ScannedCount'])
+
+
+@app.route('/jobs/<job_id>', methods=['GET'])
+def get_job_details(job_id):
+    dynamodb = get_dynamodb_client()
+    response = dynamodb.get_item(TableName=TRACKS_TABLE_NAME, Key={'id': {'S': job_id}})
+    return strip_dynamodb_type_tags(response['Item'])
+
+
+@app.route('/jobs/<job_id>/process', methods=['POST'])
+def trigger_job_processing(job_id):
+    dynamodb = get_dynamodb_client()
+    response = dynamodb.get_item(TableName=TRACKS_TABLE_NAME, Key={'id': {'S': job_id}})
+    job = strip_dynamodb_type_tags(response['Item'])
+
+    if job['status'] != 'uploading':
+        return dict(status='failed', message='Job is in a wrong state'), 400
+
+    dynamodb.update_item(
+        TableName=TRACKS_TABLE_NAME,
+        Key={'id': {'S': job_id}},
+        AttributeUpdates={
+            'status': {'Value': {'S': 'processing'}}
+        })
+
+    sqs = get_sqs_client()
+    response = sqs.send_message(
+        QueueUrl=PROCESSING_QUEUE_URL,
+        MessageBody=json.dumps({
+            'job_id': job['id'],
+            'input_s3_url': job['input_s3_url']}))
+
+    return dict(status='processing', message_id=response['MessageId'])
diff --git a/api/requirements.txt b/api/requirements.txt
new file mode 100644
index 0000000..aa55d98
--- /dev/null
+++ b/api/requirements.txt
@@ -0,0 +1,2 @@
+flask
+boto3
diff --git a/app.py b/app.py
index 2926155..86cb289 100644
--- a/app.py
+++ b/app.py
@@ -3,9 +3,12 @@
 import os
 
 from aws_cdk import core
+import aws_cdk.aws_dynamodb as dynamodb
 import aws_cdk.aws_ec2 as ec2
 import aws_cdk.aws_ecs as ecs
 import aws_cdk.aws_ecs_patterns as ecs_patterns
+import aws_cdk.aws_s3 as s3
+import aws_cdk.aws_sqs as sqs
 
 from karaokeme_cdk.karaokeme_cdk_stack import KaraokemeCdkStack
 
@@ -21,34 +24,77 @@ class BaseResources(core.Stack):
 
         self.cluster = ecs.Cluster(self, 'karaoke-cluster', vpc=self.vpc)
 
+        # DynamoDB table
+        self.tracks_table = dynamodb.Table(self, 'karaoke-tracks-table',
+                                           table_name='karaoke-tracks',
+                                           partition_key=dynamodb.Attribute(name='id', type=dynamodb.AttributeType.STRING),
+                                           billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST)
+
+        # The bucket where uploaded files will be kept
+        self.input_bucket = s3.Bucket(self, 'input-bucket',
+                                      bucket_name='karaoke-uploaded-tracks')
+
+        # The bucket where processed files will be kept and served back to the public
+        self.output_bucket = s3.Bucket(self, 'output-bucket',
+                                       bucket_name='karaoke-separated-tracks',
+                                       public_read_access=True)
+
 
 class HttpApi(core.Stack):
-    def __init__(self, scope, id, cluster: ecs.Cluster, **kwargs):
+    def __init__(self, scope, id,
+                 cluster: ecs.Cluster,
+                 tracks_table: dynamodb.Table,
+                 processing_queue: sqs.Queue,
+                 upload_bucket: s3.Bucket,
+                 **kwargs):
         super().__init__(scope, id, **kwargs)
 
-        api_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'api-placeholder'))
+        api_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'api'))
 
         self.api = ecs_patterns.ApplicationLoadBalancedFargateService(self, 'http-api-service',
                                                                       cluster=cluster,
                                                                       task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
                                                                           image=ecs.ContainerImage.from_asset(directory=api_dir),
-                                                                          container_port=8080),
+                                                                          container_port=8080,
+                                                                          environment={
+                                                                              'PROCESSING_QUEUE_URL': processing_queue.queue_url,
+                                                                              'TRACKS_TABLE_NAME': tracks_table.table_name,
+                                                                              'UPLOAD_BUCKET_NAME': upload_bucket.bucket_name
+                                                                          }),
                                                                       desired_count=2,
                                                                       cpu=256,
                                                                       memory_limit_mib=512)
 
+        processing_queue.grant_send_messages(self.api.service.task_definition.task_role)
+        tracks_table.grant_read_write_data(self.api.service.task_definition.task_role)
+        upload_bucket.grant_put(self.api.service.task_definition.task_role)
+
 
 class SeparatorWorker(core.Stack):
-    def __init__(self, scope, id, cluster: ecs.Cluster, **kwargs):
+    def __init__(self, scope, id,
+                 cluster: ecs.Cluster,
+                 tracks_table: dynamodb.Table,
+                 input_bucket: s3.Bucket,
+                 output_bucket: s3.Bucket,
+                 **kwargs):
         super().__init__(scope, id, **kwargs)
 
         worker_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'worker-placeholder'))
 
         self.service = ecs_patterns.QueueProcessingFargateService(self, 'separator-service',
+                                                                  cluster=cluster,
                                                                   cpu=256,
                                                                   memory_limit_mib=512,
                                                                   image=ecs.ContainerImage.from_asset(directory=worker_dir),
-                                                                  cluster=cluster)
+                                                                  environment={
+                                                                      'TRACKS_TABLE_NAME': tracks_table.table_name,
+                                                                      'INPUT_BUCKET_NAME': input_bucket.bucket_name,
+                                                                      'OUTPUT_BUCKET_NAME': output_bucket.bucket_name
+                                                                  })
+
+        input_bucket.grant_read(self.service.task_definition.task_role)
+        output_bucket.grant_write(self.service.task_definition.task_role)
+        tracks_table.grant_read_write_data(self.service.task_definition.task_role)
 
 
 class KaraokeApp(core.App):
@@ -57,11 +103,17 @@ class KaraokeApp(core.App):
 
         self.base_resources = BaseResources(self, 'base-resources')
 
-        self.http_api = HttpApi(self, 'http-api',
-                                cluster=self.base_resources.cluster)
-
         self.separator_worker = SeparatorWorker(self, 'separator-worker',
-                                                cluster=self.base_resources.cluster)
+                                                cluster=self.base_resources.cluster,
+                                                input_bucket=self.base_resources.input_bucket,
+                                                output_bucket=self.base_resources.output_bucket,
+                                                tracks_table=self.base_resources.tracks_table)
+
+        self.http_api = HttpApi(self, 'http-api',
+                                cluster=self.base_resources.cluster,
+                                processing_queue=self.separator_worker.service.sqs_queue,
+                                tracks_table=self.base_resources.tracks_table,
+                                upload_bucket=self.base_resources.input_bucket)
 
 
 app = KaraokeApp()
diff --git a/requirements.txt b/requirements.txt
index c74f3f1..936c747 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,7 @@ aws-cdk.aws-certificatemanager==1.32.2
 aws-cdk.aws-cloudformation==1.32.2
 aws-cdk.aws-cloudfront==1.32.2
 aws-cdk.aws-cloudwatch==1.32.2
+aws-cdk.aws-dynamodb==1.32.2
 aws-cdk.aws-ec2==1.32.2
 aws-cdk.aws-ecr==1.32.2
 aws-cdk.aws-ecr-assets==1.32.2