Reverse mapping from a table to a model in SQLAlchemy

Developer https://www.devze.com 2022-12-30 08:27 Source: web
To provide an activity log in my SQLAlchemy-based app, I have a model like this:

class ActivityLog(Base):
    __tablename__ = 'activitylog'
    id = Column(Integer, primary_key=True)
    activity_by_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    activity_by = relation(User, primaryjoin=activity_by_id == User.id)
    activity_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    activity_type = Column(SmallInteger, nullable=False)

    target_table = Column(Unicode(20), nullable=False)
    target_id = Column(Integer, nullable=False)
    target_title = Column(Unicode(255), nullable=False)

The log contains entries for multiple tables, so I can't use ForeignKey relations. Log entries are made like this:

doc = Document(name=u'mydoc', title=u'My Test Document',
               created_by=user, edited_by=user)
session.add(doc)
session.flush() # See note below
log = ActivityLog(activity_by=user, activity_type=ACTIVITY_ADD,
                  target_table=Document.__table__.name, target_id=doc.id,
                  target_title=doc.title)
session.add(log)

This leaves me with three problems:

  1. I have to flush the session before my doc object gets an id. If I had used a ForeignKey column and a relation mapper, I could have simply called ActivityLog(target=doc) and let SQLAlchemy do the work. Is there any way to work around needing to flush by hand?

  2. The target_table parameter is too verbose. I suppose I could solve this with a target property setter in ActivityLog that automatically retrieves the table name and id from a given instance.

  3. Biggest of all, I'm not sure how to retrieve a model instance from the database. Given an ActivityLog instance log, calling self.session.query(log.target_table).get(log.target_id) does not work, as query() expects a model as parameter.

One workaround appears to be to use polymorphism and derive all my models from a base model which ActivityLog recognises. Something like this:

class Entity(Base):
    __tablename__ = 'entities'
    id = Column(Integer, primary_key=True)
    title = Column(Unicode(255), nullable=False)
    edited_at = Column(DateTime, onupdate=datetime.utcnow, nullable=False)
    entity_type = Column(Unicode(20), nullable=False)
    __mapper_args__ = {'polymorphic_on': entity_type}

class Document(Entity):
    __tablename__ = 'documents'
    __mapper_args__ = {'polymorphic_identity': 'document'}
    body = Column(UnicodeText, nullable=False)

class ActivityLog(Base):
    __tablename__ = 'activitylog'
    id = Column(Integer, primary_key=True)
    ...
    target_id = Column(Integer, ForeignKey('entities.id'), nullable=False)
    target = relation(Entity)

If I do this, ActivityLog(...).target will give me a Document instance when it refers to a Document, but I'm not sure it's worth the overhead of having two tables for everything. Should I go ahead and do it this way?


One way to solve this is polymorphic associations. It should solve all 3 of your issues and also make database foreign key constraints work. See the polymorphic association example in SQLAlchemy source. Mike Bayer has an old blogpost that discusses this in greater detail.
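For the third problem on its own (turning a stored table name back into a model class), a lighter-weight lookup is also possible. What follows is a minimal plain-Python sketch, not the polymorphic association recipe itself. The `Base` class here is only a stand-in for the declarative base, and `iter_models` and `model_for_table` are hypothetical helper names; the assumption is that every model sets its own `__tablename__`, as the models in the question do.

```python
class Base(object):
    """Stand-in for the declarative base; real models would subclass that instead."""


class User(Base):
    __tablename__ = 'users'


class Document(Base):
    __tablename__ = 'documents'


class Folder(Base):
    __tablename__ = 'folders'


def iter_models(base):
    """Yield every direct and indirect subclass of base."""
    for cls in base.__subclasses__():
        yield cls
        for sub in iter_models(cls):
            yield sub


def model_for_table(base, table_name):
    """Return the class whose own __tablename__ equals table_name."""
    for cls in iter_models(base):
        # Look in cls.__dict__ so an inherited __tablename__ is not matched.
        if cls.__dict__.get('__tablename__') == table_name:
            return cls
    raise LookupError('no model mapped to table %r' % (table_name,))


assert model_for_table(Base, 'documents') is Document
```

With such a helper, the failing call from the question could become something like session.query(model_for_table(Base, log.target_table)).get(log.target_id). The walk is linear in the number of models, so caching the result in a dict may be worthwhile if it runs often.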


Definitely go through the blog post and the examples Ants linked to. I did not find the explanation confusing, but it does assume some more experience with the topic.

A few things I can suggest:

  • ForeignKeys: in general I agree they are a good thing to have, but I am not sure they are conceptually important in your case. You seem to be using ActivityLog as an orthogonal, cross-cutting concern (as in AOP), whereas the version with foreign keys would effectively make your business objects aware of the ActivityLog. Another problem with using FKs for audit purposes in the schema you have is that, if you allow object deletion, the FK constraint will require deleting all the ActivityLog entries for that object.
  • Automatic logging: you are doing all this logging manually whenever you create, modify or delete an object. With SA you could implement a SessionExtension with before_commit, which would do the job for you automatically.

This way you can completely avoid writing parts like the one below:

log = ActivityLog(activity_by=user, activity_type=ACTIVITY_ADD,
                  target_table=Document.__table__.name, target_id=doc.id,
                  target_title=doc.title)
session.add(log)
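The automatic logging idea can be sketched as follows. This is a minimal sketch under stated assumptions, not the complete sample: it uses the `sqlalchemy.event` listener API, which later SQLAlchemy releases offer in place of SessionExtension, and it hooks `before_flush` rather than `before_commit` so that pending objects are still visible in the session. The trimmed `ActivityLog` (no user, no target_id), the `AUDITED` tuple and the `log_activity` function are hypothetical names chosen for illustration.

```python
from sqlalchemy import Column, Integer, Unicode, create_engine, event
from sqlalchemy.orm import sessionmaker

try:
    from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4 and later
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base  # older releases

ACTIVITY_ADD, ACTIVITY_MOD, ACTIVITY_DEL = 1, 2, 3

Base = declarative_base()


class Document(Base):
    __tablename__ = 'documents'
    id = Column(Integer, primary_key=True)
    title = Column(Unicode(255), nullable=False)


class ActivityLog(Base):
    # Trimmed-down, hypothetical version of the log table.
    __tablename__ = 'activitylog'
    id = Column(Integer, primary_key=True)
    activity_type = Column(Integer, nullable=False)
    target_table = Column(Unicode(20), nullable=False)
    target_title = Column(Unicode(255), nullable=False)


AUDITED = (Document,)  # hypothetical: classes whose changes are logged

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
SessionFactory = sessionmaker(bind=engine)


@event.listens_for(SessionFactory, 'before_flush')
def log_activity(session, flush_context, instances):
    # Snapshot the pending changes first, because adding log rows below
    # changes session.new.
    pending = [(obj, ACTIVITY_ADD) for obj in session.new]
    pending += [(obj, ACTIVITY_MOD) for obj in session.dirty]
    pending += [(obj, ACTIVITY_DEL) for obj in session.deleted]
    for obj, activity_type in pending:
        if isinstance(obj, AUDITED):
            session.add(ActivityLog(activity_type=activity_type,
                                    target_table=obj.__table__.name,
                                    target_title=obj.title))


# Usage: no explicit ActivityLog(...) calls are needed any more.
session = SessionFactory()
doc = Document(title=u'My Test Document')
session.add(doc)
session.commit()
doc.title = u'Modified'
session.commit()
session.delete(doc)
session.commit()

logs = session.query(ActivityLog).order_by(ActivityLog.id).all()
```

Note that session.dirty can include objects whose attributes were set without a net change, so a stricter version might filter with session.is_modified(obj) before writing a MOD entry.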

EDIT-1: complete sample code added

  • The code is based on the first non-FK version from http://techspot.zzzeek.org/?p=13.
  • The choice not to use FKs is based on the fact that, for audit purposes, deleting the main object should not cascade to its audit log entries. It also keeps auditable objects unaware that they are being audited.
  • The implementation uses an SA one-to-many relationship. Some objects may be modified many times, which results in many audit log entries, and by default SA loads the related objects when a new entry is added to the list. Assuming that in "normal" usage we only want to add a new audit log entry, we use the lazy='noload' flag so that the relationship is never loaded from the main object. It is still loaded when navigated from the other side, and it can also be loaded from the main object with a custom query, which the example shows with the read-only activitylogs_readonly property.

Code (runnable with some tests):

from datetime import datetime

from sqlalchemy import create_engine, Column, Integer, SmallInteger, String, DateTime, ForeignKey, Table, UnicodeText, Unicode, and_
from sqlalchemy.orm import relationship, dynamic_loader, scoped_session, sessionmaker, class_mapper, backref
from sqlalchemy.orm.session import Session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm.interfaces import SessionExtension

import logging
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger()

ACTIVITY_ADD = 1
ACTIVITY_MOD = 2
ACTIVITY_DEL = 3

class ActivityLogSessionExtension(SessionExtension):
    _logger = logging.getLogger('ActivityLogSessionExtension')

    def before_commit(self, session):
        self._logger.debug("before_commit: %s", session)
        for d in session.new:
            self._logger.info("before_commit >> add: %s", d)
            if hasattr(d, 'create_activitylog'):
                log = d.create_activitylog(ACTIVITY_ADD)
        for d in session.dirty:
            self._logger.info("before_commit >> mod: %s", d)
            if hasattr(d, 'create_activitylog'):
                log = d.create_activitylog(ACTIVITY_MOD)
        for d in session.deleted:
            self._logger.info("before_commit >> del: %s", d)
            if hasattr(d, 'create_activitylog'):
                log = d.create_activitylog(ACTIVITY_DEL)


# Configure test data SA
engine = create_engine('sqlite:///:memory:', echo=False)
session = scoped_session(sessionmaker(bind=engine, autoflush=False, extension=ActivityLogSessionExtension()))
Base = declarative_base()
Base.query = session.query_property()

class _BaseMixin(object):
    """ Just a helper mixin class to set properties on object creation.  
    Also provides a convenient default __repr__() function, but be aware that 
    also relationships are printed, which might result in loading relations.
    """
    def __init__(self, **kwargs):
        for k,v in kwargs.items():
            setattr(self, k, v)

    def __repr__(self):
        return "<%s(%s)>" % (self.__class__.__name__, 
            ', '.join('%s=%r' % (k, self.__dict__[k]) 
                for k in sorted(self.__dict__) if '_sa_' != k[:4] and '_backref_' != k[:9])
            )

class User(Base, _BaseMixin):
    __tablename__ = u'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

class Document(Base, _BaseMixin):
    __tablename__ = u'documents'
    id = Column(Integer, primary_key=True)
    title = Column(Unicode(255), nullable=False)
    body = Column(UnicodeText, nullable=False)

class Folder(Base, _BaseMixin):
    __tablename__ = u'folders'
    id = Column(Integer, primary_key=True)
    title = Column(Unicode(255), nullable=False)
    comment = Column(UnicodeText, nullable=False)

class ActivityLog(Base, _BaseMixin):
    __tablename__ = u'activitylog'
    id = Column(Integer, primary_key=True)

    activity_by_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    activity_by = relationship(User) # @note: no need to specify the primaryjoin
    activity_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    activity_type = Column(SmallInteger, nullable=False)

    target_table = Column(Unicode(20), nullable=False)
    target_id = Column(Integer, nullable=False)
    target_title = Column(Unicode(255), nullable=False)
    # backref relation for auditable
    target = property(lambda self: getattr(self, '_backref_%s' % self.target_table))

def _get_user():
    """ This method returns the User object for the current user.
    @todo: proper implementation required
    @hack: currently returns the 'user2'
    """
    return session.query(User).filter_by(name='user2').one()

# auditable support function
# based on first non-FK version from http://techspot.zzzeek.org/?p=13
def auditable(cls, name):
    def create_activitylog(self, activity_type):
        log = ActivityLog(activity_by=_get_user(),
                          activity_type=activity_type,
                          target_table=table.name, 
                          target_title=self.title,
                          )
        getattr(self, name).append(log)
        return log

    mapper = class_mapper(cls)
    table = mapper.local_table
    cls.create_activitylog = create_activitylog

    def _get_activitylog(self):
        return Session.object_session(self).query(ActivityLog).with_parent(self).all()
    setattr(cls, '%s_readonly' %(name,), property(_get_activitylog))

    # no constraints, therefore define constraints in an ad-hoc fashion.
    primaryjoin = and_(
            list(table.primary_key)[0] == ActivityLog.__table__.c.target_id,
            ActivityLog.__table__.c.target_table == table.name
    )
    foreign_keys = [ActivityLog.__table__.c.target_id]
    mapper.add_property(name, 
            # @note: because we use the relationship, by default all previous
            # ActivityLog items will be loaded for an object when new one is
            # added. To avoid this, use either dynamic_loader (http://www.sqlalchemy.org/docs/reference/orm/mapping.html#sqlalchemy.orm.dynamic_loader)
            # or lazy='noload'. This is the trade-off decision to be made.
            # Additional benefit of using lazy='noload' is that one can also
            # record DEL operations in the same way as ADD, MOD
            relationship(
                ActivityLog,
                lazy='noload',  # important for relationship
                primaryjoin=primaryjoin, 
                foreign_keys=foreign_keys,
                backref=backref('_backref_%s' % table.name, 
                    primaryjoin=list(table.primary_key)[0] == ActivityLog.__table__.c.target_id, 
                    foreign_keys=foreign_keys)
        )
    )

# this will define which classes support the ActivityLog interface
auditable(Document, 'activitylogs')
auditable(Folder, 'activitylogs')

# create db schema
Base.metadata.create_all(engine)


## >>>>> TESTS >>>>>>

# create some basic data first
u1 = User(name='user1')
u2 = User(name='user2')
session.add(u1)
session.add(u2)
session.commit()
session.expunge_all()
# --check--
assert not(_get_user() is None)


##############################
## ADD
##############################
_logger.info('-' * 80)
d1 = Document(title=u'Document-1', body=u'Doc1 some body skipped the body')
# when not using SessionExtension for any reason, this can be called manually
#d1.create_activitylog(ACTIVITY_ADD)
session.add(d1)
session.commit()

f1 = Folder(title=u'Folder-1', comment=u'This folder is empty')
# when not using SessionExtension for any reason, this can be called manually
#f1.create_activitylog(ACTIVITY_ADD)
session.add(f1)
session.commit()

# --check--
session.expunge_all()
logs = session.query(ActivityLog).all()
_logger.debug(logs)
assert len(logs) == 2
assert logs[0].activity_type == ACTIVITY_ADD
assert logs[0].target.title == u'Document-1'
assert logs[0].target.title == logs[0].target_title
assert logs[1].activity_type == ACTIVITY_ADD
assert logs[1].target.title == u'Folder-1'
assert logs[1].target.title == logs[1].target_title

##############################
## MOD(ify)
##############################
_logger.info('-' * 80)
session.expunge_all()
d1 = session.query(Document).filter_by(id=1).one()
assert d1.title == u'Document-1'
assert d1.body == u'Doc1 some body skipped the body'
assert d1.activitylogs == []
d1.title = u'Modified: Document-1'
d1.body = u'Modified: body'
# when not using SessionExtension (or if it does not work), this can be called manually
#d1.create_activitylog(ACTIVITY_MOD)
session.commit()
_logger.debug(d1.activitylogs_readonly)

# --check--
session.expunge_all()
logs = session.query(ActivityLog).all()
assert len(logs)==3
assert logs[2].activity_type == ACTIVITY_MOD
assert logs[2].target.title == u'Modified: Document-1'
assert logs[2].target.title == logs[2].target_title


##############################
## DEL(ete)
##############################
_logger.info('-' * 80)
session.expunge_all()
d1 = session.query(Document).filter_by(id=1).one()
# when not using SessionExtension for any reason, this can be called manually,
#d1.create_activitylog(ACTIVITY_DEL)
session.delete(d1)
session.commit()
session.expunge_all()

# --check--
session.expunge_all()
logs = session.query(ActivityLog).all()
assert len(logs)==4
assert logs[0].target is None
assert logs[2].target is None
assert logs[3].activity_type == ACTIVITY_DEL
assert logs[3].target is None

##############################
## print all activity logs
##############################
_logger.info('=' * 80)
logs = session.query(ActivityLog).all()
for log in logs:
    _ = log.target
    _logger.info("%s -> %s", log, log.target)

##############################
## navigate from main object
##############################
_logger.info('=' * 80)
session.expunge_all()
f1 = session.query(Folder).filter_by(id=1).one()
_logger.info(f1.activitylogs_readonly)