Replacing the placeholder API with a real one.
This commit is contained in:
14
api/Dockerfile
Normal file
14
api/Dockerfile
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
FROM python:3
|
||||||
|
|
||||||
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
ENV PYTHONDONTWRITEBYTECODE=1
|
||||||
|
ENV PORT 8080
|
||||||
|
ENV FLASK_APP app.py
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
COPY . /app
|
||||||
|
|
||||||
|
RUN pip install --no-cache -r requirements.txt
|
||||||
|
|
||||||
|
CMD ["flask", "run", "-h", "0.0.0.0", "-p", "8080"]
|
||||||
|
|
||||||
102
api/app.py
Normal file
102
api/app.py
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
import boto3
|
||||||
|
from flask import Flask, request
|
||||||
|
|
||||||
|
|
||||||
|
APP_NAME = 'karaoke-http-api'
|
||||||
|
VERSION = '0.10'
|
||||||
|
PROCESSING_QUEUE_URL = os.getenv('PROCESSING_QUEUE_URL')
|
||||||
|
TRACKS_TABLE_NAME = os.getenv('TRACKS_TABLE_NAME')
|
||||||
|
UPLOAD_BUCKET_NAME = os.getenv('UPLOAD_BUCKET_NAME')
|
||||||
|
UPLOAD_AUTH_TIMEOUT = int(os.getenv('UPLOAD_AUTH_TIMEOUT', 120))
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_dynamodb_client():
|
||||||
|
return boto3.client('dynamodb')
|
||||||
|
|
||||||
|
|
||||||
|
def get_s3_client():
|
||||||
|
return boto3.client('s3')
|
||||||
|
|
||||||
|
|
||||||
|
def get_sqs_client():
|
||||||
|
return boto3.client('sqs')
|
||||||
|
|
||||||
|
|
||||||
|
def strip_dynamodb_type_tags(obj):
|
||||||
|
return {key: list(value.values())[0]
|
||||||
|
for key, value in obj.items()}
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
def info():
|
||||||
|
return dict(name=APP_NAME, version=VERSION)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/jobs', methods=['GET', 'POST'])
|
||||||
|
def jobs_resource():
|
||||||
|
dynamodb = get_dynamodb_client()
|
||||||
|
if request.method == 'POST':
|
||||||
|
job_id = str(uuid4())
|
||||||
|
|
||||||
|
s3_key = f'track_{job_id}'
|
||||||
|
s3_complete_url = f's3://{UPLOAD_BUCKET_NAME}/{s3_key}'
|
||||||
|
|
||||||
|
dynamodb.put_item(
|
||||||
|
TableName=TRACKS_TABLE_NAME,
|
||||||
|
Item={
|
||||||
|
'id': {'S': job_id},
|
||||||
|
'input_s3_url': {'S': s3_complete_url},
|
||||||
|
'status': {'S': 'uploading'}
|
||||||
|
})
|
||||||
|
|
||||||
|
# create the pre-signed upload URL
|
||||||
|
s3 = get_s3_client()
|
||||||
|
upload_data = s3.generate_presigned_post(UPLOAD_BUCKET_NAME, s3_key,
|
||||||
|
Fields=None,
|
||||||
|
Conditions=None,
|
||||||
|
ExpiresIn=UPLOAD_AUTH_TIMEOUT)
|
||||||
|
|
||||||
|
return dict(job_id=job_id, upload_data=upload_data)
|
||||||
|
|
||||||
|
response = dynamodb.scan(TableName=TRACKS_TABLE_NAME)
|
||||||
|
return dict(items=[strip_dynamodb_type_tags(el) for el in response['Items']],
|
||||||
|
count=response['Count'],
|
||||||
|
scanned_count=response['ScannedCount'])
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/jobs/<job_id>', methods=['GET'])
|
||||||
|
def get_job_details(job_id):
|
||||||
|
dynamodb = get_dynamodb_client()
|
||||||
|
response = dynamodb.get_item(TableName=TRACKS_TABLE_NAME, Key={'id': {'S': job_id}})
|
||||||
|
return strip_dynamodb_type_tags(response['Item'])
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/jobs/<job_id>/process', methods=['POST'])
|
||||||
|
def trigger_job_processing(job_id):
|
||||||
|
dynamodb = get_dynamodb_client()
|
||||||
|
response = dynamodb.get_item(TableName=TRACKS_TABLE_NAME, Key={'id': {'S': job_id}})
|
||||||
|
job = strip_dynamodb_type_tags(response['Item'])
|
||||||
|
|
||||||
|
if job['status'] != 'uploading':
|
||||||
|
return dict(status='failed', message='Job is in a wrong state'), 400
|
||||||
|
|
||||||
|
dynamodb.update_item(
|
||||||
|
TableName=TRACKS_TABLE_NAME,
|
||||||
|
Key={'id': {'S': job_id}},
|
||||||
|
AttributeUpdates={
|
||||||
|
'status': {'Value': {'S': 'processing'}}
|
||||||
|
})
|
||||||
|
|
||||||
|
sqs = get_sqs_client()
|
||||||
|
response = sqs.send_message(
|
||||||
|
QueueUrl=PROCESSING_QUEUE_URL,
|
||||||
|
MessageBody=json.dumps({
|
||||||
|
'job_id': job['id'],
|
||||||
|
'input_s3_url': job['input_s3_url']}))
|
||||||
|
|
||||||
|
return dict(status='processing', message_id=response['MessageId'])
|
||||||
2
api/requirements.txt
Normal file
2
api/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
flask
|
||||||
|
boto3
|
||||||
70
app.py
70
app.py
@@ -3,9 +3,12 @@
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
from aws_cdk import core
|
from aws_cdk import core
|
||||||
|
import aws_cdk.aws_dynamodb as dynamodb
|
||||||
import aws_cdk.aws_ec2 as ec2
|
import aws_cdk.aws_ec2 as ec2
|
||||||
import aws_cdk.aws_ecs as ecs
|
import aws_cdk.aws_ecs as ecs
|
||||||
import aws_cdk.aws_ecs_patterns as ecs_patterns
|
import aws_cdk.aws_ecs_patterns as ecs_patterns
|
||||||
|
import aws_cdk.aws_s3 as s3
|
||||||
|
import aws_cdk.aws_sqs as sqs
|
||||||
|
|
||||||
from karaokeme_cdk.karaokeme_cdk_stack import KaraokemeCdkStack
|
from karaokeme_cdk.karaokeme_cdk_stack import KaraokemeCdkStack
|
||||||
|
|
||||||
@@ -21,34 +24,77 @@ class BaseResources(core.Stack):
|
|||||||
self.cluster = ecs.Cluster(self, 'karaoke-cluster',
|
self.cluster = ecs.Cluster(self, 'karaoke-cluster',
|
||||||
vpc=self.vpc)
|
vpc=self.vpc)
|
||||||
|
|
||||||
|
# DynamoDB table
|
||||||
|
self.tracks_table = dynamodb.Table(self, 'karaoke-tracks-table',
|
||||||
|
table_name='karaoke-tracks',
|
||||||
|
partition_key=dynamodb.Attribute(name='id', type=dynamodb.AttributeType.STRING),
|
||||||
|
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST)
|
||||||
|
|
||||||
|
# The bucket where uploaded files will be kept
|
||||||
|
self.input_bucket = s3.Bucket(self, 'input-bucket',
|
||||||
|
bucket_name='karaoke-uploaded-tracks')
|
||||||
|
|
||||||
|
# The bucket where processed files will be kept and served back to the public
|
||||||
|
self.output_bucket = s3.Bucket(self, 'output-bucket',
|
||||||
|
bucket_name='karaoke-separated-tracks',
|
||||||
|
public_read_access=True)
|
||||||
|
|
||||||
|
|
||||||
class HttpApi(core.Stack):
|
class HttpApi(core.Stack):
|
||||||
def __init__(self, scope, id, cluster: ecs.Cluster, **kwargs):
|
def __init__(self, scope, id,
|
||||||
|
cluster: ecs.Cluster,
|
||||||
|
tracks_table: dynamodb.Table,
|
||||||
|
processing_queue: sqs.Queue,
|
||||||
|
upload_bucket: s3.Bucket,
|
||||||
|
**kwargs):
|
||||||
super().__init__(scope, id, **kwargs)
|
super().__init__(scope, id, **kwargs)
|
||||||
|
|
||||||
api_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'api-placeholder'))
|
api_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'api'))
|
||||||
|
|
||||||
self.api = ecs_patterns.ApplicationLoadBalancedFargateService(self, 'http-api-service',
|
self.api = ecs_patterns.ApplicationLoadBalancedFargateService(self, 'http-api-service',
|
||||||
cluster=cluster,
|
cluster=cluster,
|
||||||
task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
|
task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
|
||||||
image=ecs.ContainerImage.from_asset(directory=api_dir),
|
image=ecs.ContainerImage.from_asset(directory=api_dir),
|
||||||
container_port=8080),
|
container_port=8080,
|
||||||
|
environment={
|
||||||
|
'PROCESSING_QUEUE_URL': processing_queue.queue_url,
|
||||||
|
'TRACKS_TABLE_NAME': tracks_table.table_name,
|
||||||
|
'UPLOAD_BUCKET_NAME': upload_bucket.bucket_name
|
||||||
|
}),
|
||||||
desired_count=2,
|
desired_count=2,
|
||||||
cpu=256,
|
cpu=256,
|
||||||
memory_limit_mib=512)
|
memory_limit_mib=512)
|
||||||
|
|
||||||
|
processing_queue.grant_send_messages(self.api.service.task_definition.task_role)
|
||||||
|
tracks_table.grant_read_write_data(self.api.service.task_definition.task_role)
|
||||||
|
upload_bucket.grant_put(self.api.service.task_definition.task_role)
|
||||||
|
|
||||||
|
|
||||||
class SeparatorWorker(core.Stack):
|
class SeparatorWorker(core.Stack):
|
||||||
def __init__(self, scope, id, cluster: ecs.Cluster, **kwargs):
|
def __init__(self, scope, id,
|
||||||
|
cluster: ecs.Cluster,
|
||||||
|
tracks_table: dynamodb.Table,
|
||||||
|
input_bucket: s3.Bucket,
|
||||||
|
output_bucket: s3.Bucket,
|
||||||
|
**kwargs):
|
||||||
super().__init__(scope, id, **kwargs)
|
super().__init__(scope, id, **kwargs)
|
||||||
|
|
||||||
worker_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'worker-placeholder'))
|
worker_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'worker-placeholder'))
|
||||||
|
|
||||||
self.service = ecs_patterns.QueueProcessingFargateService(self, 'separator-service',
|
self.service = ecs_patterns.QueueProcessingFargateService(self, 'separator-service',
|
||||||
|
cluster=cluster,
|
||||||
cpu=256,
|
cpu=256,
|
||||||
memory_limit_mib=512,
|
memory_limit_mib=512,
|
||||||
image=ecs.ContainerImage.from_asset(directory=worker_dir),
|
image=ecs.ContainerImage.from_asset(directory=worker_dir),
|
||||||
cluster=cluster)
|
environment={
|
||||||
|
'TRACKS_TABLE_NAME': tracks_table.table_name,
|
||||||
|
'INPUT_BUCKET_NAME': input_bucket.bucket_name,
|
||||||
|
'OUTPUT_BUCKET_NAME': output_bucket.bucket_name
|
||||||
|
})
|
||||||
|
|
||||||
|
input_bucket.grant_read(self.service.task_definition.task_role)
|
||||||
|
output_bucket.grant_write(self.service.task_definition.task_role)
|
||||||
|
tracks_table.grant_read_write_data(self.service.task_definition.task_role)
|
||||||
|
|
||||||
|
|
||||||
class KaraokeApp(core.App):
|
class KaraokeApp(core.App):
|
||||||
@@ -57,11 +103,17 @@ class KaraokeApp(core.App):
|
|||||||
|
|
||||||
self.base_resources = BaseResources(self, 'base-resources')
|
self.base_resources = BaseResources(self, 'base-resources')
|
||||||
|
|
||||||
self.http_api = HttpApi(self, 'http-api',
|
|
||||||
cluster=self.base_resources.cluster)
|
|
||||||
|
|
||||||
self.separator_worker = SeparatorWorker(self, 'separator-worker',
|
self.separator_worker = SeparatorWorker(self, 'separator-worker',
|
||||||
cluster=self.base_resources.cluster)
|
cluster=self.base_resources.cluster,
|
||||||
|
input_bucket=self.base_resources.input_bucket,
|
||||||
|
output_bucket=self.base_resources.output_bucket,
|
||||||
|
tracks_table=self.base_resources.tracks_table)
|
||||||
|
|
||||||
|
self.http_api = HttpApi(self, 'http-api',
|
||||||
|
cluster=self.base_resources.cluster,
|
||||||
|
processing_queue=self.separator_worker.service.sqs_queue,
|
||||||
|
tracks_table=self.base_resources.tracks_table,
|
||||||
|
upload_bucket=self.base_resources.input_bucket)
|
||||||
|
|
||||||
|
|
||||||
app = KaraokeApp()
|
app = KaraokeApp()
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ aws-cdk.aws-certificatemanager==1.32.2
|
|||||||
aws-cdk.aws-cloudformation==1.32.2
|
aws-cdk.aws-cloudformation==1.32.2
|
||||||
aws-cdk.aws-cloudfront==1.32.2
|
aws-cdk.aws-cloudfront==1.32.2
|
||||||
aws-cdk.aws-cloudwatch==1.32.2
|
aws-cdk.aws-cloudwatch==1.32.2
|
||||||
|
aws-cdk.aws-dynamodb==1.32.2
|
||||||
aws-cdk.aws-ec2==1.32.2
|
aws-cdk.aws-ec2==1.32.2
|
||||||
aws-cdk.aws-ecr==1.32.2
|
aws-cdk.aws-ecr==1.32.2
|
||||||
aws-cdk.aws-ecr-assets==1.32.2
|
aws-cdk.aws-ecr-assets==1.32.2
|
||||||
|
|||||||
Reference in New Issue
Block a user