Permissions with database-based storage

We use SimpleCookieStorage and an in-memory SQLite DB to make it easy to try out the demo. When developing an application, you should use EncryptedCookieStorage or RedisStorage and a production-ready database. If you want the full source code in advance or for comparison, check out the demo source.

Database

When the application runs, we initialise the DB with sample data using SQLAlchemy ORM:

async def init_db(db_engine: AsyncEngine, db_session: async_sessionmaker[AsyncSession]) -> None:
    """Initialise DB with sample data."""
    async with db_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with db_session.begin() as sess:
        pw = "$5$rounds=535000$2kqN9fxCY6Xt5/pi$tVnh0xX87g/IsnOSuorZG608CZDFbWIWBr58ay6S4pD"
        sess.add(User(username="admin", password=pw, is_superuser=True))
        moderator = User(username="moderator", password=pw)
        user = User(username="user", password=pw)
        sess.add(moderator)
        sess.add(user)
    async with db_session.begin() as sess:
        sess.add(Permission(user_id=moderator.id, name="protected"))
        sess.add(Permission(user_id=moderator.id, name="public"))
        sess.add(Permission(user_id=user.id, name="public"))

This will consist of 2 tables/models created in db.py:

Users:

class User(Base):
    """A user and their credentials."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(sa.String(256), unique=True, index=True)
    password: Mapped[str] = mapped_column(sa.String(256))
    is_superuser: Mapped[bool] = mapped_column(
        default=False, server_default=sa.sql.expression.false())
    disabled: Mapped[bool] = mapped_column(
        default=False, server_default=sa.sql.expression.false())
    permissions = relationship("Permission", cascade="all, delete")

And their permissions:

class Permission(Base):
    """A permission that grants a user access to something."""

    __tablename__ = "permissions"

    user_id: Mapped[int] = mapped_column(
        sa.ForeignKey(User.id, ondelete="CASCADE"), primary_key=True)
    name: Mapped[str] = mapped_column(sa.String(64), primary_key=True)

Writing policies

You need to implement two entities: IdentityPolicy and AuthorizationPolicy. First one should have these methods: identify, remember and forget. For the second one: authorized_userid and permits. We will use the included SessionIdentityPolicy and write our own database-based authorization policy.

In our example we will lookup a user login in the database and, if present, return the identity.

    async def authorized_userid(self, identity: str) -> str | None:
        where = _where_authorized(identity)
        async with self.dbsession() as sess:
            user_id = await sess.scalar(sa.select(User.id).where(*where))
        return str(user_id) if user_id else None

For permission checking, we will fetch the user first, check if he is superuser (all permissions are allowed), otherwise check if the permission is explicitly set for that user.

    async def permits(self, identity: str | None, permission: str | Enum,
                      context: dict[str, object] | None = None) -> bool:
        if identity is None:
            return False

        where = _where_authorized(identity)
        stmt = sa.select(User).options(selectinload(User.permissions)).where(*where)
        async with self.dbsession() as sess:
            user = await sess.scalar(stmt)

        if user is None:
            return False
        if user.is_superuser:
            return True
        return any(p.name == permission for p in user.permissions)

Setup

Once we have all the code in place we can install it for our application:

async def init_app() -> web.Application:
    app = web.Application()

    db_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    app["db_session"] = async_sessionmaker(db_engine, expire_on_commit=False)

    await init_db(db_engine, app["db_session"])

    setup_session(app, SimpleCookieStorage())
    setup_security(app, SessionIdentityPolicy(), DBAuthorizationPolicy(app["db_session"]))

    web_handlers = Web()
    web_handlers.configure(app)

    return app

Now we have authorization and can decorate every other view with access rights based on permissions. There are two helpers included for this:

from aiohttp_security import check_authorized, check_permission

For each view you need to protect - just apply the decorator on it.

    async def protected_page(self, request: web.Request) -> web.Response:
        await check_permission(request, 'protected')
        return web.Response(text="You are on protected page")

or

    async def logout(self, request: web.Request) -> web.Response:
        await check_authorized(request)
        response = web.Response(text="You have been logged out")
        await forget(request, response)
        return response

If someone tries to access that protected page he will see:

403: Forbidden

The best part of it - you can implement any logic you want following the API conventions.

Launch application

For working with passwords there is a good library passlib. Once you’ve created some users you want to check their credentials on login. A similar function may do what you are trying to accomplish:

from passlib.hash import sha256_crypt
async def check_credentials(db_session: async_sessionmaker[AsyncSession],
                            username: str, password: str) -> bool:
    where = _where_authorized(username)
    async with db_session() as sess:
        hashed_pw = await sess.scalar(sa.select(User.password).where(*where))

    if hashed_pw is None:
        return False

    return sha256_crypt.verify(password, hashed_pw)

Final step is to launch your application:

python -m database_auth

Try to login with admin/moderator/user accounts (with password password) and access /public or /protected endpoints.