Permissions with database-based storage¶
We use SimpleCookieStorage
and an in-memory SQLite DB to
make it easy to try out the demo. When developing an application, you should use
EncryptedCookieStorage
or
RedisStorage
and a production-ready database.
If you want the full source code in advance or for comparison, check out
the demo source.
Database¶
When the application runs, we initialise the DB with sample data using SQLAlchemy ORM:
async def init_db(db_engine: AsyncEngine, db_session: async_sessionmaker[AsyncSession]) -> None:
"""Initialise DB with sample data."""
async with db_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with db_session.begin() as sess:
pw = "$5$rounds=535000$2kqN9fxCY6Xt5/pi$tVnh0xX87g/IsnOSuorZG608CZDFbWIWBr58ay6S4pD"
sess.add(User(username="admin", password=pw, is_superuser=True))
moderator = User(username="moderator", password=pw)
user = User(username="user", password=pw)
sess.add(moderator)
sess.add(user)
async with db_session.begin() as sess:
sess.add(Permission(user_id=moderator.id, name="protected"))
sess.add(Permission(user_id=moderator.id, name="public"))
sess.add(Permission(user_id=user.id, name="public"))
This will consist of 2 tables/models created in db.py
:
Users:
class User(Base):
"""A user and their credentials."""
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(sa.String(256), unique=True, index=True)
password: Mapped[str] = mapped_column(sa.String(256))
is_superuser: Mapped[bool] = mapped_column(
default=False, server_default=sa.sql.expression.false())
disabled: Mapped[bool] = mapped_column(
default=False, server_default=sa.sql.expression.false())
permissions = relationship("Permission", cascade="all, delete")
And their permissions:
class Permission(Base):
"""A permission that grants a user access to something."""
__tablename__ = "permissions"
user_id: Mapped[int] = mapped_column(
sa.ForeignKey(User.id, ondelete="CASCADE"), primary_key=True)
name: Mapped[str] = mapped_column(sa.String(64), primary_key=True)
Writing policies¶
You need to implement two entities:
IdentityPolicy
and
AuthorizationPolicy
.
First one should have these methods:
identify
,
remember
and
forget
.
For the second one:
authorized_userid
and
permits
. We will use the
included SessionIdentityPolicy
and write our own
database-based authorization policy.
In our example we will lookup a user login in the database and, if present, return the identity.
async def authorized_userid(self, identity: str) -> str | None:
where = _where_authorized(identity)
async with self.dbsession() as sess:
user_id = await sess.scalar(sa.select(User.id).where(*where))
return str(user_id) if user_id else None
For permission checking, we will fetch the user first, check if he is superuser (all permissions are allowed), otherwise check if the permission is explicitly set for that user.
async def permits(self, identity: str | None, permission: str | Enum,
context: dict[str, object] | None = None) -> bool:
if identity is None:
return False
where = _where_authorized(identity)
stmt = sa.select(User).options(selectinload(User.permissions)).where(*where)
async with self.dbsession() as sess:
user = await sess.scalar(stmt)
if user is None:
return False
if user.is_superuser:
return True
return any(p.name == permission for p in user.permissions)
Setup¶
Once we have all the code in place we can install it for our application:
async def init_app() -> web.Application:
app = web.Application()
db_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
app["db_session"] = async_sessionmaker(db_engine, expire_on_commit=False)
await init_db(db_engine, app["db_session"])
setup_session(app, SimpleCookieStorage())
setup_security(app, SessionIdentityPolicy(), DBAuthorizationPolicy(app["db_session"]))
web_handlers = Web()
web_handlers.configure(app)
return app
Now we have authorization and can decorate every other view with access rights based on permissions. There are two helpers included for this:
from aiohttp_security import check_authorized, check_permission
For each view you need to protect - just apply the decorator on it.
async def protected_page(self, request: web.Request) -> web.Response:
await check_permission(request, 'protected')
return web.Response(text="You are on protected page")
or
async def logout(self, request: web.Request) -> web.Response:
await check_authorized(request)
response = web.Response(text="You have been logged out")
await forget(request, response)
return response
If someone tries to access that protected page he will see:
403: Forbidden
The best part of it - you can implement any logic you want following the API conventions.
Launch application¶
For working with passwords there is a good library passlib. Once you’ve created some users you want to check their credentials on login. A similar function may do what you are trying to accomplish:
from passlib.hash import sha256_crypt
async def check_credentials(db_session: async_sessionmaker[AsyncSession],
username: str, password: str) -> bool:
where = _where_authorized(username)
async with db_session() as sess:
hashed_pw = await sess.scalar(sa.select(User.password).where(*where))
if hashed_pw is None:
return False
return sha256_crypt.verify(password, hashed_pw)
Final step is to launch your application:
python -m database_auth
Try to login with admin/moderator/user accounts (with password password) and access /public or /protected endpoints.