Source code for aws_ops_alpha.vendor.aws_s3_lock

# -*- coding: utf-8 -*-

"""
This module provides a distributed lock using AWS S3 backend.

Requirements::

    python>=3.7
    boto3

Usage:

.. code-block:: python

    import boto3
    from aws_s3_lock.py import Lock, Vault, get_utc_now, AlreadyLockedError

    s3_client = boto3.client("s3")

    # define a vault backend
    vault = Vault(bucket="my-bucket", key="my-task.json", expire=900, wait=1.0)

    # acquire the lock before doing any task
    lock = vault.acquire(s3_client) # or vault.acquire(s3_client, owner="alice")

    # do yor task that requires the distributed lock here
    ...

    # release the lock after the task is done
    vault.release(s3_client, lock)
"""

import typing as T
import json
import time
import uuid
import dataclasses
from datetime import datetime, timezone

if T.TYPE_CHECKING:  # pragma: no cover
    from mypy_boto3_s3.client import S3Client

__version__ = "0.1.1"

def get_utc_now() -> datetime:
    return datetime.utcnow().replace(tzinfo=timezone.utc)


EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


[docs]class AlreadyLockedError(RuntimeError): pass
[docs]@dataclasses.dataclass class Lock: expire: int = dataclasses.field() lock_time: str = dataclasses.field() release_time: T.Optional[str] = dataclasses.field() owner: T.Optional[str] = dataclasses.field() def to_json(self) -> str: return json.dumps(dataclasses.asdict(self)) @classmethod def from_json(cls, json_str: str) -> "Lock": return cls(**json.loads(json_str)) @property def lock_datetime(self) -> datetime: return datetime.fromisoformat(self.lock_time) def is_expired(self, utc_now: datetime) -> bool: return (utc_now - self.lock_datetime).total_seconds() >= self.expire
[docs] def is_locked( self, utc_now: datetime, expect_owner: str, ) -> bool: """ From the expect_owner point of view, is the lock locked? - if the lock has no owner, then it is NOT LOCKED - if the lock's owner matches the expected owner, then it is NOT LOCKED - if the lock's owner is not the expected owner: - if the lock is expired, then it is NOT LOCKED - if the lock is not expired, then it is LOCKED """ if self.owner is None: return False if self.owner == expect_owner: return False else: if self.is_expired(utc_now=utc_now): return False else: return True
[docs]@dataclasses.dataclass class Vault: """ A vault is an S3 object to store a lock. :param bucket: the S3 bucket. :param key: the S3 key. :param expire: how long the lock will expire in seconds. :param wait: how long we should wait after we thought we successfully acquired the lock, before actually doing any task. """ bucket: str = dataclasses.field() key: str = dataclasses.field() expire: int = dataclasses.field() wait: float = dataclasses.field(default=1.0) def _read(self, s3_client: "S3Client") -> Lock: try: response = s3_client.get_object(Bucket=self.bucket, Key=self.key) return Lock.from_json(response["Body"].read().decode("utf-8")) except Exception as e: if "NoSuchKey" in str(e): lock = Lock( expire=self.expire, lock_time=EPOCH.isoformat(), release_time=None, owner=None, ) self._write(s3_client=s3_client, lock=lock) return lock else: # pragma: no cover raise e def _write(self, s3_client: "S3Client", lock: Lock): s3_client.put_object( Bucket=self.bucket, Key=self.key, Body=lock.to_json(), ContentType="application/json", )
[docs] def acquire(self, s3_client: "S3Client", owner: T.Optional[str] = None) -> Lock: """ The owner tries to acquire the lock. - If failed, raise AlreadyLockedError. - If the lock is already owned by the owner, and still not expired, then refresh lock_time. :param s3_client: :param owner: the owner of the lock. If None, then a random uuid will be used. """ if owner is None: owner = uuid.uuid4().hex # read the current lock lock = self._read(s3_client=s3_client) utc_now = get_utc_now() if lock.is_locked(utc_now=utc_now, expect_owner=owner): raise AlreadyLockedError(f"Lock is already acquired by {lock}") lock.expire = self.expire lock.lock_time = utc_now.isoformat() lock.release_time = None lock.owner = owner self._write(s3_client, lock) if self.wait > 0: time.sleep(self.wait) lock = self._read(s3_client=s3_client) if lock.owner != owner: # pragma: no cover raise AlreadyLockedError(f"Lock is already acquired by {lock}") return lock
[docs] def release(self, s3_client: "S3Client", lock: Lock) -> Lock: """ Release the lock. Set the owner as None and update release time. :param s3_client: :param lock: the lock to release. """ lock.release_time = get_utc_now().isoformat() lock.owner = None self._write(s3_client, lock) return lock