Source code for jetfactory.service.db

# -*- coding: utf-8 -*-

import peewee
from peewee_async import Manager

from jetfactory.exceptions import JetfactoryException
from .base import BaseService


[docs]class DatabaseService(BaseService): """Service class providing an interface with common database operations :ivar __model__: Peewee.Model :ivar db_manager: Jetfactory database manager (peewee-async) """ __model__: peewee.Model db_manager: Manager
[docs] @classmethod def register(cls, pkg, mgr): super(DatabaseService, cls).register(pkg, mgr) cls.db_manager = mgr.db.manager
@property def model(self): """Return service model or raise an exception :return: peewee.Model """ if not self.__model__: raise Exception(f'{self.__class__.__name__}.__model__ not set, unable to perform database operation') return self.__model__ def _select(self, *fields): """Creates a new select query :param fields: fields tuple :return: peewee.ModelSelect """ return self.model.extended(*fields) if hasattr(self.model, 'extended') else self.model.select(*fields) def _model_has_attrs(self, *attrs): """Ensure model has field(s) :param attrs: fields tuple :return: True or Exception """ for attr in attrs: if not hasattr(self.model, attr): raise Exception(f'Unknown field {attr}', 400) return True def _parse_sortstr(self, value: str): """Takes a comma-separated string of fields to sort in asc or desc order example: "-field1,field2" to order by field1 desc, field2 asc :param value: sortstr :return: list of sort-objects """ if not value: return [] model = self.model for col_name in value.split(','): sort_asc = True if col_name.startswith('-'): col_name = col_name[1:] sort_asc = False if self._model_has_attrs(col_name): sort_obj = getattr(model, col_name) yield sort_obj.asc() if sort_asc else sort_obj.desc() def _get_query_filtered(self, expression: peewee.Expression = None, select: peewee.ModelSelect = None, **params): """Constructs a query using an Expression and/or parameters. :param expression: peewee.Expression :param select: peewee.ModelSelect :param params: Dictionary for controlling pagination and sorting :return: Peewee query """ query_base = select or self._select() sort = params.pop('_sort', None) limit = params.pop('_limit', 0) offset = params.pop('_offset', 0) query = query_base.limit(limit).offset(offset) if isinstance(expression, peewee.Expression): query = query.where(expression) elif params: query = query.filter(**params) if sort: order = self._parse_sortstr(params['sort']) return query.order_by(*order) return query
[docs] async def get_many(self, expression=None, **kwargs): """Returns a list of zero or more records :param expression: peewee.Expression (optional) :param kwargs: kwargs to pass along to `_get_query_filtered` :return: [peewee.Model] """ query = self._get_query_filtered(expression, **kwargs) return [o for o in await self.db_manager.execute(query)]
[docs] async def get_by_pk(self, record_id: int): """Returns the matching record or raises an exception :param record_id: the record to query for :return: peewee.Model """ return await self.get_one(self.__model__.id == record_id)
[docs] async def create(self, item: dict): """Creates new record, returning the record upon success :param item: item to be inserted :return: peewee.Model """ return await self.db_manager.create(self.model, **item)
[docs] async def get_or_create(self, item: dict): """Get an object or create it with the provided defaults :param item: item to be fetched or inserted :return: peewee.Model """ return await self.db_manager.get_or_create(self.model, **item)
[docs] async def count(self, expression=None, **kwargs): """Return the number of records the given query would yield :param expression: peewee.Expression (optional) :param kwargs: kwargs to pass along to `_get_query_filtered` :return: count (int) """ query = self._get_query_filtered(expression, **kwargs) return await self.db_manager.count(query)
[docs] async def get_one(self, *args, **kwargs): """Get exactly one record or raise an exception :param args: peewee.Expression (optional) :param kwargs: kwargs to pass along to `_get_query_filtered` :return: peewee.Model """ query = self._get_query_filtered(*args, **kwargs) try: return await self.db_manager.get(query) except peewee.DoesNotExist: raise JetfactoryException('No such record', 404)
[docs] async def update(self, record, payload): """Updates a record in the database :param record: peewee.Model or PK :param payload: update payload :return: updated record """ if not isinstance(record, peewee.Model): record = await self.get_by_pk(record) for k, v in payload.items(): setattr(record, k, v) await self.db_manager.update(record) return record
[docs] async def delete(self, record_id: int): """Deletes a record by id :param record_id: record id :return: {'deleted': True|False} """ record = await self.get_by_pk(record_id) deleted = await self.db_manager.delete(record) return {'deleted': deleted}