Lambbda+PythonでPOSTされたmultipart/form-dataを扱う

AWS

はじめに

lambda+Pythonに対してmultipart/form-data形式でフォルダをアップロードしてS3に保存します。
実装に時間がかかったので、忘備録として残しています。

コードの概要

  • multipart/form-data のリクエストを処理し、フォームデータとファイルを解析。
    標準ライブラリではないのでzip化します。
pip3 install requests-toolbelt -t .
zip -r lambda_function.zip .
  • ファイルがある場合は S3 にアップロード し、S3 の URL をレスポンスとして返す。
  • requests-toolbelt を使用して multipart/form-data をパース。
  • boto3 を使って S3 にアップロード。

コード全文

import json
import base64
import re
import boto3
from io import BytesIO
from requests_toolbelt.multipart import decoder

# S3 設定
S3_BUCKET = "test-bucket"  # 🔹 S3 バケット名を指定
s3_client = boto3.client("s3")

def lambda_handler(event, context):
    try:
        print("\n=== EVENT START ===")
        print(event)  # Lambda に渡された event 全体をログに出力
        print("=== EVENT END ===\n")

        # Base64 エンコードされている場合、デコード
        if event.get("isBase64Encoded", False):
            print("Event body is Base64 encoded, decoding...")
            body = base64.b64decode(event["body"])  # バイナリデコード
        else:
            print("Event body is not Base64 encoded, encoding to bytes...")
            body = event["body"].encode("utf-8")  # 文字列ならバイト変換

        print(f"Decoded Body (length: {len(body)} bytes): {body[:500]}\n")  # 長すぎる場合、最初の500バイトだけ出力

        # Content-Type ヘッダーの取得
        content_type = event["headers"].get("Content-Type") or event["headers"].get("content-type")
        print(f"Content-Type: {content_type}")

        # `boundary` の取得
        boundary_match = re.search(r"boundary=(.+)", content_type)
        if not boundary_match:
            raise ValueError("Invalid Content-Type: Missing boundary")

        boundary = boundary_match.group(1).encode()
        print(f"Boundary: {boundary}\n")

        # `requests-toolbelt` を使って multipart データを解析
        multipart_data = decoder.MultipartDecoder(body, content_type)

        parsed_data = {}

        for i, part in enumerate(multipart_data.parts):
            print(f"\n--- Processing Part {i+1} ---")
            print(f"Headers: {part.headers}")
            
            # `Content-Disposition` から `name` を取得
            content_disposition = part.headers.get(b"Content-Disposition", b"").decode(errors="ignore")
            print(f"Content-Disposition: {content_disposition}")

            name_match = re.search(r'name="([^"]+)"', content_disposition)
            if not name_match:
                print("No 'name' found in Content-Disposition, skipping...")
                continue

            name = name_match.group(1)
            print(f"Field Name: {name}")

            # ファイルの場合
            if "filename=" in content_disposition:
                filename_match = re.search(r'filename="([^"]+)"', content_disposition)
                filename = filename_match.group(1) if filename_match else "unknown"
                print(f"Filename: {filename}")

                file_size = len(part.content)
                print(f"File Size: {file_size} bytes")

                # S3 にアップロード
                S3_KEY = f"uploads/{filename}"  # S3 のキーを指定
                s3_client.put_object(
                    Bucket=S3_BUCKET,
                    Key=S3_KEY,
                    Body=part.content,
                    ContentType=part.headers.get(b"Content-Type", b"").decode("utf-8", errors="ignore")
                )
                print(f"File uploaded to S3: {S3_KEY}")

                # S3 の URL を生成
                file_url = f"https://{S3_BUCKET}.s3.amazonaws.com/{S3_KEY}"
                parsed_data["file_url"] = file_url
                parsed_data["file_name"] = filename
                parsed_data["file_size"] = file_size
            else:
                # 通常のフォームデータ
                parsed_data[name] = part.text
                print(f"Value: {parsed_data[name]}")

        print("\n=== Parsed Data ===")
        print(parsed_data)

        return {
            "statusCode": 200,
            "body": json.dumps({
                "title": parsed_data.get("title", "No title"),
                "file_name": parsed_data.get("file_name", "No file"),
                "file_size": parsed_data.get("file_size", 0),
                "file_url": parsed_data.get("file_url", "No file URL"),
            })
        }

    except Exception as e:
        print("\n=== Unhandled Exception ===")
        print(str(e))
        return {
            "statusCode": 500,
            "body": f"Internal Server Error: {str(e)}"
        }

解説

使用ライブラリ

ライブラリ用途
jsonレスポンスの JSON 変換
base64Base64 エンコード・デコード
re正規表現 (boundary の解析)
boto3AWS S3 へのアップロード
io.BytesIOバイナリデータの処理
requests-toolbelt.multipart.decodermultipart/form-data の解析

S3 設定

S3_BUCKET = "test-locomoco"
s3_client = boto3.client("s3")
  • S3_BUCKET に S3 バケットの名前を指定。
  • boto3.client("s3") を使用して S3 クライアントを作成。

multipart/form-data を解析

Base64 デコード

if event.get("isBase64Encoded", False):
    body = base64.b64decode(event["body"])  # バイナリデコード
else:
    body = event["body"].encode("utf-8")  # 文字列ならバイト変換
  • API Gateway から受け取ったリクエストが Base64 エンコードされている場合はデコード

boundary の取得

content_type = event["headers"].get("Content-Type") or event["headers"].get("content-type")
boundary_match = re.search(r"boundary=(.+)", content_type)
if not boundary_match:
    raise ValueError("Invalid Content-Type: Missing boundary")
boundary = boundary_match.group(1).encode()
  • multipart/form-databoundary を取得し、データを正しく分割できるようにする。

multipart/form-data の解析

multipart_data = decoder.MultipartDecoder(body, content_type)
  • requests-toolbelt を使って multipart/form-data をデコードし、各パートに分解。

フォームデータとファイルを処理

for i, part in enumerate(multipart_data.parts):
    content_disposition = part.headers.get(b"Content-Disposition", b"").decode(errors="ignore")
    name_match = re.search(r'name="([^"]+)"', content_disposition)

    if not name_match:
        continue  # `name` がない場合スキップ
    name = name_match.group(1)
  • 各パートの Content-Disposition を解析し、name を取得。

ファイルデータの処理

if "filename=" in content_disposition:
    filename_match = re.search(r'filename="([^"]+)"', content_disposition)
    filename = filename_match.group(1) if filename_match else "unknown"
    file_size = len(part.content)
  • filename がある場合、ファイルと判定
  • ファイル名を取得し、サイズ (file_size) を記録。

S3 にアップロード

S3_KEY = f"uploads/{filename}"  # S3 のキーを指定
s3_client.put_object(
    Bucket=S3_BUCKET,
    Key=S3_KEY,
    Body=part.content,
    ContentType=part.headers.get(b"Content-Type", b"").decode("utf-8", errors="ignore")
)
  • uploads/ ディレクトリ配下にアップロード
  • ContentType を S3 に反映(例: image/png, application/pdf)。

S3 のファイル URL を取得

file_url = f"https://{S3_BUCKET}.s3.amazonaws.com/{S3_KEY}"
parsed_data["file_url"] = file_url
  • アップロードされたファイルの S3 の URL をレスポンスに含める

必要な IAM ポリシー

Lambda から S3 に書き込むには、以下の IAM ポリシーをアタッチする必要があります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::test-locomoco/uploads/*"
        }
    ]
}

コメント

タイトルとURLをコピーしました