Wednesday, April 29, 2015

Domain Model for Data Entry Applications



by  Maksim Kozyarchuk




Overview

Domain Model is at the core of data-driven systems. Unlike the presentation layer, which gets customized for each use case, there should be only one Domain Model. Its role is to define how data is used across the system, including Data Entry, Reporting, Batch Processing and other applications. A well-designed Domain Model is one of the key requirements for supporting multiple applications on top of the same datastore. In this article, I will extend ReactiveFramework to support Domain Model binding without delving into the architecture and design of the Domain Model itself. That topic will be covered in subsequent posts.

Extending ReactiveFramework to support Domain Model mappings

There is often a structural mismatch between the way the Domain Model represents data and the way that data is presented to the user for entry. For example, classical portfolio management systems model instrument data as a domain object separate from trade data. However, depending on the instrument traded, data entry forms often combine trade and instrument fields. This makes mapping between the presentation model and the domain model more complex than the one-to-one mapping supported by many UI frameworks. In this article, I will use the FX Forward transaction example from the introduction of ReactiveFramework to demonstrate the implementation and the types of mapping needed between the domain and presentation layers. To support mapping between ReactiveFramework and the Domain Model layer, a domain_mapping attribute is introduced to the Field class as a way of declaratively defining the mappings.
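To make the mismatch concrete, here is a minimal sketch, not part of ReactiveFramework, in which one flat data entry form is routed to two separate domain objects using declarative "Object.attribute" strings. The FORM_MAPPINGS table, the route_form helper and the use of types.SimpleNamespace as stand-ins for the Trade and Instrument classes are all illustrative assumptions.

```python
from types import SimpleNamespace

# Hypothetical declarative mappings: form field -> "DomainObject.attribute".
FORM_MAPPINGS = {
    "action": "Trade.action",
    "primary_amount": "Trade.quantity",
    "trade_date": "Trade.trade_date",
    "underlying": "Instrument.underlying",
    "exp_date": "Instrument.exp_date",
}


def route_form(form, mappings):
    """Split one flat form into separate domain objects keyed by name."""
    # SimpleNamespace objects stand in for the real Trade/Instrument classes.
    domain_objects = {"Trade": SimpleNamespace(), "Instrument": SimpleNamespace()}
    for field_name, value in form.items():
        target = mappings.get(field_name)
        if target is None:
            continue  # unmapped fields (e.g. derived values) stay in the form only
        object_name, attribute = target.split(".")
        setattr(domain_objects[object_name], attribute, value)
    return domain_objects


form = {"action": "Buy", "primary_amount": 100, "trade_date": "2015-05-01",
        "underlying": "EUR", "exp_date": "2015-05-02", "secondary_amount": 150}
objects = route_form(form, FORM_MAPPINGS)
print(objects["Trade"])       # holds action, quantity and trade_date
print(objects["Instrument"])  # holds underlying and exp_date
```

A single screen thus feeds two objects, which is exactly what a one-to-one form binding cannot express.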

class Field:
   def __init__(self, name, datatype, validation_method=None,
                calculation_method=None, domain_mapping = None):
       self.name = name
       self.datatype = datatype
       self.validation_method = validation_method
       self.calculation_method = calculation_method
       self.domain_mapping = domain_mapping

For basic mappings, the domain_mapping attribute is populated with <DomainObject.field_name>, where DomainObject names the domain object to use (Trade or Instrument in the FX Forward case) and field_name names the field that a particular data element is saved to or loaded from. Some mappings are not as straightforward: they require mapping a single presentation field to multiple domain fields, as well as transforming the data when saving or loading it. To support these more complex cases, domain_mapping also accepts the name of a mapping method implemented on the presentation model.

To demonstrate the types of mapping that occur between the presentation model and the Domain Model, I've extended the FX Forward example with a few more fields. Following is the new FieldFactory class:
      
class FieldFactory:
   FIELDS = [ Field(name='action', datatype=str, validation_method='must_be_provided',
                    domain_mapping = "Trade.action"),
             Field(name='currency_pair', datatype=str, validation_method='valid_currency_pair',
                   domain_mapping = "map_currency_pair"),              
             Field(name='primary_amount', datatype=Decimal, calculation_method='calc_primary_amount',
                   validation_method='must_be_provided', domain_mapping = "Trade.quantity"),
             Field(name='secondary_amount', datatype=Decimal, calculation_method='calc_secondary_amount',
                   validation_method='must_be_provided'),
             Field(name='deal_fx_rate', datatype=Decimal, calculation_method='calc_deal_fx_rate',
                   validation_method='must_be_provided', domain_mapping = "Trade.price"),
             Field(name='trade_date', datatype=datetime.date, validation_method='must_be_provided',
                   domain_mapping = "Trade.trade_date"),              
             Field(name='expiration_date', datatype=datetime.date, validation_method='after_trade_date',
                   domain_mapping = "map_expiration_date"),              
             Field(name='commission', datatype=Decimal, calculation_method='calc_commission',
                   validation_method='must_be_provided', domain_mapping = "Trade.commission"),
  ]

   @classmethod
   def getField(cls, field_name):
       for field in cls.FIELDS:
           if field.name == field_name:
               return field

The FXTransaction model supports three types of mappings (the field list above uses the first and the third):
  • <Trade.field_name> maps to a field on the Trade domain object
  • <Instrument.field_name> maps to a field on the Instrument domain object
  • <map_currency_pair> and <map_expiration_date> call a mapping method defined on the FXTransaction class
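The three kinds can be told apart from the string alone. The helper below is a hypothetical sketch (classify_mapping and DOMAIN_OBJECTS are not part of ReactiveFramework) that applies the same split-on-dot rule the framework uses when binding, and additionally rejects unknown domain object names up front instead of failing later with a KeyError from get_domain_object.

```python
# Known domain object names for the FX Forward example (an assumption here).
DOMAIN_OBJECTS = {"Trade", "Instrument"}


def classify_mapping(mapping):
    """Return (kind, object_name, target) for a domain_mapping string.

    kind is "attribute" for "<DomainObject.field_name>" strings and
    "method" for the name of a mapper method on the presentation model.
    """
    parts = mapping.split(".")
    if len(parts) == 1:
        return ("method", None, parts[0])
    if len(parts) == 2:
        object_name, attribute = parts
        if object_name not in DOMAIN_OBJECTS:
            raise ValueError("Unknown domain object in mapping %r" % mapping)
        return ("attribute", object_name, attribute)
    raise ValueError("Invalid domain_mapping %r" % mapping)


for mapping in ["Trade.quantity", "Instrument.exp_date", "map_currency_pair"]:
    print(mapping, "->", classify_mapping(mapping))
```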



Defining the DomainModel

For this example I've chosen SQLAlchemy to implement the domain model because I find its automatic schema maintenance, protection against common coding errors and unit of work pattern very helpful. Below is the definition of the domain model that will be used in this article.

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Date, Numeric
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker

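def get_engine():
   # mysql_params is a dict with user, passwd, host and db keys, defined elsewhere
   return create_engine( "mysql://{user}:{passwd}@{host}/{db}".format(**mysql_params))

Base = declarative_base()
_Session = None
def get_session():
   global _Session
   if _Session is None:
       # create the engine and session factory once, then reuse them
       _Session = sessionmaker(bind=get_engine())
   return _Session()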

class Instrument(Base):
   __tablename__ = 'instrument'

   id          = Column( Integer, primary_key=True)
   name        = Column( String(255))
   currency    = Column( String(3))
   ins_type    = Column( String(255))
   underlying  = Column( String(255))
   exp_date    = Column( Date)


class Trade(Base):
   __tablename__ = 'trade'
    
   id          =  Column(Integer, primary_key=True)
   instrument_id = Column( Integer, ForeignKey('instrument.id'))
   quantity    = Column(Numeric(27,8))
   price       = Column(Numeric(31,12))
   trade_date  = Column(Date)
   settle_date = Column(Date)
   action      = Column(String(255))
   commission  = Column(Numeric(21,2))

   @classmethod
   def find_all(cls):
       session = get_session()
       return session.query(cls).all()

def build_schema():
   engine = get_engine( )
   Base.metadata.drop_all(engine)
   Base.metadata.create_all(engine)

This example uses MySQL, but it should work just as well with MS SQL Server and, I suspect, other database engines supported by SQLAlchemy. For the purpose of this example, I've also significantly simplified the Trade and Instrument domain objects; for example, in practice underlying and currency would reference back to the instrument table rather than be defined as strings.



Saving the trade

Saving the trade requires orchestration between the instrument and the trade domain objects. The instrument object has to be saved first, and then its id needs to be set as the instrument_id of the trade. This logic differs by type of trade: some trades are executed on listed instruments that are expected to pre-exist, while others require coordination between multiple instruments and other objects. Therefore, the logic that performs the actual save is handled by the presentation model. ReactiveFramework is responsible for mapping the presentation model data to the model's domain objects and calling the model's save method.

ReactiveFramework's save method is below:

   def save(self):
       for field in self.get_fields():
           field.map_to_domain()
       return self.model.save()

The map_to_domain method simply calls the preconfigured domain_mapping_method, passing TO as the mapping direction:

   def map_to_domain(self):
       if self.domain_mapping_method:
           self.domain_mapping_method(self, self.TO)

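The domain_mapping_method is constructed by parsing the domain_mapping attribute and establishing the appropriate runtime binding between the domain model and the presentation model. A name without a dot is bound to the method of that name on the presentation model; a <DomainObject.field_name> string is wrapped in a function that sets or gets that attribute on the named domain object, depending on the direction.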

   def _bind_domain_mapping_method(self, model):
       mapping = self.definition.domain_mapping
       if not mapping:
           return
       split_map = mapping.split(".")
       if len(split_map) == 1:
           return self._bind_method('domain_mapping', model)
       elif len(split_map) == 2:
           def mapper_function_wrapper(field, direction):
               domain_object = model.get_domain_object(split_map[0])
               if direction == self.TO:
                   setattr(domain_object,split_map[1],field.value )
               else:
                   return getattr(domain_object,split_map[1])
           return mapper_function_wrapper
       else:
           raise Exception("Invalid domain_mapping %s" % mapping)
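The attribute branch is a small closure. The standalone sketch below isolates it; make_attribute_mapper is a hypothetical name, and the get_domain_object argument is any callable that returns a domain object by name, playing the role of FXTransaction.get_domain_object.

```python
from types import SimpleNamespace

TO, FROM = "TO", "FROM"


def make_attribute_mapper(get_domain_object, mapping):
    """Build a mapper for a "<DomainObject.field_name>" mapping string."""
    object_name, attribute = mapping.split(".")

    def mapper(field, direction):
        # Resolve the domain object on every call, so objects replaced later
        # (for example by a load) are picked up without rebinding.
        domain_object = get_domain_object(object_name)
        if direction == TO:
            setattr(domain_object, attribute, field.value)
        else:
            return getattr(domain_object, attribute)

    return mapper


domain_objects = {"Trade": SimpleNamespace(quantity=None)}
quantity_mapper = make_attribute_mapper(domain_objects.__getitem__, "Trade.quantity")

field = SimpleNamespace(value=100)       # stand-in for a BoundField
quantity_mapper(field, TO)               # presentation -> domain
print(domain_objects["Trade"].quantity)
print(quantity_mapper(field, FROM))      # domain -> presentation
```

Looking the domain object up inside the wrapper, as the framework's mapper_function_wrapper does, matters because FXTransaction.load replaces the dictionary of domain objects after the fields have been bound.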

To support saving trades, the FXTransaction model needs to expose the domain objects being operated on. When booking a new trade, empty trade and instrument objects are created in the initializer, and a getter method provides access to the domain objects by name.

   def __init__(self):
       self._domain_objects = {}
       self._domain_objects[self.INSTRUMENT] = Instrument( ins_type = 'FX Forward')
       self._domain_objects[self.TRADE] = Trade()

   def get_domain_object(self, name):
       return self._domain_objects[ name ]

The following is the implementation of the custom mapper methods used to map currency_pair and expiration_date. These mapper methods can be called with either the TO or the FROM direction. When saving the trade, the mappers are called with TO and are responsible for transferring the value of the field to the appropriate domain objects. Mapper methods are meant to be called only after the model has been fully validated, so it is safe to use other fields from the presentation model inside them.

   def map_currency_pair(self, field, direction):
       if direction == field.TO:
           instrument = self.get_domain_object(self.INSTRUMENT)
           instrument.name = "%s %s" % (field.value, self.expiration_date.value)
           instrument.underlying, instrument.currency = field.value.split("/")
       else:
           return self.get_domain_object(self.INSTRUMENT).name.split(" ")[0]
       
   def map_expiration_date(self, field, direction):
       if direction == field.TO:
           self.get_domain_object(self.INSTRUMENT).exp_date = field.value
           self.get_domain_object(self.TRADE).settle_date = field.value
       else:
           return self.get_domain_object(self.INSTRUMENT).exp_date
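To see the fan-out in isolation, the sketch below restates the two mappers as free functions over plain SimpleNamespace objects (an assumption made so the code runs without SQLAlchemy or the rest of the framework; the extra parameters replace self and the field), then performs a TO pass followed by a FROM pass.

```python
import datetime
from types import SimpleNamespace

TO, FROM = "TO", "FROM"


def map_currency_pair(instrument, pair, expiration_date, direction):
    """One presentation field fans out to three Instrument attributes."""
    if direction == TO:
        instrument.name = "%s %s" % (pair, expiration_date)
        instrument.underlying, instrument.currency = pair.split("/")
    else:
        # The pair is recovered from the first token of the instrument name.
        return instrument.name.split(" ")[0]


def map_expiration_date(instrument, trade, value, direction):
    """One presentation field is written to two different domain objects."""
    if direction == TO:
        instrument.exp_date = value
        trade.settle_date = value
    else:
        return instrument.exp_date


instrument, trade = SimpleNamespace(), SimpleNamespace()
expiry = datetime.date(2015, 5, 2)
map_expiration_date(instrument, trade, expiry, TO)
map_currency_pair(instrument, "EUR/USD", expiry, TO)
print(instrument.name, instrument.underlying, instrument.currency)
print(map_currency_pair(instrument, None, None, FROM))
```

Because the FROM direction parses the instrument name, it depends on the name format written in the TO direction; rebuilding the pair from underlying and currency would be an equally workable choice.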


The save method of FXTransaction is responsible for saving the domain objects that were populated in the mapping step. In this case it uses SQLAlchemy's unit of work pattern: it first adds and flushes the instrument record, which sends the INSERT to the database and gives the instrument its identity value. Then, after setting the trade's instrument_id to that id, it commits both inserts in a single database transaction. The method returns the id of the saved trade as a way of providing additional information to the user, in case he or she wants to double-check the details of the record.

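   def save(self):
       instrument = self.get_domain_object(self.INSTRUMENT)
       trade = self.get_domain_object(self.TRADE)
       s = get_session()
       try:
           s.add(instrument)
           s.flush()                        # INSERT instrument; assigns instrument.id
           trade.instrument_id = instrument.id
           s.add(trade)
           s.commit()                       # commit both rows in one transaction
           trade_id = trade.id
       finally:
           s.close()                        # also rolls back if commit was not reached
       self._domain_objects = {}            # domain objects are kept in a dict keyed by name
       return trade_id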
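For readers without SQLAlchemy at hand, the same ordering can be mirrored with the standard library's sqlite3 module. This is a hedged analogue rather than the article's implementation: the simplified schema and the save_trade helper are assumptions, cursor.lastrowid plays the role of the flushed identity value, and the connection's context manager provides the single commit, or the rollback if anything fails.

```python
import sqlite3

# In-memory database with simplified instrument and trade tables (assumed schema).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE instrument (id INTEGER PRIMARY KEY, name TEXT, exp_date TEXT);
    CREATE TABLE trade (id INTEGER PRIMARY KEY,
                        instrument_id INTEGER REFERENCES instrument(id),
                        quantity NUMERIC, action TEXT);
""")


def save_trade(conn, instrument, trade):
    """Insert parent then child inside one transaction; return the trade id."""
    with conn:  # commits on success, rolls back if any statement raises
        cur = conn.execute("INSERT INTO instrument (name, exp_date) VALUES (?, ?)",
                           (instrument["name"], instrument["exp_date"]))
        instrument_id = cur.lastrowid  # like session.flush(): id known before commit
        cur = conn.execute(
            "INSERT INTO trade (instrument_id, quantity, action) VALUES (?, ?, ?)",
            (instrument_id, trade["quantity"], trade["action"]))
        return cur.lastrowid


trade_id = save_trade(conn, {"name": "EUR/USD 2015-05-02", "exp_date": "2015-05-02"},
                      {"quantity": 100, "action": "Buy"})
print(conn.execute("SELECT t.id, i.name FROM trade t JOIN instrument i "
                   "ON i.id = t.instrument_id").fetchall())
```

If the trade insert fails, the instrument row inserted earlier in the same transaction is rolled back too, so no orphaned instrument is left behind.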

For simplicity, I've omitted a check for the existence of a duplicate instrument in the database. Handling duplicate instruments depends on the domain model implementation and is something that will be discussed later.


Loading and editing the trade

ReactiveFramework's Domain Model mappings also provide the ability to load a trade from the database into the presentation model, which in turn enables viewing, editing or deleting trades. Initializing presentation model state from domain objects is the inverse of mapping the presentation model to the domain, with some additional logic to handle initialization of calculated fields. ReactiveFramework's load method is as follows:

   def load(self,trade_id):
       self.model.load(trade_id)
       
       for field in self.get_fields():
           field.map_from_domain()
       
       recalculated = []
       for field in self.get_fields():
           if not field.has_value:
               self._recalc_field(field.name, recalculated)
               
       for field in self.get_fields():
           field.has_user_entered_value = False

The first step is to load the domain objects for a given trade id into the presentation model. Then map_from_domain is called for every field in the model to initialize it. While fields are loaded from the domain model, dependency-based calculation is not performed, to avoid redundant calculation of fields whose values are already known. Instead, all fields still without a value are recalculated once all mappings have been executed. The last step is to reset the has_user_entered_value flag to False, which re-enables recalculation when the user edits the trade.
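The three phases can be reduced to a self-contained sketch. SketchField and load below are hypothetical simplifications of BoundField and ReactiveFramework.load: they keep the map, fill-missing and reset-flags order, but unlike _recalc_field they do not check that a calculation's inputs are set or cascade to dependent fields.

```python
from decimal import Decimal


class SketchField:
    """Hypothetical stand-in for BoundField with just the load-relevant state."""

    def __init__(self, name, calc=None):
        self.name, self.calc = name, calc
        self.value, self.has_value, self.has_user_entered_value = None, False, False

    def set_value(self, value, user_entered=True):
        self.value, self.has_value = value, True
        self.has_user_entered_value = user_entered


def load(fields, domain_values):
    # Phase 1: copy every value the domain supplies; None means "no value",
    # and mapped values are stored with user_entered=True, as in map_from_domain.
    for field in fields.values():
        value = domain_values.get(field.name)
        if value is not None:
            field.set_value(value)
    # Phase 2: calculate only the fields the domain model did not supply.
    for field in fields.values():
        if not field.has_value and field.calc:
            field.set_value(field.calc(fields), user_entered=False)
    # Phase 3: clear the flags so later user edits trigger recalculation again.
    for field in fields.values():
        field.has_user_entered_value = False


fields = {
    "primary_amount": SketchField("primary_amount"),
    "deal_fx_rate": SketchField("deal_fx_rate"),
    "secondary_amount": SketchField(
        "secondary_amount",
        calc=lambda f: f["primary_amount"].value * f["deal_fx_rate"].value),
}
load(fields, {"primary_amount": Decimal("100"), "deal_fx_rate": Decimal("1.5")})
print(fields["secondary_amount"].value)
```

secondary_amount has no domain_mapping in FieldFactory, so it is exactly the kind of field that phase 2 fills in.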

The map_from_domain method calls the domain_mapping_method described in the previous section and, if it returns a value, sets that value on the field.

   def map_from_domain(self):
       if self.domain_mapping_method:
           value = self.domain_mapping_method(self, self.FROM)
           if value is not None:
               self.set_value(value)    


Mapper methods in the presentation model support bi-directional mapping.  When direction is FROM, the method is expected to return the value that will be set on the presentation model.

   def map_currency_pair(self, field, direction):
       if direction == field.TO:
           instrument = self.get_domain_object(self.INSTRUMENT)
           instrument.name = "%s %s" % (field.value, self.expiration_date.value)
           instrument.underlying, instrument.currency = field.value.split("/")
       else:
           return self.get_domain_object(self.INSTRUMENT).name.split(" ")[0]
       
   def map_expiration_date(self, field, direction):
       if direction == field.TO:
           self.get_domain_object(self.INSTRUMENT).exp_date = field.value
           self.get_domain_object(self.TRADE).settle_date = field.value
       else:
           return self.get_domain_object(self.INSTRUMENT).exp_date

The load method of the FXTransaction model is SQLAlchemy-specific in this implementation. It loads the relevant domain objects into the presentation model, where they are later used by map_from_domain.

   def load(self, trade_id):
       self._domain_objects = {}
       s = get_session()
       self._domain_objects[self.TRADE] = s.query(Trade).get( trade_id )
       self._domain_objects[self.INSTRUMENT] = s.query(Instrument).get( self._domain_objects[self.TRADE].instrument_id )
       s.close()
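FXTransaction.load issues two lookups, one per domain object, and would fail with an AttributeError if the trade id did not exist, because the first lookup returns None. As a hedged alternative, sketched with sqlite3 and plain dicts instead of SQLAlchemy (the schema, the sample rows and load_domain_objects are assumptions), both objects can be fetched with one JOIN and a missing id reported explicitly.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows support access by column name
conn.executescript("""
    CREATE TABLE instrument (id INTEGER PRIMARY KEY, name TEXT, exp_date TEXT);
    CREATE TABLE trade (id INTEGER PRIMARY KEY, instrument_id INTEGER, quantity NUMERIC);
    INSERT INTO instrument VALUES (1, 'EUR/USD 2015-05-02', '2015-05-02');
    INSERT INTO trade VALUES (7, 1, 100);
""")


def load_domain_objects(conn, trade_id):
    """Fetch a trade and its instrument with one JOIN; return a name->dict map."""
    row = conn.execute(
        """SELECT t.id AS trade_id, t.quantity, i.id AS instrument_id,
                  i.name, i.exp_date
           FROM trade t JOIN instrument i ON i.id = t.instrument_id
           WHERE t.id = ?""", (trade_id,)).fetchone()
    if row is None:
        raise LookupError("No trade with id %s" % trade_id)
    return {
        "Trade": {"id": row["trade_id"], "quantity": row["quantity"],
                  "instrument_id": row["instrument_id"]},
        "Instrument": {"id": row["instrument_id"], "name": row["name"],
                       "exp_date": row["exp_date"]},
    }


objects = load_domain_objects(conn, 7)
print(objects["Instrument"]["name"])
```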




More on DomainModels

Effective design of a Domain Model is critical to the design of any system, and especially to a data-driven system such as a Trade Model. In this article I've used a SQLAlchemy-based Domain Model, which is helpful and easy to get going with. However, it carries significant functional complexity and performance overhead that make it a less than ideal fit for larger Domain Model implementations. Still, SQLAlchemy offers conveniences and rapid development that earn it a place within a domain model implementation. I will discuss this in more detail in another article.

Furthermore, a complete Trade Model will have over a dozen domain objects, and both the instrument and trade domain objects will have much greater functional complexity, especially the instrument domain object. This is also a topic for another post.


Complete Source Code Listing

I am including the complete listing of the ReactiveFramework codebase for reference.

from decimal import Decimal
import datetime
from models.alch_model import Instrument, Trade, get_session

class Field:
   def __init__(self, name, datatype, validation_method=None,
                calculation_method=None, domain_mapping = None):
       self.name = name
       self.datatype = datatype
       self.validation_method = validation_method
       self.calculation_method = calculation_method
       self.domain_mapping = domain_mapping
       
      
class FieldFactory:
   FIELDS = [ Field(name='action', datatype=str, validation_method='must_be_provided',
                    domain_mapping = "Trade.action"),
             Field(name='currency_pair', datatype=str, validation_method='valid_currency_pair',
                   domain_mapping = "map_currency_pair"),              
             Field(name='primary_amount', datatype=Decimal, calculation_method='calc_primary_amount',
                   validation_method='must_be_provided', domain_mapping = "Trade.quantity"),
             Field(name='secondary_amount', datatype=Decimal, calculation_method='calc_secondary_amount',
                   validation_method='must_be_provided'),
             Field(name='deal_fx_rate', datatype=Decimal, calculation_method='calc_deal_fx_rate',
                   validation_method='must_be_provided', domain_mapping = "Trade.price"),
             Field(name='trade_date', datatype=datetime.date, validation_method='must_be_provided',
                   domain_mapping = "Trade.trade_date"),              
             Field(name='expiration_date', datatype=datetime.date, validation_method='after_trade_date',
                   domain_mapping = "map_expiration_date"),              
             Field(name='commission', datatype=Decimal, calculation_method='calc_commission',
                   validation_method='must_be_provided', domain_mapping = "Trade.commission"),
  ]

   @classmethod
   def getField(cls, field_name):
       for field in cls.FIELDS:
           if field.name == field_name:
               return field

class FXTransaction:
   INSTRUMENT = 'Instrument'
   TRADE = 'Trade'
   
   FIELD_DEPENDS = {
      'action': [],
      'currency_pair': [],
      'primary_amount': ['secondary_amount', 'deal_fx_rate'],
      'secondary_amount': ['primary_amount', 'deal_fx_rate'],
      'deal_fx_rate' : ['primary_amount', 'secondary_amount'],
      'trade_date' : [],
      'expiration_date' : [],
      'commission' : ['primary_amount']
  }
   
   def __init__(self):
       self._domain_objects = {}
       self._domain_objects[self.INSTRUMENT] = Instrument( ins_type = 'FX Forward')
       self._domain_objects[self.TRADE] = Trade()
   
   def bind_fields(self):
       for field_name in self.FIELD_DEPENDS:
           setattr(self, field_name, BoundField(FieldFactory.getField(field_name), self))
          
   def calc_commission(self):
       return self.primary_amount.value * Decimal("0.01")
  
   def calc_secondary_amount(self):
       return self.primary_amount.value * self.deal_fx_rate.value

   def calc_primary_amount(self):
       if self.deal_fx_rate.value:
           return self.secondary_amount.value / self.deal_fx_rate.value

   def calc_deal_fx_rate(self):
       if self.primary_amount.value:
           return self.secondary_amount.value / self.primary_amount.value
      
   def must_be_provided(self, field):
       return "" if field.has_value else "%s is missing" % field.name  

   def valid_currency_pair(self, field):
       if self.must_be_provided(field):
           return self.must_be_provided(field)
       currencies = field.value.split("/")
       if len(currencies) ==2 and all(3==len(curr) for curr in currencies):
           return ""
       else:
           return "Invalid Currency Pair %s" %  field.value

   def after_trade_date(self, field):
       if self.must_be_provided(field):
           return self.must_be_provided(field)
       if field.value <= self.trade_date.value:
           return "%s must be after trade date %s" % (field.value, self.trade_date.value)
       return ""
       
   def map_currency_pair(self, field, direction):
       if direction == field.TO:
           instrument = self.get_domain_object(self.INSTRUMENT)
           instrument.name = "%s %s" % (field.value, self.expiration_date.value)
           instrument.underlying, instrument.currency = field.value.split("/")
       else:
           return self.get_domain_object(self.INSTRUMENT).name.split(" ")[0]
       
   def map_expiration_date(self, field, direction):
       if direction == field.TO:
           self.get_domain_object(self.INSTRUMENT).exp_date = field.value
           self.get_domain_object(self.TRADE).settle_date = field.value
       else:
           return self.get_domain_object(self.INSTRUMENT).exp_date

   def get_domain_object(self, name):
       return self._domain_objects[ name ]
   
   def save(self):
       instrument = self.get_domain_object(self.INSTRUMENT)
       trade = self.get_domain_object(self.TRADE)
       s = get_session()
       try:
           s.add(instrument)
           s.flush()
           trade.instrument_id = instrument.id
           s.add(trade)
           s.commit()
           trade_id = trade.id
       finally:
           s.close()
       self._domain_objects = {}
       return trade_id

   def load(self, trade_id):
       self._domain_objects = {}
       s = get_session()
       self._domain_objects[self.TRADE] = s.query(Trade).get( trade_id )
       self._domain_objects[self.INSTRUMENT] = s.query(Instrument).get( self._domain_objects[self.TRADE].instrument_id )
       s.close()

       
class BoundField:
   TO = 'TO'
   FROM = 'FROM'
   
   def __init__(self, field_definition, model):
       self.definition = field_definition
       self.value = None
       self.has_value = False
       self.has_user_entered_value = False
       self.calculation_method = self._bind_method('calculation_method', model)
       self.validation_method = self._bind_method('validation_method', model)
       self.domain_mapping_method = self._bind_domain_mapping_method(model)

   def _bind_method(self, method, model):
       if getattr(self.definition, method):
           return  getattr(model, getattr(self.definition, method))

   def set_value(self, value, user_entered=True):
       if not isinstance(value, self.definition.datatype):
           self.value = self.definition.datatype(value)
       else:
           self.value =  value
           
       self.has_value = True
       self.has_user_entered_value = user_entered

   def recalc(self):
       if not self.has_user_entered_value:
           if self.calculation_method:
               self.set_value(self.calculation_method(), user_entered=False)
           return True
       return False
  
   def validate(self):
       if self.validation_method:
           return self.validation_method(self)
   
   @property
   def name(self):
       return self.definition.name

   def _bind_domain_mapping_method(self, model):
       mapping = self.definition.domain_mapping
       if not mapping:
           return
       split_map = mapping.split(".")
       if len(split_map) == 1:
           return self._bind_method('domain_mapping', model)
       elif len(split_map) == 2:
           def mapper_function_wrapper(field, direction):
               domain_object = model.get_domain_object(split_map[0])
               if direction == self.TO:
                   setattr(domain_object,split_map[1],field.value )
               else:
                   return getattr(domain_object,split_map[1])
           return mapper_function_wrapper
       else:
           raise Exception("Invalid domain_mapping %s" % mapping)
  
   def map_to_domain(self):
       if self.domain_mapping_method:
           self.domain_mapping_method(self, self.TO)

   def map_from_domain(self):
       if self.domain_mapping_method:
           value = self.domain_mapping_method(self, self.FROM)
           if value is not None:
               self.set_value(value)    
   
class ReactiveFramework:
   __slots__ = ('model', 'depends_notifty')
   def __init__(self, model):
       self.model = model
       self.depends_notifty = {}
       self._init_depends_notifty()
       self.model.bind_fields()
  
   def _init_depends_notifty(self):
       for field_name, deps in self.model.FIELD_DEPENDS.items():
           for dep_name in deps:
               self.depends_notifty.setdefault(dep_name, [])
               self.depends_notifty[dep_name].append(field_name)

   def _are_dependents_set(self, field_name):
       for dep_field in self.model.FIELD_DEPENDS[field_name]:
           if not getattr(self.model, dep_field).has_value:
               return False
       return True
  
   def _recalc_field(self, field_name, recalculated):
       if self._are_dependents_set(field_name):
           if getattr(self.model, field_name).recalc():
               recalculated.append(field_name)
               self._recalc_dependents(field_name, recalculated)

   def _recalc_dependents(self, field_name, recalculated=None):
       if recalculated is None:        
           recalculated = []
       for field in self.depends_notifty.get(field_name, []):
           if field not in recalculated:
               self._recalc_field(field, recalculated)
       return recalculated
      
   def set_value(self, field_name, value):
       getattr(self.model, field_name).set_value(value)
       return self._recalc_dependents(field_name)

   def get_value(self, field_name):
       return getattr(self.model, field_name).value

   def validate(self):
       result = {}
       for field in self.get_fields():
           errors = field.validate()
           if errors:
               result[field.name] = errors
       return result
   
   def get_fields(self):
       return [self.get_field( field_name)
                 for field_name in self.model.FIELD_DEPENDS ]
       
   def get_field(self, field_name):
       return  getattr(self.model, field_name)
   
   @property
   def id(self):
       return str(id(self))

   def save(self):
       for field in self.get_fields():
           field.map_to_domain()
       return self.model.save()
       
   def load(self,trade_id):
       self.model.load(trade_id)
       
       for field in self.get_fields():
           field.map_from_domain()
       
       recalculated = []
       for field in self.get_fields():
           if not field.has_value:
               self._recalc_field(field.name, recalculated)
               
       for field in self.get_fields():
           field.has_user_entered_value = False
               

if __name__ == "__main__":
   fxTrade = ReactiveFramework(FXTransaction())
   print( fxTrade.set_value('action', 'Buy'))
   print( fxTrade.set_value('primary_amount', 100) )
   print( fxTrade.set_value('primary_amount', 100) )
   print( fxTrade.set_value('deal_fx_rate', 1.5))
   
   fxTrade.set_value('trade_date', datetime.date(year=2015, month=5, day = 1))
   fxTrade.set_value('expiration_date', datetime.date(year=2015, month=5, day = 2))
   fxTrade.set_value('currency_pair', 'EUR/USD')

   print( fxTrade.get_value("secondary_amount" ))
   print( fxTrade.get_value("commission"))

   print( fxTrade.validate())
   saved_trade_id = fxTrade.save()
   print("Saved Trade %s" % saved_trade_id)
   
   fxTrade2 = ReactiveFramework(FXTransaction())
   fxTrade2.load(trade_id = saved_trade_id)
   for field1, field2 in zip(fxTrade.get_fields(), fxTrade2.get_fields()):
       print ("%s is %s vs %s "% (field1.name,  field1.value, field2.value))
       assert field1.value == field2.value