Wednesday, December 30, 2009

Hibernate interceptor, not typical usage ;-)




Hibernate interceptor not typical usage




<?xml version="1.0" encoding="UTF-8"?>
<!-- Persistence deployment descriptor for dev profile -->
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd"
             version="1.0">

   <persistence-unit name="ramp">
      <provider>org.hibernate.ejb.HibernatePersistence</provider>
      <jta-data-source>java:/rampDatasource</jta-data-source>
      <properties>
         <property name="hibernate.dialect" value="com.paddypower.ramp.common.core.persistence.hibernate.InformixDialect"/>
         <property name="hibernate.hbm2ddl.auto" value="none"/>
         <property name="hibernate.show_sql" value="false"/>
         <property name="hibernate.use_sql_comments" value="false"/>
         <property name="hibernate.format_sql" value="false"/>
         <property name="jboss.entity.manager.factory.jndi.name" value="java:/rampEntityManagerFactory"/>
         <property name="hibernate.default_batch_fetch_size" value="100"/>
         <property name="hibernate.jdbc.batch_size" value="50"/>
         <property name="cache.provider_class" value="org.hibernate.cache.EhCacheProvider"/>
         <property name="hibernate.cache.provider_class" value="net.sf.ehcache.hibernate.EhCacheProvider"/>
         <!-- <property name="hibernate.cache.provider_configuration_file_resource_path">ehcache.xml</property>-->
         <property name="hibernate.cache.use_second_level_cache" value="true"/>
         <property name="hibernate.cache.region_prefix" value=""/>
         <property name="hibernate.cache.use_query_cache" value="true"/>
         <!-- <property name="hibernate.default_catalog" value="ramp"/>
         <property name="hibernate.default_schema" value="ramp"/> -->
         <!-- Registers the custom interceptor that receives the flush callbacks -->
         <property name="hibernate.ejb.interceptor" value="com.yarenty.common.core.intercept.YarentyInterceptor"/>
      </properties>

   </persistence-unit>

</persistence>


----------





Checking onFlushDirty

Checking changes in the objects - onFlushDirty
The Hibernate interceptor is called every time an update of a managed object is required.
The easiest way to get it working is to extend EmptyInterceptor, which implements the Interceptor interface. One method is of interest to us here: boolean onFlushDirty
The method's parameters are:

public boolean onFlushDirty(Object entity, Serializable id, Object[] currentState, Object[] previousState, String[] propertyNames, Type[] types)

Object entity - our object
Serializable id - not interested ;-)
Object[] currentState - current state of object
Object[] previousState - previous state of object
String[] propertyNames - name of each property
Type[] types - types

If there are lots of entities and we are interested in only a few of them, it is good to use my standard registry/pool solution for processing them. So in that method I will basically have two lines:

// Line responsible for processing real object changes: hand the entity over
// to its listener, but only when one is registered for this exact class.
// NOTE(review): InterceptorChangeListener.send as declared further down also
// takes a userId argument - confirm where the interceptor obtains it.
if (ChangeListenersRegistry.containsListener(entity.getClass())) {
    ChangeListenersRegistry.getListener(entity.getClass()).send(entity, currentState, previousState, propertyNames);
}
// As I don't want to interfere with what Hibernate is doing on the DB side,
// delegate the return value to the standard Hibernate interceptor.
return super.onFlushDirty(entity, id, currentState, previousState, propertyNames, types);



Registry

package com.yarenty.common.core.intercept;

import java.util.HashMap;
import java.util.Map;


/**
 * Registry that maps an entity class to the listener handling its changes.
 * <p>
 * The mapping is filled once in the private constructor and never modified
 * afterwards. Lookups use the exact class as key, so a subclass of a
 * registered entity class is not matched.
 */
public class ChangeListenersRegistry {

    /** Entity class to listener mapping; populated only in the constructor. */
    protected final Map<Class<?>, InterceptorChangeListener> registry;

    private ChangeListenersRegistry() {
        registry = new HashMap<Class<?>, InterceptorChangeListener>();

        registry.put(Selection.class, new SelectionListener());
        registry.put(Event.class, new EventListener());
        registry.put(Subclass.class, new SubclassListener());
    }

    /**
     * Holder for the single instance. The JVM initialises it on first access,
     * which avoids the race of an unsynchronised "if (instance == null)" check
     * when several threads flush at the same time.
     */
    private static class SingletonHolder {
        private static final ChangeListenersRegistry INSTANCE = new ChangeListenersRegistry();
    }

    /**
     * @return the single registry instance
     */
    public static ChangeListenersRegistry getInstance() {
        return SingletonHolder.INSTANCE;
    }

    /**
     * @param clazz entity class to look up
     * @return the listener registered for {@code clazz}, or {@code null} when there is none
     */
    public static InterceptorChangeListener getListener(Class<?> clazz) {
        return getInstance().registry.get(clazz);
    }

    /**
     * @param clazz entity class to look up
     * @return {@code true} when a listener is registered for {@code clazz}
     */
    public static boolean containsListener(Class<?> clazz) {
        return getInstance().registry.containsKey(clazz);
    }
}



InterceptorChangeListener - interface
package com.yarenty.common.core.intercept;

/**
 * Interface implemented by listeners that want to participate in Hibernate entity changes.
 *
 * @author yarenty
 */
public interface InterceptorChangeListener {

    /**
     * Called with the details of a changed entity.
     *
     * @param entity        the changed entity
     * @param currentState  current values of the entity properties
     * @param previousState previous values of the entity properties
     * @param propertyNames names of the properties, in the same order as both state arrays
     * @param userId        id of a user associated with the change (presumably the one who
     *                      made it - confirm against the callers)
     */
    void send(Object entity, Object[] currentState, Object[] previousState, String[] propertyNames, long userId);
}





Example of Listener - Subclass
package com.yarenty.common.core.intercept.listeners;

import org.apache.log4j.Logger;

public class SubclassListener implements InterceptorChangeListener {

Logger log = Logger.getLogger(SubclassListener.class);
@Override
public void send(Object entity, Object[] currentState,
Object[] previousState, String[] propertyNames, long userId) {
try {
Subclass subclass = (Subclass) entity;
final Subclass sendMe = new PpSubclass();
InspectorResult out = Inspector.createUpdatedEmptyObject(currentState, previousState, propertyNames, sendMe);
sendMe.setSubclassId(subclass.getSubclassId());
sendMe.setSuperclass(subclass.getSuperclass());
sendMe.setUserId(subclass.getUserId());

if (out.isPublish()) {
PhaseAPI.buffer(sendMe,  Operation.UPDATE, null, userId);
}

} catch (Exception e) {
log.error("Error sending updated Subclass", e);
}

}

}

Entity inspector
package com.yarenty.common.core.intercept;

import org.apache.log4j.Logger;

/**
 * Hibernate change object inspector.
 * 
 * @author yarenty
 */
public class Inspector {

static Logger log = Logger.getLogger(Inspector.class);

/**
* Method is looking into current and previous state of passed object properties.<br/>
* When change is found, based on propertyName is trying to invoke method setXXX(param type) on sendMe object.
* @param currentState list of objects with current state
* @param previousState list of objects with previous state
* @param propertyNames name of property
* @param sendMe object to set
* @return <b>true</b> if object contains &quot;publish&quot; parameter and this parameter is changing to 'Y' - yes - means object should be created <br/>
* <b>false</b> otherwise <br/><br/>
* <b>sendMe</b> object should contain only changes! <small>(of course that depends what it has before calling this method ;-) )</small>
*/
public static InspectorResult createUpdatedEmptyObject(Object[] currentState, Object[] previousState, String[] propertyNames, Object sendMe) {
return createUpdatedEmptyObject(currentState, previousState, propertyNames, sendMe, null);  

}


public static InspectorResult createUpdatedEmptyObject(Object[] currentState, Object[] previousState, String[] propertyNames, Object sendMe, ArrayList<String> nativeFields)  
//throws IllegalArgumentException, IllegalAccessException, InvocationTargetException, SecurityException, NoSuchMethodException
InspectorResult output = new InspectorResult();
for(int i=0; i< propertyNames.length; i++) {
// treat empty strings as null, for consistency with isRestrictionParameterSet()
Object parameterValue = currentState[i];
Object lastParameterValue = previousState[i];

if ( "".equals(parameterValue) ) parameterValue = null;
if ( "".equals(lastParameterValue) ) lastParameterValue = null;
if ( parameterValue!=lastParameterValue && ( parameterValue==null || !parameterValue.equals(lastParameterValue) ) )
{

try {
Method m = null;
String methodName = "set"+propertyNames[i].substring(0,1).toUpperCase()+propertyNames[i].substring(1);

try {
if( parameterValue !=null) {
m = sendMe.getClass().getMethod(methodName,parameterValue.getClass());
} else {
m = sendMe.getClass().getMethod(methodName,lastParameterValue.getClass());
}
} catch (NoSuchMethodException e) {
if( parameterValue !=null) {

if ( parameterValue instanceof Short) {
m = sendMe.getClass().getMethod(methodName, short.class);
} else 
if ( parameterValue instanceof Integer) {
m = sendMe.getClass().getMethod(methodName, int.class);
} else 
if ( parameterValue instanceof Double) {
m = sendMe.getClass().getMethod(methodName, double.class);
} else 
if ( parameterValue instanceof Character) {
m = sendMe.getClass().getMethod(methodName, char.class);
} else 
if ( parameterValue instanceof Float) {
m = sendMe.getClass().getMethod(methodName, float.class);
} else 
if ( parameterValue instanceof Boolean) {
m = sendMe.getClass().getMethod(methodName, boolean.class);
} else 
if ( parameterValue instanceof Byte) {
m = sendMe.getClass().getMethod(methodName, byte.class);
} else 
if ( parameterValue instanceof Long) {
m = sendMe.getClass().getMethod(methodName, long.class);
} else 
if ( parameterValue instanceof Timestamp){ // fix for timestamp issue!
m = sendMe.getClass().getMethod(methodName, Date.class);
parameterValue  = new Date(((Timestamp)parameterValue).getTime());
}
}
else {
if ( lastParameterValue instanceof Short) {
m = sendMe.getClass().getMethod(methodName, short.class);
} else 
if ( lastParameterValue instanceof Integer) {
m = sendMe.getClass().getMethod(methodName, int.class);
} else 
if ( lastParameterValue instanceof Double) {
m = sendMe.getClass().getMethod(methodName, double.class);
} else 
if ( lastParameterValue instanceof Character) {
m = sendMe.getClass().getMethod(methodName, char.class);
} else 
if ( lastParameterValue instanceof Float) {
m = sendMe.getClass().getMethod(methodName, float.class);
} else 
if ( lastParameterValue instanceof Boolean) {
m = sendMe.getClass().getMethod(methodName, boolean.class);
} else 
if ( lastParameterValue instanceof Byte) {
m = sendMe.getClass().getMethod(methodName, byte.class);
} else 
if ( lastParameterValue instanceof Long) {
m = sendMe.getClass().getMethod(methodName, long.class);
} else 
if ( lastParameterValue instanceof Timestamp){ // fix for timestamp issue!
m = sendMe.getClass().getMethod(methodName, Date.class);
}

}

}

if (m!=null) { 
m.invoke(sendMe,parameterValue);
if (log.isDebugEnabled()) {
log.debug("PHASE:: I'll send: "+ propertyNames[i]+ " because is " + parameterValue + " and was " + lastParameterValue);
}
} else {

if( parameterValue !=null) {
throw new NoSuchMethodException("Really - can't find:: "+ methodName +"("+parameterValue.getClass().getName()+") for propertyNames["+i+"]::"+propertyNames[i]+" even after trying with prymitives!");
} else {
throw new NoSuchMethodException("Really - can't find:: "+ methodName +"("+lastParameterValue.getClass().getName()+") for propertyNames["+i+"]::"+propertyNames[i]+" even after trying with prymitives! last");
}
}
// NO MORE PUBLISH!!!
////check for changes of one special field = published - means - should be created in handler
//if (propertyNames[i].equals("published") && ( ((Character) parameterValue) == 'Y' )) {
//if (log.isDebugEnabled()) {
//log.debug("PHASE:: Published changed to Y so ... CREATE"); 
//}
//output.setCreate(true);
//}

//check for changes of one very special field = removed - means need to be deleted in Orbis
if (propertyNames[i].equals("removed") && ( ((Character) parameterValue) == 'Y' )) {
if (log.isDebugEnabled()) {
log.debug("PHASE:: Removed changed to Y so ... DELETE"); 
}
output.setDelete(true);
}

// if native field was changed - it must be informed about that
if (nativeFields!=null){
if (nativeFields.contains(propertyNames[i])) {
output.setNativeChange(true);
}
}

} catch (SecurityException e) {
log.warn("PHASE:: reflection is not working! se: "+ e);
} catch (NoSuchMethodException e) {
log.warn("PHASE:: reflection is not working! nsme: " + e);
} catch (IllegalArgumentException e) {
log.warn("PHASE:: reflection is not working! iae: " + e);
} catch (IllegalAccessException e) {
log.warn("PHASE:: reflection is not working! iacce: " + e);
} catch (InvocationTargetException e) {
log.warn("PHASE:: reflection is not working! ite: " + e);
} else {
// to avoid ending updates before create 
if (propertyNames[i].equals("published") && ( ((Character) parameterValue) == 'N' )) {
output.setPublish(false);
log.warn("PHASE:: not published::"+ sendMe);
}

}

}
return output;
}
}






We have our changed object now - let's do something with it ....



package com.yarenty.common.core;


/**
 * Static entry point for buffering PHASE packets for changed objects and for
 * committing (sending) or rolling back what was buffered.
 *
 * @author yarenty
 */
public class PhaseAPI {

    private final static Logger log = Logger.getLogger(PhaseAPI.class);

    // to avoid creating other instance!
    private PhaseAPI() {
    }

    /**
     * SingletonHolder is loaded on the first execution of PhaseAPI.getInstance()
     * or the first access to SingletonHolder.INSTANCE, not before.
     */
    private static class SingletonHolder {
        private static final PhaseAPI INSTANCE = new PhaseAPI();
    }

    public static PhaseAPI getInstance() {
        return SingletonHolder.INSTANCE;
    }

    // /////////////////////////////
    // SINGLE PACKETS OPERATIONS
    // /////////////////////////////

    /**
     * Create PHASE packet and put it into the buffer, in the group derived from
     * the object itself. When no converter is registered for the object's class a
     * warning is logged and nothing is buffered. Failures while buffering are
     * logged and not rethrown.
     *
     * @param sendMe
     *            - object to send
     * @param mode
     *            - mode type UPDATE/CREATE/DELETE
     * @param nativeChange
     *            - has native field change (like, int, short, char..)
     * @param userId
     *            - passed through to the packet creator
     * @return always {@code null}
     */
    public static Future<?> buffer(Object sendMe, Operation mode, Boolean nativeChange, long userId) {
        if (!PhaseConvertersRegistry.isConverterAvailable(sendMe.getClass())) {
            log.warn("PHASE:: No registered converter for :" + sendMe.getClass().getCanonicalName());
            return null;
        }

        // Borrow the sender only once it is certain to be used, so that every
        // borrow is paired with the returnSender() call in the finally block.
        final PhasePacketCreator sender = PhasePacketCreatorPool.getInstance().getSender();
        try {
            final String group = PhaseUtils.getGroupGlobalId(sendMe);
            if (log.isDebugEnabled()) {
                log.debug("PHASE: group:" + group);
            }
            sender.buffer(group, sendMe, mode, nativeChange, userId);
        } catch (ConnectionFailedException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (SQLException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (NamingException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (JMSException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (ObjectInstanceNotFoundException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (PhaseDataValidatorException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (PhaseUnknownException e) {
            log.error("PHASE: ERROR:" + e, e);
        } finally {
            PhasePacketCreatorPool.getInstance().returnSender(sender);
        }

        return null;
    }

    /**
     * Create PHASE packet and put it into the buffer, in an explicitly given
     * group. When no converter is registered for the object's class a warning is
     * logged and nothing is buffered. Failures while buffering are logged and not
     * rethrown.
     *
     * @param group
     *            - send packet in specific group
     * @param sendMe
     *            - object to send
     * @param mode
     *            - mode type UPDATE/CREATE/DELETE
     * @param userId
     *            - passed through to the packet creator
     * @return always {@code null}
     */
    public static Future<?> buffer(String group, Object sendMe, Operation mode, long userId) {
        if (!PhaseConvertersRegistry.isConverterAvailable(sendMe.getClass())) {
            log.warn("PHASE:: No registered converter for :" + sendMe.getClass().getCanonicalName());
            return null;
        }

        // Borrow the sender only once it is certain to be used, so that every
        // borrow is paired with the returnSender() call in the finally block.
        final PhasePacketCreator sender = PhasePacketCreatorPool.getInstance().getSender();
        try {
            if (log.isDebugEnabled()) {
                log.debug("PHASE: group:" + group);
            }
            sender.buffer(group, sendMe, mode, false, userId);
        } catch (ConnectionFailedException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (SQLException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (NamingException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (JMSException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (ObjectInstanceNotFoundException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (PhaseDataValidatorException e) {
            log.error("PHASE: ERROR:" + e, e);
        } catch (PhaseUnknownException e) {
            log.error("PHASE: ERROR:" + e, e);
        } finally {
            PhasePacketCreatorPool.getInstance().returnSender(sender);
        }

        return null;
    }

    /**
     * Commit for an object that is not a setup entity.
     *
     * @param sendMe object whose buffered packets should be sent
     * @param userId used together with the object to compute its global id
     * @return always {@code null}
     */
    public static Future<?> commit(Object sendMe, long userId) {
        return commit(sendMe, false, userId);
    }

    // TODO: add packet_id update code!!
    /**
     * Commit - get packets from the buffer and send them to phase.
     * <p>
     * For a setup entity the UPDATE packet is sent and the buffer entry removed
     * only when no CREATE packet is buffered; otherwise the entry stays in the
     * buffer. For any other object the buffered CREATE and UPDATE packets (those
     * that exist) are sent, in that order, and the entry is removed.
     *
     * @param sendMe        object whose buffered packets should be sent
     * @param isSetupEntity whether the object is a setup entity
     * @param userId        used together with the object to compute its global id
     * @return always {@code null}
     */
    public static Future<?> commit(Object sendMe, boolean isSetupEntity, long userId) {

        // Computed once and reused for the lookup and for both removals.
        final long gid = PhaseUtils.getGlobalId(sendMe, userId);
        log.info("PHASE:: GID::Searching buffer for GID:" + gid);

        final BufferedCacheObject cachedObject = BufferedCacheObjectMap.getInstance().get(gid);

        if (cachedObject == null) {
            log.info("PHASE:: GID::" + gid + " cachedObject for this GID is null");
        } else if (isSetupEntity) {
            if (cachedObject.getCreatePacket() == null) {
                // no CREATE package - send update only, then clear buffer
                sendPacket(cachedObject.getUpdatePacket());
                log.info("PHASE:: GID::" + gid + " cachedObject has a UPDATE packet and is SetupEntity.");
                BufferedCacheObjectMap.getInstance().remove(gid);
            } else {
                // a CREATE package is still buffered - wait
                log.info("PHASE:: GID::" + gid + " cachedObject has a CREATE packet and as is SetupEntity - will not be send at this moment.");
            }
        } else {
            // send both
            if (cachedObject.getCreatePacket() != null) {
                log.info("PHASE:: GID::" + gid + " cachedObject has a CREATE packet and is not SetupEntity.");
                sendPacket(cachedObject.getCreatePacket());
            }

            if (cachedObject.getUpdatePacket() != null) {
                log.info("PHASE:: GID::" + gid + " cachedObject has a UPDATE packet.");
                sendPacket(cachedObject.getUpdatePacket());
            }

            // clear buffer
            BufferedCacheObjectMap.getInstance().remove(gid);
        }

        if (log.isDebugEnabled()) {
            log.debug("PHASE:: Finished::" + sendMe.getClass().getName() + ". Objects still in buffer :" + BufferedCacheObjectMap.getInstance().size());
        }
        return null;
    }

    /**
     * Sends a single packet; a {@code null} packet is ignored. Every failure is
     * logged and not rethrown.
     *
     * @param packet packet to send, may be {@code null}
     */
    private static void sendPacket(MultiDestDataPacket packet) {
        if (packet == null) {
            return;
        }
        try {
            long packetId = MessageSenderSingleton.getSender().sendMessage(packet);

            // TODO update packet id - the updater call (updatePacket) is not wired in yet
            if (log.isDebugEnabled()) {
                log.debug("PHASE:: packet id=" + packetId + ",  but packet_id will not be updated - as updatePacket method is commented out.");
            }
        } catch (ObjectInstanceNotFoundException e) {
            log.error("PHASE:: Error sending object:" + e, e);
        } catch (PhaseDataValidatorException e) {
            log.error("PHASE:: Error sending object:" + e, e);
        } catch (PhaseUnknownException e) {
            log.error("PHASE:: Error sending object:" + e, e);
        } catch (Exception e) {
            log.error("PHASE:: Error sending object:" + e, e);
        }
    }

    /**
     * Removing single packet from buffer.
     *
     * @param sendMe object whose buffer entry should be dropped
     * @param userId used together with the object to compute its global id
     */
    public static void rollback(Object sendMe, long userId) {
        BufferedCacheObjectMap.getInstance().remove(PhaseUtils.getGlobalId(sendMe, userId));
    }

}




Datafusion Comet

Hi! Recently I moved to Rust and working on several projects - more insights to come ... one of them was Datafusion - an extremely fast S...