Unit of Work en Python

Últimamente he estado trabajando en una aplicación en la que, básicamente, todos los casos de uso siguen un patrón muy claro: emitir logs al inicio, operar con la base de datos para persistir los cambios o hacer rollback si algo sale mal. Si todo ha ido bien, cerramos la sesión y emitimos algunos logs informativos; si, por el contrario, algo falla, emitimos logs de error.

¿Qué es Unit Of Work?

Después de unas semanas implementando casos de uso como los que menciono arriba, caí en la cuenta de que estaba utilizando un patrón muy conocido llamado Unit of Work, sobre el cual hay mucha literatura escrita y que se puede definir como:

Un patrón de diseño que se utiliza para agrupar varias operaciones (normalmente de una base de datos) en una sola "unidad de trabajo", asegurando que todas estas operaciones se ejecuten completamente o no se ejecuten.

Por tanto, encajaba perfectamente con lo que necesitaba. Llevando este patrón un poco más allá, mi idea era tener disponible dentro del UnitOfWork un logger que nos permitiera emitir logs de forma sencilla.

Implementar nuestro Unit of Work

Como siempre, vamos a pensar en cómo nos gustaría que fuera nuestro UnitOfWork, diseñándolo desde los tests:

class TestUnitOfWork:
    def test_commit_and_close(self) -> None:
        # ARRANGE
        executor = f"{self.__class__.__name__}#test_commit_and_close"
        logger = Mimic(Spy, Logger)
        session = Mimic(Spy, Session)

        # ACT
        with UnitOfWork(executor, session, logger) as unit_of_work:
            unit_of_wok.session.add("foo")

        # ASSERT
        expect(session.add).to(have_been_called_with("foo"))
        expect(session.commit).to(have_been_called)
        expect(session.close).to(have_been_called)
        expect(session.rollback).not_to(have_been_called)

Vamos a ir paso a paso, avanzando solo lo justo y necesario. Por ello, nos centraremos por el momento en poder interactuar con la session de la base de datos.

En este test podemos ver que nuestro UnitOfWork se usa como un Context Manager, al que le inyectamos en el constructor tres parámetros:

  • Una cadena de texto con el nombre de la clase que lo invoca.
  • Un logger de tipo Logger.
  • Una sesión de base de datos de tipo Session.

Al inyectarlos vía constructor, los tenemos disponibles en el test para manipularlos como necesitemos (¡justo lo que queremos!). Dentro del UnitOfWork, accedemos a la sesión para posteriormente realizar las aserciones correspondientes y validar que se ha llamado correctamente a los métodos esperados.

Implementar un Context Manager en Python (métodos __enter__ y __exit__)

Ahora viene la parte más interesante del post: ¿cómo implementamos en Python nuestro UnitOfWork?

Por suerte, en Python es muy sencillo implementar un Context Manager. Basta con crear una clase que defina, al menos, los métodos __enter__ y __exit__.

  • El método __enter__ se ejecutará automáticamente justo antes de ejecutar el código dentro del Context Manager.
  • El método __exit__ se ejecutará al finalizar la ejecución del Context Manager y nos proporcionará tres parámetros (opcionales) que nos permitirán comprobar si ha ocurrido algún problema durante la misma.

Estos tres parámetros son:

  • ex_type: Tipo de excepción (Exception). Ejemplo: ValueError, NotImplementedError, etc.
  • ex_value: Valor de la excepción.
  • traceback: La traza del error, útil para localizar dónde se ha producido.

Sabiendo esto, vamos a implementar lo mínimo y necesario para pasar nuestro primer test:

class UnitOfWork:
    def __init__(
        self,
        executor: str,
        session: Session,
        logger: Logger = Logger(),
    ) -> None:
        self.executor = executor
        self.logger = logger
        self.session = session

    def __enter__(self) -> "UnitOfWork":
        return self

    def __exit__(
        self,
        ex_type: type[BaseException] | None,
        ex_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.session.commit()
        self.session.close()

Básicamente, hemos definido las dependencias en el constructor. En el método __enter__, simplemente devolvemos la propia instancia de la clase, y en el método __exit__ llamamos a nuestra sesión para cerrarla después de persistir los cambios.

A continuación, queremos que se emitan ciertos logs durante la ejecución de nuestro UnitOfWork, al mismo tiempo que el logger está disponible para su uso.

Creamos un test para ello:

class TestUnitOfWork:

    # ...

    def test_logs_at_the_enter_and_the_exit(self) -> None:
        # ARRANGE
        executor = f"{self.__class__.__name__}#test_commit_and_close"
        logger = Mimic(Spy, Logger)
        session = Mimic(Spy, Session)

        # ACT
        with UnitOfWork(executor, session, logger) as unit_of_work:
            unit_of_work.logger.info("Test")

        # ASSERT
        expect(logger.info).to(have_been_called_with(f"Starting UnitOfWork in class {executor}"))
        expect(logger.info).to(have_been_called_with("Test"))
        expect(logger.info).to(have_been_called_with(("Committing database transaction")))
        expect(logger.info).to(have_been_called_with("Closing database session"))
        expect(logger.info).to(have_been_called_with(f"Ending UnitOfWork in class {executor}"))

En este caso, hacemos una llamada al logger dentro del UnitOfWork y después realizamos todas las aserciones necesarias para validar que los logs se han emitido correctamente.

class UnitOfWork:
    def __init__(
        self,
        executor: str,
        session: Session,
        logger: Logger = Logger(),
    ) -> None:
        self.executor = executor
        self.logger = logger
        self.session = session

    def __enter__(self) -> "UnitOfWork":
        self.logger.info(f"Starting UnitOfWork in class {self.executor}")
        return self

    def __exit__(
        self,
        ex_type: type[BaseException] | None,
        ex_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self._commit()
        self._close()

        self.logger.info(f"Ending UnitOfWork in class {self.executor}")

    def _commit(self) -> None:
        self.logger.info("Committing database transaction")
        self.session.commit()

    def _close(self) -> None:
        self.logger.info("Closing database session")
        self.session.close()

Como puedes ver, en esta iteración hemos añadido llamadas al logger en los puntos clave de nuestro UnitOfWork.

Implementar nuestro Unit of Work (Unhappy path)

Hasta este momento hemos estado testeando e implementando lo que se conoce como Happy Path, pero ¿qué pasa cuando ocurre un error en nuestro UnitOfWork?

Mejor pensar en ello creando unos nuevos tests, ¿verdad? Vamos allá:

class TestUnitOfWork:

    # ...

    def test_exceptions_do_rollbacks(self) -> None:
        # ARRANGE
        executor = f"{self.__class__.__name__}#test_raise_exception_rollbacks"
        logger = Mimic(Spy, Logger)
        session = Mimic(Spy, Session)

        # ACT
        try:
            with UnitOfWork(executor, session, logger) as unit_of_work:
                raise ValueError("Error")
        except Exception:
            pass

        # ASSERT
        expect(session.rollback).to(have_been_called)
        expect(session.commit).not_to(have_been_called)
        expect(session.close).not_to(have_been_called)

Bastante sencillo. Ahora, vamos a forzar una excepción dentro de nuestro UnitOfWork para posteriormente validar que se produce el rollback. La implementación quedaría así:

class UnitOfWork:

    # ...

    def __exit__(
        self,
        ex_type: type[BaseException] | None,
        ex_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        if ex_type:
            self.session.rollback()
        else:
            self._commit()
            self._close()

        self.logger.info(f"Ending UnitOfWork in class {self.executor}")

Como puedes ver, lo único que hacemos es validar la presencia de una excepción cuando se ejecuta el método __exit__, y si es así, llamamos a la sesión para hacer rollback.

Por último, y como hicimos antes con el Happy Path, vamos a crear un test para validar que se emiten los logs correctos en caso de error:

class TestUnitOfWork:

    # ...

    def test_exceptions_print_log_errors_after_entering(self) -> None:
        # ARRANGE
        executor = f"{self.__class__.__name__}#test_raise_exception_rollbacks"
        logger = Mimic(Spy, Logger)
        session = Mimic(Spy, Session)

        # ACT
        try:
            with UnitOfWork(executor, session, logger) as unit_of_work:
                unit_of_work.logger.info("Test")
                raise ValueError("Error")
        except Exception:
            pass

        # ASSERT
        expect(logger.info).to(have_been_called_with(f"Starting UnitOfWork in class {executor}"))
        expect(logger.info).to(have_been_called_with("Test"))
        expect(logger.error).to(have_been_called_with("Exception: ValueError with error message: 'Error'"))
        expect(logger.error).to(have_been_called_with(contain("Traceback: ")))

En este caso, estamos registrando en logs toda la información relevante sobre los errores. Por lo tanto, la implementación quedaría de la siguiente manera:

class UnitOfWork:

    # ...

    def __exit__(
        self,
        ex_type: type[BaseException] | None,
        ex_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        if ex_type:
            self._rollback(ex_type, ex_value, traceback)
        else:
            self._commit()
            self._close()

        self.logger.info(f"Ending UnitOfWork in class {self.executor}")

    def _rollback(
        self,
        ex_type: type[BaseException] | None,
        ex_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.logger.error(f"Exception: {ex_type.__name__} with error message: '{ex_value}'")
        self.logger.error(f"Traceback: {traceback.tb_frame}")
        self.logger.error("Rolling back database transaction")
        self.session.rollback()

Nada del otro mundo, ¿verdad? Además, haciendo TDD todo ha ido súper fluido, ¿verdad? 🚀

Cómo utilizar nuestro Unit of Work

Una vez que nuestro UnitOfWork está implementado, usarlo es súper fácil:

from src.session import Session
from src.unit_of_work import UnitOfWork


class Foo:
    def run(self) -> None:
        session = Session()
        with UnitOfWork("Foo#run", session) as unit_of_work:
            unit_of_work.logger.info("Inside FOO")

Siendo esto lo que veríamos como salida de la ejecución:

Context Manager

Conclusiones

A lo largo de este artículo hemos visto cómo implementar un Unit of Work en Python, aprovechando el uso de Context Managers y asegurándonos de que todas las transacciones con la base de datos se manejen correctamente.

Además, hemos seguido un enfoque de TDD que nos ha permitido desarrollar nuestra solución de forma iterativa y controlada, validando cada paso con tests antes de implementar la funcionalidad.

Gracias a esto, ahora tenemos un UnitOfWork que:

  • Gestiona automáticamente las transacciones de la base de datos.
  • Emite logs útiles para depuración y monitoreo.
  • Maneja errores correctamente asegurando un rollback cuando sea necesario.

¡Espero que te haya sido útil! 🚀

Todo el código de este post está disponible aquí