A Guide to Using FastAPI Lifespan Events


The Definitive Guide to FastAPI Lifespan Events: Mastering Application Startup and Shutdown

FastAPI has rapidly become a go-to framework for building high-performance APIs with Python, thanks to its modern features, asynchronous support, automatic documentation, and developer-friendly design. As applications grow in complexity, managing their lifecycle – specifically, executing code during startup before handling requests and performing cleanup during shutdown – becomes crucial. This is where FastAPI’s lifespan event handling mechanism shines.

Traditionally, web frameworks might offer simple startup and shutdown hooks. FastAPI initially provided @app.on_event("startup") and @app.on_event("shutdown") decorators. However, the modern, recommended, and more powerful approach leverages the ASGI (Asynchronous Server Gateway Interface) lifespan protocol directly via a lifespan context manager function.

This guide provides a comprehensive exploration of FastAPI’s lifespan events. We’ll cover:

  1. What Lifespan Events Are and Why They Matter: Understanding the problem and the solution.
  2. The Evolution from startup/shutdown Events: Why the lifespan context manager is preferred.
  3. Implementing the lifespan Context Manager: Syntax, structure, and basic usage.
  4. Core Use Cases: Detailed examples for common scenarios like database connections, ML model loading, caching, background tasks, and more.
  5. Sharing Resources with app.state: Making resources initialized during lifespan available to your request handlers.
  6. Robust Error Handling: Managing exceptions during startup and shutdown phases.
  7. Testing Applications with Lifespan Events: Strategies for effective testing.
  8. Advanced Considerations and Best Practices: Tips for optimizing and maintaining lifespan logic.
  9. Troubleshooting Common Issues: Identifying and fixing potential problems.

By the end of this guide, you’ll have a deep understanding of how to effectively use FastAPI’s lifespan events to build robust, efficient, and maintainable applications.

1. What are Lifespan Events and Why Do They Matter?

At its core, lifespan management in a web application context refers to the ability to execute specific logic at two critical points in the application’s lifecycle:

  1. Startup: Just after the server process starts but before it begins accepting and processing incoming requests.
  2. Shutdown: Just before the server process exits, typically after it has stopped accepting new requests and finished processing ongoing ones.

Why is this necessary?

Many applications rely on external resources or require initial setup that shouldn’t happen on a per-request basis. Consider these common needs:

  • Database Connection Pools: Establishing a pool of database connections is resource-intensive. Doing it once at startup is far more efficient than creating connections for each request or managing global state awkwardly. The pool also needs to be closed gracefully at shutdown to release resources.
  • Machine Learning Models: Loading large ML models into memory can take significant time and consume substantial RAM. This should happen once when the application starts, not delaying the first user request (or worse, every request).
  • Cache Connections: Initializing connections to caching systems like Redis or Memcached.
  • Background Task Schedulers: Starting scheduler processes (like APScheduler) that need to run alongside the web server.
  • Configuration Loading & Validation: Reading configuration files, validating settings, and perhaps fetching dynamic configuration from a central service.
  • Initializing External Service Clients: Setting up authenticated clients for third-party APIs or microservices.
  • Resource Seeding/Priming: Populating caches or performing initial data setup.

Attempting to manage these tasks without dedicated lifecycle hooks leads to several problems:

  • Inefficiency: Initializing resources on the first request causes significant latency for that user.
  • Complexity: Managing global state manually for resources like connection pools can be error-prone and difficult to test.
  • Resource Leaks: Failure to properly clean up resources (like closing database connections or stopping background threads) during shutdown can lead to leaks and instability.
  • Race Conditions: Trying to initialize shared resources lazily within request handlers can lead to race conditions in concurrent environments.

Lifespan events provide a clean, standardized, and reliable way to handle these setup and teardown requirements directly within the framework’s lifecycle management. FastAPI’s implementation leverages the ASGI lifespan protocol, ensuring compatibility with ASGI servers like Uvicorn, Hypercorn, and Daphne.

2. The Evolution: From startup/shutdown to lifespan

FastAPI versions prior to 0.93.0 primarily used @app.on_event("startup") and @app.on_event("shutdown") decorators.

“`python

Old way (Deprecated)

from fastapi import FastAPI
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

app = FastAPI()

Dummy resource state

resources = {“db_connection”: None, “ml_model”: None}

@app.on_event(“startup”)
async def startup_event():
logger.info(“Application starting up…”)
# Simulate initializing resources
resources[“db_connection”] = “dummy_db_connection_object”
resources[“ml_model”] = “dummy_ml_model_object”
logger.info(“Resources initialized.”)

@app.on_event(“shutdown”)
async def shutdown_event():
logger.info(“Application shutting down…”)
# Simulate cleaning up resources
resources[“db_connection”] = None
resources[“ml_model”] = None
logger.info(“Resources cleaned up.”)

@app.get(“/”)
async def read_root():
# Accessing resources requires careful handling (e.g., global or complex state management)
return {“message”: “Hello World”, “db”: bool(resources[“db_connection”]), “model”: bool(resources[“ml_model”])}

To run this (conceptually): uvicorn main:app –reload

“`

While functional, this approach had limitations:

  1. Separation: Startup and shutdown logic were in separate functions, making it harder to see the complete lifecycle of a resource in one place.
  2. State Management: Sharing state (like the db_connection or ml_model) between the startup events and the request handlers often relied on global variables or complex dependency injection patterns, which could be less explicit and harder to manage.
  3. ASGI Standard: The ASGI specification defines a lifespan protocol using an asgi.lifespan scope. Directly using this protocol provides a more standard and potentially more robust way to handle lifecycle events, especially concerning how servers manage application startup failures or graceful shutdowns.

Recognizing these points, FastAPI adopted the lifespan context manager approach, which aligns directly with the ASGI standard and offers a more elegant and Pythonic way (async with) to manage resource lifecycles. The on_event handlers are now deprecated and will be removed in future FastAPI versions. All new development should use the lifespan context manager.

3. Implementing the lifespan Context Manager

The core idea is to define an async function decorated with @contextlib.asynccontextmanager. This function yields exactly once. The code before the yield runs during application startup, and the code after the yield runs during application shutdown.

Syntax:

“`python
import contextlib
from fastapi import FastAPI
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

Define the lifespan context manager

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
# Code to run on startup
logger.info(“Lifespan startup: Initializing resources…”)
# Example: Simulate resource initialization
await asyncio.sleep(1) # Simulate async setup task
startup_resource = {“status”: “initialized”}
logger.info(f”Lifespan startup: Resources initialized: {startup_resource}”)

yield # <--- The application runs while yielded

# Code to run on shutdown
logger.info("Lifespan shutdown: Cleaning up resources...")
# Example: Simulate resource cleanup
await asyncio.sleep(0.5) # Simulate async cleanup task
startup_resource = None # Clear the resource
logger.info("Lifespan shutdown: Resources cleaned up.")

Create the FastAPI app and pass the lifespan manager

app = FastAPI(lifespan=lifespan)

@app.get(“/”)
async def read_root():
# Note: startup_resource is NOT directly accessible here yet.
# We’ll cover sharing state later.
return {“message”: “Hello from the running application!”}

To run this: uvicorn main:app –reload

“`

Explanation:

  1. import contextlib: This standard Python library provides the necessary decorator.
  2. @contextlib.asynccontextmanager: This decorator transforms our async def function into an asynchronous context manager.
  3. async def lifespan(app: FastAPI)::
    • The function must be async def.
    • It receives the FastAPI application instance (app) as an argument. This is useful if you need access to app configuration or want to attach state (more on this later).
    • The name lifespan is conventional but not mandatory; what matters is passing the function to the FastAPI(lifespan=...) parameter.
  4. Code Before yield: This block is executed when the ASGI server (like Uvicorn) initiates the lifespan protocol’s startup phase. Any exceptions raised here will typically prevent the application from starting successfully. This is where you initialize connection pools, load models, etc.
  5. yield: This is the crucial part.
    • Execution pauses here.
    • Control is yielded back to the ASGI server, signaling that startup is complete.
    • The server now starts accepting and processing incoming HTTP requests. The application runs normally.
  6. Code After yield: This block is executed when the ASGI server initiates the lifespan protocol’s shutdown phase (e.g., when receiving a SIGTERM signal). This happens after the server has stopped accepting new connections and ideally finished processing existing requests (graceful shutdown). This is where you close connections, release resources, stop background tasks, etc. It’s often wrapped in a finally block to ensure cleanup happens even if the application encounters an error while running (though errors during startup or within the shutdown block itself need separate handling).

Running the Example:

If you run the code above with Uvicorn (uvicorn main:app), you’ll see output similar to this in your console:

“`
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Application startup complete.
INFO: Lifespan startup: Initializing resources…
INFO: Lifespan startup: Resources initialized: {‘status’: ‘initialized’}

<– Application is now ready and serving requests –>

(Make a request to http://127.0.0.1:8000/ in your browser or with curl)

INFO: 127.0.0.1:xxxxx – “GET / HTTP/1.1” 200 OK

<– Press CTRL+C to stop the server –>

INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Lifespan shutdown: Cleaning up resources…
INFO: Lifespan shutdown: Resources cleaned up.
INFO: Application shutdown complete.
INFO: Finished server process [xxxxx]
“`

This clearly demonstrates the execution flow: startup code runs first, then the application serves requests, and finally, shutdown code runs when the server exits.

4. Core Use Cases for Lifespan Events

Let’s dive into practical examples of how to use the lifespan context manager for common application requirements.

4.1. Managing Database Connection Pools (e.g., asyncpg for PostgreSQL)

Database interactions are fundamental to many APIs. Maintaining a pool of connections is essential for performance. asyncpg is a popular asynchronous library for PostgreSQL.

“`python
import contextlib
from fastapi import FastAPI, Depends, Request, HTTPException
import asyncpg
import logging
import os
from pydantic import BaseModel

— Configuration —

DATABASE_URL = os.getenv(“DATABASE_URL”, “postgresql://user:password@host:port/dbname”)
MIN_POOL_SIZE = int(os.getenv(“MIN_POOL_SIZE”, “1”))
MAX_POOL_SIZE = int(os.getenv(“MAX_POOL_SIZE”, “10”))

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

— Lifespan Management —

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(“Lifespan: Connecting to database…”)
try:
# Create connection pool
pool = await asyncpg.create_pool(
DATABASE_URL,
min_size=MIN_POOL_SIZE,
max_size=MAX_POOL_SIZE
)
logger.info(“Lifespan: Database connection pool created.”)
# Store the pool in app.state for access in request handlers
app.state.db_pool = pool
except Exception as e:
logger.error(f”Lifespan: Failed to connect to database: {e}”)
# Optionally re-raise or handle differently to prevent app start
raise RuntimeError(“Database connection failed during startup”) from e

yield # Application runs here

logger.info("Lifespan: Closing database connection pool...")
# Ensure pool exists before trying to close (robustness)
if hasattr(app.state, "db_pool") and app.state.db_pool:
    await app.state.db_pool.close()
    logger.info("Lifespan: Database connection pool closed.")
else:
    logger.warning("Lifespan: No database pool found in state to close.")

app = FastAPI(lifespan=lifespan)

— Dependency for getting a connection —

async def get_db_connection(request: Request) -> asyncpg.Connection:
# Retrieve the pool from app.state
pool: asyncpg.Pool = request.app.state.db_pool
# Acquire a connection from the pool
async with pool.acquire() as connection:
# Yield the connection to the path operation function
yield connection
# Connection is automatically released back to the pool here

— Pydantic Model —

class Item(BaseModel):
id: int
name: str

— Path Operation —

@app.get(“/items/{item_id}”, response_model=Item)
async def read_item(item_id: int, db: asyncpg.Connection = Depends(get_db_connection)):
try:
row = await db.fetchrow(“SELECT id, name FROM items WHERE id = $1″, item_id)
if row:
return Item(id=row[‘id’], name=row[‘name’])
raise HTTPException(status_code=404, detail=”Item not found”)
except asyncpg.PostgresError as e:
logger.error(f”Database error fetching item {item_id}: {e}”)
raise HTTPException(status_code=500, detail=”Database error”)

@app.get(“/”)
async def health_check(request: Request):
# Basic check if pool seems available
pool_status = “available” if hasattr(request.app.state, “db_pool”) and request.app.state.db_pool else “unavailable”
return {“message”: “API is running”, “db_pool_status”: pool_status}

To run:

1. Set DATABASE_URL environment variable

2. Install asyncpg: pip install asyncpg

3. Run with Uvicorn: uvicorn main:app –reload

(Requires a running PostgreSQL database matching DATABASE_URL)

“`

Key elements:

  • The lifespan function creates the asyncpg pool before yield.
  • Crucially, the pool is stored in app.state.db_pool. We’ll discuss app.state in detail shortly, but this is how we make the pool accessible later.
  • Error handling during pool creation is included. A failure here prevents the app from starting cleanly.
  • The code after yield closes the pool. It checks if app.state.db_pool exists before attempting closure for robustness.
  • The get_db_connection dependency function retrieves the pool from request.app.state and uses pool.acquire() to get a connection for a single request. The async with ensures the connection is released back to the pool.

4.2. Loading Machine Learning Models

Loading large ML models (e.g., from scikit-learn, TensorFlow, PyTorch, or spaCy) is often time-consuming. Lifespan ensures this happens only once.

“`python
import contextlib
from fastapi import FastAPI, Depends, Request
import joblib # Example for scikit-learn models, use appropriate lib for others
import logging
import os
import time

— Configuration —

MODEL_PATH = os.getenv(“MODEL_PATH”, “./model.joblib”) # Path to your trained model file

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

— Lifespan Management —

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(“Lifespan: Loading ML model…”)
start_time = time.time()
try:
# Load the model (synchronous loading in this example)
# For very large models or GPU init, consider running in a thread pool
# using asyncio.to_thread (Python 3.9+) or loop.run_in_executor
model = joblib.load(MODEL_PATH)
duration = time.time() – start_time
logger.info(f”Lifespan: ML model loaded successfully from {MODEL_PATH} in {duration:.2f}s.”)
# Store the model in app.state
app.state.ml_model = model
except FileNotFoundError:
logger.error(f”Lifespan: Model file not found at {MODEL_PATH}. Application will start without model.”)
app.state.ml_model = None # Explicitly set to None
# Decide if this is critical. If so, raise an error:
# raise RuntimeError(f”Model file not found at {MODEL_PATH}”)
except Exception as e:
logger.error(f”Lifespan: Failed to load ML model: {e}”)
app.state.ml_model = None
# Decide if this is critical
# raise RuntimeError(“ML Model loading failed”) from e

yield # Application runs here

logger.info("Lifespan: Shutting down. Releasing ML model reference.")
# Python's garbage collector will handle memory release when app state is cleared,
# but explicitly setting to None is good practice.
# If the model needs explicit cleanup (e.g., closing TF session), do it here.
if hasattr(app.state, "ml_model"):
     app.state.ml_model = None
logger.info("Lifespan: ML model reference released.")

app = FastAPI(lifespan=lifespan)

— Dependency to get the model —

def get_ml_model(request: Request):
if not hasattr(request.app.state, “ml_model”) or request.app.state.ml_model is None:
raise HTTPException(status_code=503, detail=”ML Model not available”)
return request.app.state.ml_model

— Path Operation —

@app.post(“/predict”)
async def predict(data: dict, model = Depends(get_ml_model)):
# Assume ‘data’ contains features compatible with the model
# Note: model.predict might be synchronous. For CPU-bound tasks,
# run it in a thread pool executor to avoid blocking the event loop:
# import asyncio
# loop = asyncio.get_running_loop()
# prediction = await loop.run_in_executor(None, model.predict, feature_vector)

try:
    # Example: Directly using a scikit-learn style model
    # This is SYNCHRONOUS and will BLOCK the event loop for its duration.
    # Needs careful consideration for production systems.
    prediction = model.predict([list(data.values())]) # Simplify input data processing
    return {"prediction": prediction.tolist()}
except Exception as e:
    logger.error(f"Prediction error: {e}")
    raise HTTPException(status_code=400, detail=f"Error during prediction: {e}")

@app.get(“/model_status”)
async def model_status(request: Request):
status = “loaded” if hasattr(request.app.state, “ml_model”) and request.app.state.ml_model else “not loaded”
return {“model_status”: status}

To run:

1. Create a dummy model file:

python -c “import joblib; from sklearn.linear_model import LogisticRegression; lr = LogisticRegression(); joblib.dump(lr, ‘./model.joblib’)”

2. Install joblib & scikit-learn: pip install joblib scikit-learn

3. Set MODEL_PATH if needed.

4. Run with Uvicorn: uvicorn main:app –reload

“`

Considerations:

  • Synchronous Loading: joblib.load (and many model loading functions) are synchronous. If loading takes a long time, it will block the event loop during startup. For very large models, consider using asyncio.to_thread (Python 3.9+) or loop.run_in_executor to load the model in a separate thread, preventing startup blockage.
  • Prediction Blocking: Similarly, model.predict() is often synchronous and CPU-bound. Running predictions directly in an async def path operation will block the event loop. Use asyncio.to_thread or loop.run_in_executor for predictions as well in production.
  • GPU Resources: If using GPU-accelerated models (TensorFlow, PyTorch), initialization and cleanup might involve specific library calls (e.g., managing CUDA contexts) which should be placed in the lifespan handler.
  • Error Handling: Decide whether a model loading failure is critical. If the application cannot function without the model, raise an exception in the lifespan startup phase to prevent the app from serving requests.

4.3. Initializing Cache Connections (e.g., Redis with redis-py)

Connecting to a cache like Redis at startup ensures it’s ready for use immediately.

“`python
import contextlib
from fastapi import FastAPI, Depends, Request, HTTPException
import redis.asyncio as redis # Use the async version
import logging
import os

— Configuration —

REDIS_URL = os.getenv(“REDIS_URL”, “redis://localhost:6379/0”)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

— Lifespan Management —

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(“Lifespan: Connecting to Redis…”)
try:
# Create Redis connection pool
redis_pool = redis.ConnectionPool.from_url(REDIS_URL, decode_responses=True)
redis_client = redis.Redis(connection_pool=redis_pool)
# Ping to ensure connection is valid
await redis_client.ping()
logger.info(“Lifespan: Redis connection successful.”)
# Store the client instance in app.state
app.state.redis = redis_client
except redis.exceptions.ConnectionError as e:
logger.error(f”Lifespan: Failed to connect to Redis: {e}”)
# Decide if Redis is critical
raise RuntimeError(“Redis connection failed during startup”) from e
except Exception as e:
logger.error(f”Lifespan: An unexpected error occurred during Redis setup: {e}”)
raise RuntimeError(“Redis setup failed”) from e

yield # Application runs here

logger.info("Lifespan: Closing Redis connection...")
if hasattr(app.state, "redis") and app.state.redis:
    await app.state.redis.close()
    # Older redis-py versions might need pool.disconnect() instead/as well
    # await app.state.redis.connection_pool.disconnect()
    logger.info("Lifespan: Redis connection closed.")
else:
    logger.warning("Lifespan: No Redis client found in state to close.")

app = FastAPI(lifespan=lifespan)

— Dependency for getting Redis client —

async def get_redis_client(request: Request) -> redis.Redis:
if not hasattr(request.app.state, “redis”) or not app.state.redis:
raise HTTPException(status_code=503, detail=”Redis client not available”)
# You might want to add a ping here for robustness in handlers, but often relies on pool health
return request.app.state.redis

— Path Operations —

@app.get(“/cache/{key}”)
async def get_from_cache(key: str, redis_client: redis.Redis = Depends(get_redis_client)):
try:
value = await redis_client.get(key)
if value is None:
raise HTTPException(status_code=404, detail=”Key not found in cache”)
return {“key”: key, “value”: value}
except redis.exceptions.RedisError as e:
logger.error(f”Redis error getting key {key}: {e}”)
raise HTTPException(status_code=500, detail=”Cache error”)

@app.post(“/cache/{key}”)
async def set_in_cache(key: str, value: str, redis_client: redis.Redis = Depends(get_redis_client)):
try:
await redis_client.set(key, value, ex=3600) # Set with 1-hour expiry
return {“key”: key, “value”: value, “status”: “set”}
except redis.exceptions.RedisError as e:
logger.error(f”Redis error setting key {key}: {e}”)
raise HTTPException(status_code=500, detail=”Cache error”)

@app.get(“/redis_status”)
async def redis_status(request: Request):
status = “connected” if hasattr(request.app.state, “redis”) and request.app.state.redis else “disconnected”
if status == “connected”:
try:
await request.app.state.redis.ping()
except redis.exceptions.ConnectionError:
status = “connection_error”
except Exception:
status = “unknown_error”
return {“redis_status”: status}

To run:

1. Install redis-py async: pip install “redis[hiredis]” (hiredis is recommended for performance)

2. Set REDIS_URL if needed.

3. Ensure a Redis server is running at that URL.

4. Run with Uvicorn: uvicorn main:app –reload

“`

Notes:

  • We use redis.asyncio, the asynchronous version of redis-py.
  • A ConnectionPool is created and used by the Redis client for efficient connection management.
  • A ping() is performed during startup to verify the connection.
  • The client is stored in app.state.redis.
  • Cleanup involves calling redis_client.close().

4.4. Managing Background Task Schedulers (e.g., APScheduler)

If your application needs to run tasks periodically (e.g., cleanup jobs, report generation), a scheduler like APScheduler can be started and stopped via lifespan events.

“`python
import contextlib
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
import logging
import time
import asyncio

logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(name)s – %(levelname)s – %(message)s’)
logger = logging.getLogger(name)
scheduler_logger = logging.getLogger(‘apscheduler’)
scheduler_logger.setLevel(logging.WARNING) # Keep APScheduler logs quieter

— Background Task —

async def periodic_task():
logger.info(f”Periodic task running at {time.strftime(‘%X’)}”)
# Simulate some async work
await asyncio.sleep(2)
logger.info(f”Periodic task finished at {time.strftime(‘%X’)}”)

— Lifespan Management —

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(“Lifespan: Initializing scheduler…”)
scheduler = AsyncIOScheduler(timezone=”UTC”) # Use UTC or your preferred timezone
# Add the job before starting the scheduler
scheduler.add_job(
periodic_task,
trigger=IntervalTrigger(seconds=10), # Run every 10 seconds
id=”periodic_task_1″,
name=”My Periodic Task”,
replace_existing=True
)
try:
scheduler.start()
logger.info(“Lifespan: Scheduler started.”)
# Store scheduler in app state if needed elsewhere (e.g., to add jobs dynamically)
app.state.scheduler = scheduler
except Exception as e:
logger.error(f”Lifespan: Failed to start scheduler: {e}”)
# Depending on importance, you might raise an error
# raise RuntimeError(“Scheduler failed to start”) from e

yield # Application runs here

logger.info("Lifespan: Shutting down scheduler...")
if hasattr(app.state, "scheduler") and app.state.scheduler.running:
    # Give running jobs a chance to finish? Optional: add timeout.
    app.state.scheduler.shutdown(wait=True) # Set wait=False for faster shutdown
    logger.info("Lifespan: Scheduler shut down.")
else:
    logger.warning("Lifespan: No running scheduler found in state to shut down.")

app = FastAPI(lifespan=lifespan)

@app.get(“/”)
async def root():
return {“message”: “API with background scheduler running.”}

@app.get(“/scheduler_status”)
async def get_scheduler_status(request: Request):
if hasattr(request.app.state, “scheduler”):
scheduler = request.app.state.scheduler
return {
“running”: scheduler.running,
“jobs”: [{“id”: job.id, “name”: job.name, “next_run”: str(job.next_run_time)} for job in scheduler.get_jobs()]
}
return {“status”: “Scheduler not initialized”}

To run:

1. Install APScheduler: pip install apscheduler

2. Run with Uvicorn: uvicorn main:app –reload

“`

Explanation:

  • An AsyncIOScheduler instance is created.
  • Jobs are added using scheduler.add_job().
  • scheduler.start() is called before yield.
  • scheduler.shutdown() is called after yield. The wait=True argument tells APScheduler to wait for currently running jobs to complete before shutting down. Set wait=False if you want a faster shutdown that might interrupt running tasks.
  • The scheduler instance is stored in app.state if you need to interact with it later (e.g., add/remove jobs dynamically via API endpoints).

5. Sharing Resources with app.state

As seen in the examples, initializing a resource (like a database pool or ML model) in the lifespan function is only half the story. You need a way to access that resource within your request handlers (path operations).

FastAPI provides a convenient mechanism for this: app.state.

The app instance passed to the lifespan function has a state attribute. This state is an instance of starlette.datastructures.State. You can attach any attributes you want to this state object during the lifespan’s startup phase.

“`python

lifespan function

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
# … initialize resources …
db_pool = await setup_database()
ml_model = load_model()
cache_client = await connect_cache()

# Attach to app.state
app.state.db_pool = db_pool
app.state.ml_model = ml_model
app.state.cache_client = cache_client
# You can attach anything: dicts, lists, custom objects...
app.state.app_config = {"setting1": "value1", "retries": 3}

yield

# ... cleanup: close pool, client etc ...
await db_pool.close()
await cache_client.close()
# No explicit cleanup needed for model/config unless they hold external resources

“`

Accessing State in Request Handlers:

Inside your path operation functions, you can access app.state via the Request object.

“`python
from fastapi import FastAPI, Request, Depends

Assume ‘lifespan’ function above is defined and passed to FastAPI

app = FastAPI(lifespan=lifespan)

— Using Request directly —

@app.get(“/items/{item_id}”)
async def get_item(item_id: int, request: Request):
# Access the pool from request.app.state
db_pool = request.app.state.db_pool
async with db_pool.acquire() as connection:
# Use connection …
result = await connection.fetchrow(“SELECT name FROM items WHERE id = $1”, item_id)
return {“item_id”: item_id, “name”: result[‘name’] if result else None}

@app.get(“/config”)
async def get_config(request: Request):
# Access custom config
app_config = request.app.state.app_config
return {“config”: app_config}

— Using Dependencies (Recommended) —

async def get_db_conn(request: Request):
pool = request.app.state.db_pool
async with pool.acquire() as conn:
yield conn

def get_ml_model(request: Request):
# Check if model exists for robustness
if not hasattr(request.app.state, “ml_model”) or request.app.state.ml_model is None:
raise HTTPException(status_code=503, detail=”ML Model not available”)
return request.app.state.ml_model

@app.post(“/predict_items”)
async def predict_items(data: list,
db: asyncpg.Connection = Depends(get_db_conn),
model = Depends(get_ml_model)):
# Use db connection and model retrieved via dependencies
# … logic …
return {“status”: “processed”}
“`

Why use dependencies?

  • Reusability: Dependencies encapsulate the logic for retrieving and potentially managing the resource (like acquiring/releasing a DB connection).
  • Testability: Dependencies are easier to override during testing.
  • Clarity: It makes the requirements of the path operation explicit in its signature.
  • Type Hinting: Provides better static analysis and editor support.

Important Note on app.state: The app.state object is shared across all requests. Therefore, only store objects in app.state that are intended to be shared and are thread-safe (or, in asyncio terms, concurrency-safe). Examples include connection pools, ML models (if stateless or properly locked), configuration dictionaries, and initialized clients. Do not store request-specific data in app.state.

6. Robust Error Handling in Lifespan Events

Errors can occur during both the startup and shutdown phases of the lifespan. Proper handling is crucial for application stability.

Startup Errors:

If an exception occurs in the code before the yield statement, the application startup should ideally fail. The ASGI server (like Uvicorn) typically detects this failure in the lifespan protocol and will stop the application from starting, logging the error.

“`python
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(“Startup: Attempting critical initialization…”)
try:
# Simulate a critical resource that might fail
app.state.critical_service = await initialize_critical_service()
logger.info(“Startup: Critical service initialized.”)

    # Initialize non-critical resource
    try:
        app.state.optional_cache = await connect_optional_cache()
        logger.info("Startup: Optional cache connected.")
    except Exception as e:
        logger.warning(f"Startup: Failed to connect optional cache: {e}. Continuing without it.")
        app.state.optional_cache = None

except CriticalServiceError as e:
    logger.error(f"Startup FATAL: Failed to initialize critical service: {e}")
    # Re-raising the error prevents the application from starting
    raise RuntimeError("Application cannot start without critical service") from e
except Exception as e:
    logger.error(f"Startup FATAL: An unexpected critical error occurred during startup: {e}")
    raise RuntimeError("Unexpected critical startup failure") from e

# If we reach here, critical setup succeeded
yield # Application starts running

# Shutdown code follows...
# ...

“`

  • Use try...except blocks to catch potential exceptions during initialization.
  • Critical Failures: If a resource is essential for the application to function, catch the specific exception (or a general Exception), log it clearly, and then re-raise an exception (often wrapping the original using raise NewError from original_exception). This signals failure to the ASGI server.
  • Non-Critical Failures: If a resource is optional (like a cache), catch the exception, log a warning, perhaps set the corresponding app.state attribute to None, and do not re-raise. This allows the application to start in a degraded state.

Shutdown Errors:

Errors can also occur in the code after the yield. This is often during resource cleanup (e.g., the database is already down when trying to close the pool). It’s important to attempt all cleanup steps possible, even if one fails. The finally block is your friend here.

“`python
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
# — Startup —
app.state.db_pool = None
app.state.redis_client = None
try:
logger.info(“Startup: Initializing resources…”)
app.state.db_pool = await asyncpg.create_pool(…)
app.state.redis_client = redis.Redis.from_url(…)
await app.state.redis_client.ping()
logger.info(“Startup: Resources initialized.”)
except Exception as e:
logger.error(f”Startup FATAL: Initialization failed: {e}”)
# Cleanup potentially partially initialized resources before raising
if app.state.redis_client:
try:
await app.state.redis_client.close()
except Exception as close_err:
logger.error(f”Startup Error Cleanup: Failed to close Redis: {close_err}”)
if app.state.db_pool:
try:
await app.state.db_pool.close()
except Exception as close_err:
logger.error(f”Startup Error Cleanup: Failed to close DB Pool: {close_err}”)
raise RuntimeError(“Startup failed”) from e

# --- Yield ---
yield # Application runs

# --- Shutdown ---
logger.info("Shutdown: Cleaning up resources...")
# Use finally to ensure cleanup is attempted even if app crashes mid-request
# (Though ASGI server graceful shutdown handles much of this)
# Individual try-except blocks ensure one failure doesn't stop others.
try:
    if hasattr(app.state, "redis_client") and app.state.redis_client:
        logger.info("Shutdown: Closing Redis connection...")
        await app.state.redis_client.close()
        logger.info("Shutdown: Redis connection closed.")
except Exception as e:
    logger.error(f"Shutdown Error: Failed to close Redis connection: {e}")
    # Log the error, but continue to the next cleanup step

try:
    if hasattr(app.state, "db_pool") and app.state.db_pool:
        logger.info("Shutdown: Closing database pool...")
        await app.state.db_pool.close()
        logger.info("Shutdown: Database pool closed.")
except Exception as e:
    logger.error(f"Shutdown Error: Failed to close database pool: {e}")
    # Log the error

logger.info("Shutdown: Cleanup process finished.")

“`

  • Wrap the entire shutdown logic (after yield) or individual cleanup operations within try...except blocks.
  • Log any errors that occur during cleanup.
  • Generally, do not re-raise exceptions during shutdown, as this might interfere with the server’s shutdown process. The goal is to attempt cleanup as best as possible and log failures.
  • Check if resources actually exist (e.g., hasattr(app.state, 'db_pool') and app.state.db_pool) before trying to clean them up, adding robustness.
  • If startup fails after initializing some resources, the initial except block handling the startup failure should also attempt to clean up those partially initialized resources.

7. Testing Applications with Lifespan Events

Testing applications that use lifespan events is straightforward with FastAPI’s TestClient.

When you use TestClient as a context manager (with TestClient(app) as client:), it automatically handles the lifespan protocol:

  1. Startup: When entering the with block, TestClient triggers the lifespan startup phase (executing code before yield).
  2. Testing: Inside the with block, you can make requests using client.get(), client.post(), etc. Your application runs as if deployed.
  3. Shutdown: When exiting the with block, TestClient triggers the lifespan shutdown phase (executing code after yield).

“`python
from fastapi.testclient import TestClient

Assuming your FastAPI app (‘app’) and lifespan function are defined in ‘main.py’

from main import app
import pytest # Using pytest framework

Example test file: test_main.py

def test_lifespan_startup_shutdown():
# Test that lifespan runs without raising exceptions and state is managed
# (This implicitly tests lifespan execution via the context manager)
try:
with TestClient(app) as client:
# Check if state was set during startup (example for DB pool)
assert hasattr(client.app.state, “db_pool”)
assert client.app.state.db_pool is not None # Or check pool status

        # Perform a basic request to ensure app is running
        response = client.get("/")
        assert response.status_code == 200
        assert "db_pool_status" in response.json()
        # Potentially check the status reported by the endpoint
        # assert response.json()["db_pool_status"] == "available"

except RuntimeError as e:
    # If lifespan startup raised a critical error, TestClient might propagate it
    pytest.fail(f"Lifespan startup failed during test setup: {e}")

# Add assertions here if you need to check post-shutdown state,
# although TestClient doesn't easily expose app state *after* shutdown.
# Focus tests on behavior *during* the lifespan context.

def test_resource_access_via_endpoint():
# More specific test using a resource initialized in lifespan
with TestClient(app) as client:
# Assuming /redis_status endpoint exists from redis example
response = client.get(“/redis_status”)
assert response.status_code == 200
# Check if the status indicates successful connection managed by lifespan
# This requires mocking the actual redis connection for unit tests
assert response.json()[“redis_status”] == “connected” # Or “connection_error” if mock simulates failure

— Mocking External Services —

Often, you don’t want tests to hit real databases or external services.

You can mock the resources initialized during lifespan.

@pytest.fixture
def mock_redis(mocker):
“””Pytest fixture to mock the redis client.”””
mock_client = mocker.AsyncMock(spec=redis.Redis) # Use AsyncMock for async methods
mock_client.ping.return_value = True
mock_client.get.return_value = “mocked_value”
mock_client.set.return_value = True
# Mock close/disconnect as needed if shutdown logic is complex
mock_client.close = mocker.AsyncMock()

return mock_client

@pytest.fixture
def override_lifespan_redis(mock_redis):
“””Fixture to create a lifespan override that injects the mock redis.”””
@contextlib.asynccontextmanager
async def mock_lifespan(app: FastAPI):
print(“Using mocked lifespan for Redis test”)
# Simulate startup, inject mock
app.state.redis = mock_redis
print(“Mock Redis injected into app.state”)
yield
# Simulate shutdown (mock close might be called)
print(“Mocked lifespan shutdown”)
if hasattr(app.state, “redis”) and app.state.redis:
await app.state.redis.close() # Trigger mock close if needed
return mock_lifespan

def test_cache_endpoints_with_mock(override_lifespan_redis):
# Override the app’s lifespan before creating TestClient
app.dependency_overrides = {} # Clear other overrides if necessary
# We need to tell the app to use our mocked lifespan
# Temporarily modify the app’s lifespan for this test
original_lifespan = app.router.lifespan_context
app.router.lifespan_context = override_lifespan_redis
try:
with TestClient(app) as client:
# Test GET
response = client.get(“/cache/mykey”)
# Because redis is mocked, this might succeed or fail based on mock setup
# If get returns None -> 404, if returns value -> 200
assert response.status_code == 200 # Assuming mock returns a value
assert response.json()[“value”] == “mocked_value”
mock_redis.get.assert_awaited_once_with(“mykey”) # Check mock was called

        # Test POST
        response = client.post("/cache/newkey?value=test_value")
        assert response.status_code == 200
        assert response.json()["status"] == "set"
        mock_redis.set.assert_awaited_once_with("newkey", "test_value", ex=3600)

finally:
    # Restore original lifespan to avoid affecting other tests
    app.router.lifespan_context = original_lifespan

“`

Key Testing Strategies:

  1. TestClient Context Manager: Always use with TestClient(app) as client: to ensure lifespan events are executed correctly during tests.
  2. State Inspection: Inside the with block, you can inspect client.app.state to assert that resources were initialized as expected.
  3. Endpoint Testing: Test the endpoints that rely on resources initialized during lifespan to ensure they function correctly.
  4. Mocking: For unit/integration tests, mock the external services (databases, caches, APIs, model loading).
    • Use libraries like unittest.mock or pytest-mock. Remember to use AsyncMock for mocking async functions/methods.
    • You can mock the initialization functions called by your lifespan manager.
    • Alternatively, you can override the entire lifespan manager for specific tests, injecting mocks directly into app.state, as shown in the override_lifespan_redis example. This gives fine-grained control for testing different scenarios (e.g., testing behavior when a resource fails to initialize).
  5. Dependency Overrides: While lifespan manages global resources, individual request dependencies (Depends(...)) can still be overridden using app.dependency_overrides for targeted testing of path operation logic, even if those dependencies use resources from app.state.

8. Advanced Considerations and Best Practices

  • Idempotency: Ensure your startup and shutdown logic is idempotent where possible. If the server restarts unexpectedly, could running startup logic again cause issues? Could running shutdown logic twice cause errors? Design defensively.
  • Keep it Focused: The lifespan handler should focus only on application-level setup and teardown. Avoid putting business logic or request-handling logic directly inside it.
  • Asynchronous Operations: Lifespan functions must be async. Ensure any I/O operations within them (database connections, API calls, file access) use asynchronous libraries (e.g., asyncpg, aiohttp, aiofiles) to avoid blocking the event loop during startup/shutdown. If you must run synchronous, blocking code (like some ML model loading/prediction), use asyncio.to_thread or loop.run_in_executor.
  • Configuration: Load configuration (e.g., using Pydantic’s BaseSettings) before the lifespan manager runs or very early within it, as resource initialization often depends on configuration values.
  • State Management: app.state is simple but global. For very complex applications, you might explore more structured dependency injection containers, but app.state is often sufficient and the standard FastAPI approach for lifespan-initialized resources.
  • Graceful Shutdown: Rely on the ASGI server (Uvicorn, Hypercorn) to handle graceful shutdown signals (SIGINT, SIGTERM). It will typically stop accepting new requests, wait for existing requests to finish (within a timeout), and then trigger the lifespan shutdown phase. Your shutdown code should focus on releasing resources.
  • Logging: Implement thorough logging within your lifespan handler. Log the start and end of initialization/cleanup phases, successful operations, and especially any errors encountered. This is invaluable for debugging deployment issues.
  • Resource Handles: Ensure you store the correct handles in app.state needed for cleanup. For example, store the pool object itself, not just a single connection obtained during startup.

9. Troubleshooting Common Issues

  • Application Fails to Start:
    • Cause: An unhandled exception occurred in the lifespan code before the yield.
    • Solution: Check the server logs (Uvicorn/Hypercorn) for tracebacks originating from your lifespan function. Implement robust try...except blocks in the startup phase and log errors clearly. Ensure critical failures re-raise exceptions.
  • Resource Not Available in Handler (AttributeError: 'State' object has no attribute '...'):
    • Cause 1: You forgot to attach the resource to app.state in the lifespan function (app.state.my_resource = resource).
    • Cause 2: A non-critical initialization failed silently during startup, and the attribute was never set or set to None.
    • Cause 3: Typo in the attribute name when setting it (app.state.db_pool) vs. accessing it (request.app.state.db_poool).
    • Solution: Verify the attribute assignment in the lifespan function. Check startup logs for any warnings about failed initializations. Double-check attribute names for consistency. Add checks in dependencies or handlers (e.g., if not hasattr(request.app.state, 'my_resource') or not request.app.state.my_resource:).
  • Blocking Startup/Shutdown:
    • Cause: Synchronous, long-running code (e.g., slow model loading, network calls using requests instead of httpx or aiohttp) is blocking the asyncio event loop.
    • Solution: Identify the blocking call. Use asynchronous libraries for I/O. For CPU-bound synchronous code, run it in a separate thread using asyncio.to_thread or loop.run_in_executor.
  • Resource Leaks after Shutdown:
    • Cause: The shutdown code (after yield) failed to run, failed partially, or didn’t correctly close/release all resources.
    • Solution: Ensure shutdown logic is present and correct. Wrap cleanup operations in try...except blocks to prevent one failure from stopping others. Log shutdown steps and errors. Verify that the correct resource handles are being closed (e.g., the pool, not just a single connection). Ensure the ASGI server is configured for a reasonable graceful shutdown timeout.
  • Tests Behave Differently / Fail:
    • Cause: Tests are not using TestClient as a context manager, so lifespan events aren’t triggered. Or, mocking is incomplete/incorrect.
    • Solution: Ensure tests use with TestClient(app) as client:. Review mocking strategies – are you mocking the right things? Are you using AsyncMock for async methods? Consider overriding the lifespan context for test isolation.

Conclusion

FastAPI’s lifespan context manager provides an elegant, powerful, and ASGI-standard way to manage application setup and teardown. By executing code before the application starts serving requests and performing cleanup before it exits, you can efficiently initialize resources like database connection pools, ML models, and cache clients, ensuring they are ready when needed and released gracefully.

Mastering lifespan events involves understanding the asynccontextmanager pattern, leveraging app.state for resource sharing, implementing robust error handling for both startup and shutdown phases, and employing effective testing strategies, often involving mocking.

While the older @app.on_event("startup") and @app.on_event("shutdown") decorators exist, the lifespan approach is the modern, recommended standard, offering better structure, state management integration, and alignment with the underlying ASGI protocol. By embracing lifespan events, you can build more robust, performant, and maintainable FastAPI applications capable of handling complex initialization and cleanup requirements with clarity and efficiency.

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top