Source code for sobe.aws

"""Everything related to AWS. In the future, we may support other cloud providers."""

import datetime
import json
import pathlib
import time
from collections.abc import Iterator

import boto3
import botocore.exceptions

from sobe.config import AWSConfig


class AWS:
    """Wrapper around the S3 bucket and CloudFront distribution named by an ``AWSConfig``."""

    def __init__(self, config: AWSConfig) -> None:
        """Build the boto3 session, S3 bucket handle and CloudFront client from ``config``.

        ``config.session`` is expanded into the ``boto3.Session`` keyword arguments and
        ``config.service`` into those of the S3 resource and the CloudFront client.
        """
        self.config = config
        self._session = boto3.Session(**self.config.session)
        self._s3_resource = self._session.resource("s3", **self.config.service)
        self._bucket = self._s3_resource.Bucket(self.config.bucket)  # type: ignore[attr-defined]
        self._cloudfront = self._session.client("cloudfront", **self.config.service)

    def upload(self, prefix: str, local_path: pathlib.Path, remote_name: str = "", *, content_type: str = "") -> None:
        """Upload a file.

        Args:
            prefix: Text prepended verbatim to the remote name to form the object key
                (no separator is inserted).
            local_path: File to upload.
            remote_name: Name of the object under ``prefix``; defaults to ``local_path.name``.
            content_type: Value for the ``ContentType`` upload argument; when empty it is
                guessed from the local file with ``guess_content_type``.
        """
        extra_args = {"ContentType": content_type or guess_content_type(local_path)}
        key = f"{prefix}{remote_name or local_path.name}"
        self._bucket.upload_file(str(local_path), key, ExtraArgs=extra_args)

    def delete(self, prefix: str, remote_filename: str) -> bool:
        """Delete a file, if it exists. Returns whether it did.

        Raises:
            botocore.exceptions.ClientError: For any failure other than the existence
                probe reporting error code ``"404"``.
        """
        obj = self._bucket.Object(f"{prefix}{remote_filename}")
        # Only the existence probe is allowed to translate a 404 into "not there";
        # an error raised by the deletion itself always propagates.
        try:
            obj.load()
        except botocore.exceptions.ClientError as e:
            if e.response.get("Error", {}).get("Code") == "404":
                return False
            raise
        obj.delete()
        return True

    def list(self, prefix: str) -> list[str]:
        """Return a list of object filenames in the given prefix.

        Only the first path component below ``prefix`` is reported: keys that contain a
        further ``/`` are collapsed into a single ``"name/"`` entry. The result is
        de-duplicated and sorted.
        """
        # NOTE: this method is named ``list``; the annotation above still resolves to the
        # builtin because the method is not yet bound in the class namespace when its
        # ``def`` statement is evaluated.
        start = len(prefix)
        results: set[str] = set()
        for obj in self._bucket.objects.filter(Prefix=prefix):
            remainder = obj.key[start:]
            if not remainder:
                continue  # skip the prefix entry itself
            # ``separator`` is "/" for nested keys and "" for direct children.
            name, separator, _ = remainder.partition("/")
            results.add(name + separator)
        return sorted(results)

    def invalidate_cache(self) -> Iterator[str]:
        """Create and wait for a full-path CloudFront invalidation. Iterates until completion.

        This is a generator: nothing is sent to CloudFront until iteration starts. It
        yields ``"Created"`` first, then sleeps 3 seconds between polls and yields each
        polled status that is not ``"Completed"``. The generator finishes once the status
        is ``"Completed"``; there is no timeout.
        """
        # The timezone-aware timestamp doubles as the caller reference of the request.
        ref = datetime.datetime.now().astimezone().isoformat()
        batch = {"Paths": {"Quantity": 1, "Items": ["/*"]}, "CallerReference": ref}
        distribution = self.config.cloudfront
        response = self._cloudfront.create_invalidation(DistributionId=distribution, InvalidationBatch=batch)
        invalidation = response["Invalidation"]["Id"]
        status = "Created"
        while status != "Completed":
            yield status
            time.sleep(3)
            response = self._cloudfront.get_invalidation(DistributionId=distribution, Id=invalidation)
            status = response["Invalidation"]["Status"]

    def generate_needed_permissions(self) -> str:
        """Return the minimal IAM policy statement required by the tool.

        The account id in the CloudFront ARN is looked up through STS. When that lookup
        fails, the placeholder ``YOUR_ACCOUNT_ID`` is used instead so that a policy
        template can still be produced.

        Returns:
            The policy document as JSON text indented by two spaces.
        """
        try:
            sts = self._session.client("sts", **self.config.service)
            account_id = sts.get_caller_identity()["Account"]
        except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError):
            # ClientError covers errors returned by the service; BotoCoreError covers
            # client-side failures such as missing credentials, which never reach AWS.
            account_id = "YOUR_ACCOUNT_ID"
        actions = [
            "s3:PutObject",
            "s3:GetObject",
            "s3:ListBucket",
            "s3:DeleteObject",
            "cloudfront:CreateInvalidation",
            "cloudfront:GetInvalidation",
        ]
        resources = [
            f"arn:aws:s3:::{self.config.bucket}",
            f"arn:aws:s3:::{self.config.bucket}/*",
            f"arn:aws:cloudfront::{account_id}:distribution/{self.config.cloudfront}",
        ]
        statement = {"Effect": "Allow", "Action": actions, "Resource": resources}
        policy = {"Version": "2012-10-17", "Statement": [statement]}
        return json.dumps(policy, indent=2)
def guess_content_type(path: pathlib.Path) -> str:
    """Return a guessed content type for the given file.

    The file name is tried first via the standard library's ``mimetypes``. Only when
    that yields nothing is the file content inspected with ``puremagic``. If neither
    produces a MIME type, ``"application/octet-stream"`` is returned.

    Args:
        path: File to classify. It is only read when the name alone is inconclusive.

    Returns:
        The guessed MIME type, or ``"application/octet-stream"`` as a fallback.
    """
    import mimetypes

    # Guess based on filename using standard library
    guess, _ = mimetypes.guess_type(path.name)
    if guess:
        return guess

    # Imported lazily so the dependency is only loaded when the name-based guess fails.
    import puremagic

    # Guess based on file content using puremagic
    try:
        candidates = puremagic.magic_file(path)
    except (ValueError, puremagic.PureError):
        # puremagic raises rather than returning an empty list for content it cannot
        # analyse (for example an empty file); treat that as "no guess".
        candidates = []
    for result in candidates:  # result is ordered by confidence
        guess = getattr(result, "mime_type", None)
        if guess:
            return guess

    # Fallback
    return "application/octet-stream"