To implement a "Disappearing Batch" feature where batches automatically expire after a specified duration, follow this structured approach:
Core Components
- Batch Class: Represents a batch with metadata and expiration logic.
- BatchManager: Handles batch creation, retrieval, and cleanup.
- Background Cleaner: Periodically removes expired batches.
Implementation Code
import time
import threading
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Batch:
    id: str
    items: List
    created_at: float
    lifespan: float  # seconds until expiration

    def is_expired(self, current_time: Optional[float] = None) -> bool:
        """Check if the batch has expired."""
        if current_time is None:
            current_time = time.time()
        return (current_time - self.created_at) > self.lifespan


class BatchManager:
    def __init__(self):
        self.batches: Dict[str, Batch] = {}
        self.lock = threading.Lock()  # guards every access to self.batches
        self.start_background_cleaner()

    def create_batch(self, items: List, lifespan: float) -> str:
        """Create a new batch with a specified lifespan."""
        # A random UUID keeps IDs unique even when two batches are created
        # within the same millisecond (a timestamp-based ID would collide).
        batch_id = f"batch_{uuid.uuid4().hex}"
        batch = Batch(
            id=batch_id,
            items=items,
            created_at=time.time(),
            lifespan=lifespan,
        )
        with self.lock:
            self.batches[batch_id] = batch
        return batch_id

    def get_batch(self, batch_id: str) -> Optional[Batch]:
        """Retrieve a batch if it exists and hasn't expired."""
        with self.lock:
            batch = self.batches.get(batch_id)
            if batch and batch.is_expired():
                # Lazy cleanup: drop the expired batch as soon as it is requested.
                del self.batches[batch_id]
                return None
            return batch

    def cleanup_expired_batches(self):
        """Remove all expired batches."""
        current_time = time.time()
        with self.lock:
            # Collect the IDs first so the dict is not modified while iterating.
            expired_ids = [
                bid for bid, batch in self.batches.items()
                if batch.is_expired(current_time)
            ]
            for bid in expired_ids:
                del self.batches[bid]

    def start_background_cleaner(self):
        """Start a background thread to clean expired batches periodically."""
        def cleaner_loop():
            while True:
                time.sleep(60)  # Check every 60 seconds
                self.cleanup_expired_batches()

        thread = threading.Thread(target=cleaner_loop, daemon=True)
        thread.start()

    def stop_background_cleaner(self):
        """Stop the background cleaner (if needed)."""
        # Intentionally a no-op: the cleaner is a daemon thread, so it exits
        # when the program exits and is never stopped explicitly.
        pass
Key Features
- Automatic Expiration: Batches disappear after their lifespan.
- Thread Safety: Uses a lock to handle concurrent access.
- Lazy Cleanup: Expired batches are removed on access or via background cleaner.
- Background Cleaner: Runs every 60 seconds to remove expired batches.
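The expiry rule and the lazy removal listed above can be exercised without waiting in real time by passing explicit timestamps. The following is a minimal sketch rather than the full manager: TinyStore and its now parameter are hypothetical names introduced only for this illustration, and the timestamps are arbitrary.

```python
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Batch:
    id: str
    items: List
    created_at: float
    lifespan: float  # seconds until expiration

    def is_expired(self, current_time: float) -> bool:
        # Same rule as the implementation: expired once age exceeds lifespan.
        return (current_time - self.created_at) > self.lifespan


class TinyStore:
    """Hypothetical stand-in for BatchManager that takes the clock value as input."""

    def __init__(self) -> None:
        self.batches: Dict[str, Batch] = {}
        self.lock = threading.Lock()

    def put(self, batch: Batch) -> None:
        with self.lock:
            self.batches[batch.id] = batch

    def get(self, batch_id: str, now: float) -> Optional[Batch]:
        # Lazy cleanup: an expired batch is deleted the moment it is requested.
        with self.lock:
            batch = self.batches.get(batch_id)
            if batch is not None and batch.is_expired(now):
                del self.batches[batch_id]
                return None
            return batch


store = TinyStore()
store.put(Batch(id="b1", items=["item1"], created_at=100.0, lifespan=10.0))

fresh = store.get("b1", now=105.0)   # age 5 s, still within the lifespan
stale = store.get("b1", now=111.0)   # age 11 s, past the lifespan
remaining = len(store.batches)       # the stale entry was removed on access
```

Because the comparison is strict, a batch whose age equals its lifespan exactly is still returned; it only counts as expired once the age exceeds the lifespan.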
Usage Example
manager = BatchManager()

# Create a batch that expires in 10 seconds
batch_id = manager.create_batch(["item1", "item2"], lifespan=10)
print(f"Created batch: {batch_id}")
# Access the batch immediately
batch = manager.get_batch(batch_id)
print(f"Batch items: {batch.items if batch else 'Expired'}")
# Wait for expiration
time.sleep(11)
# Try accessing after expiration
batch = manager.get_batch(batch_id)
print(f"Batch items: {batch.items if batch else 'Expired'}") # Output: Expired
Output Explanation
- Immediately after creation: Batch items: ['item1', 'item2']
- After 11 seconds: Batch items: Expired (the batch has disappeared)
Customization Options
- Cleanup Interval: Adjust time.sleep(60) in cleaner_loop for more or less frequent checks.
- Persistence: Replace the in-memory self.batches with a database (e.g., Redis, SQL).
- Non-Daemon Thread: Use a non-daemon thread if you need explicit cleanup before exit.
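The first and third options can be combined by replacing the sleep loop with a wait on a threading.Event, which makes the interval configurable and lets the thread be stopped explicitly. This is a sketch under assumptions: PeriodicCleaner, its interval parameter, and the cleanup callback are hypothetical names, not part of the code above.

```python
import threading
import time
from typing import Callable, List


class PeriodicCleaner:
    """Hypothetical helper: calls `cleanup` every `interval` seconds until stopped."""

    def __init__(self, cleanup: Callable[[], None], interval: float = 60.0) -> None:
        self._cleanup = cleanup
        self._interval = interval
        self._stop_event = threading.Event()
        # Non-daemon thread: the program waits for it, so stop() must be called.
        self._thread = threading.Thread(target=self._run, daemon=False)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once the event is set,
        # so the loop ends promptly after stop() is called.
        while not self._stop_event.wait(self._interval):
            self._cleanup()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()

    def is_running(self) -> bool:
        return self._thread.is_alive()


runs: List[int] = []
cleaner = PeriodicCleaner(cleanup=lambda: runs.append(1), interval=0.01)
cleaner.start()
time.sleep(0.2)   # let a few cleanup cycles happen
cleaner.stop()    # signals the loop and waits for the thread to finish
```

With the manager above, the callback would be manager.cleanup_expired_batches, for example PeriodicCleaner(manager.cleanup_expired_batches, interval=5). A shorter interval frees memory sooner at the cost of taking the lock more often.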
This implementation expires batches automatically, guards the shared dictionary with a lock, and sweeps out stale entries in the background once a minute.