The previous post was rather basic, but it gives the general idea of what needs to be done.
I've extended the code, using an example from my colleagues (thanks, Silviu!), as follows:
- externalised the connection info etc. into a properties file
- specified the process ID and time constraints in the WHERE clause
The properties file is as follows:
  
#orabpel.platform=ias_10g
#java.naming.provider.url=opmn:ormi://localhost:6003:home/orabpel
# JNDI Properties required for Web Client
orabpel.platform=oc4j_10g
java.naming.provider.url=ormi://localhost:6010/orabpel
java.naming.factory.initial=oracle.j2ee.rmi.RMIInitialContextFactory
java.naming.security.principal=oc4jadmin
java.naming.security.credentials=welcome1
# JNDI Properties required for Remote Client
#orabpel.platform=oc4j_10g
#java.naming.provider.url=opmn:ormi://ncommisk-de:6010:oc4j_soa/orabpel
#java.naming.factory.initial=oracle.j2ee.rmi.RMIInitialContextFactory
#java.naming.security.principal=oc4jadmin
#java.naming.security.credentials=welcome1
  
# how many bpel domains must be recovered (usually just one)
domains.number=1
# ID of the bpel domain to be recovered
domain.1.ID=default
#domain.2.ID=oc4j_soa
#domain.3.ID=mydomain
# password of the bpel domain to be recovered
domain.1.password=welcome1
#domain.2.password=oc4j_soa
#domain.3.password=mydomain
# ID of the Process to be recovered
processID=AIADemoOrderEBF
#
# throttling - number of messages to be recovered in batch mode 
# if -1, then all the recoverable messages will be scheduled in a single batch
#batch.size = 10
# time between 2 consecutive batch recovering, in milli seconds
#batch.frequency = 5000
#
# can be one of the 4: second, minute, hour or day (case insensitive) 
# default is second
message.age.type=second
# message age in order to be candidate for manual recovering. 
# the significance of the number is given by message.age.type property
message.age = 5
# if true, it will display the summary about all the messages (callback, invoke, activity) in bpel engine.
# for debugging purpose only, not for production mode
verbose = true
# if true, does not actually recover any message. Default is false
simulate = true
The Recovery class is as follows -
package com.oracle;
import com.oracle.bpel.client.*;
import com.oracle.bpel.client.util.SQLDefs;
import com.oracle.bpel.client.util.WhereCondition;
import com.oracle.bpel.client.util.WhereConditionHelper;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URL;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
 * Recovers BPEL invoke and callback messages that are flagged as recoverable.
 *
 * <p>Configuration is read from a {@link Properties} object (see
 * {@link #retrieveProperties()} for the recognised keys). {@link #doRecover()}
 * re-reads <code>context.properties</code> from the working directory on every
 * call and applies it to this instance before recovering.
 *
 * <p>Instances keep mutable state (locator, cut-off date) and are not written
 * for concurrent use.
 */
public class Recovery {

    private static final int MSG_AGE_TYPE_SECOND = 0;
    private static final int MSG_AGE_TYPE_MINUTE = 1;
    private static final int MSG_AGE_TYPE_HOUR   = 2;
    private static final int MSG_AGE_TYPE_DAY    = 3;

    /** Name of the properties file loaded by {@link #doRecover()}. */
    private static final String PROPERTIES_FILE = "context.properties";
    /** Value of <code>processID</code> meaning "do not filter by process". */
    private static final String ALL_PROCESSES = "ALL";
    private static final int DEFAULT_MSG_AGE = 5;
    private static final int DEFAULT_DOMAIN_COUNT = 1;

    /* Unit of msgAge; overridden by message.age.type (default: second). */
    private int msgAgeType = MSG_AGE_TYPE_SECOND;
    /* Cut-off computed by the most recent getWhereConditionWithTimestamp() call. */
    private Date receiveDate;
    /* Minimum message age; overridden by message.age. */
    private int msgAge = DEFAULT_MSG_AGE;
    /* Process to recover; "ALL" disables the filter. Overridden by processID. */
    private String recoveryProcessID = ALL_PROCESSES;

    private Properties props = new Properties();

    /* Number of domains to check; overridden by domains.number. */
    private int domainCount = DEFAULT_DOMAIN_COUNT;

    /* If true, the effective configuration is printed after it is read. */
    private boolean verbose = true;
    /* If true, messages are listed but never actually recovered. */
    private boolean simulate = false;

    private String[] domainIDs;
    private String[] domainPasswords;
    private Locator locator;

    /** Creates an instance that uses the built-in defaults. */
    public Recovery() {
        this(new Properties());
    }

    /**
     * Creates an instance configured from the given properties.
     *
     * @param props configuration; <code>null</code> is treated as empty
     */
    public Recovery(Properties props) {
        if (props != null) {
            this.props = props;
        } else {
            this.props = new Properties();
        }
        retrieveProperties();
    }

    /**
     * Reloads <code>context.properties</code>, applies it to this instance and
     * recovers invoke and callback messages for every configured domain.
     *
     * <p>A {@link ServerException} for one domain is logged and the remaining
     * domains are still processed.
     *
     * @return a fixed completion message
     * @throws Exception if the properties file cannot be read, or on any
     *         non-{@link ServerException} failure while creating the locator
     */
    public String doRecover() throws Exception {

        // Load into a fresh object so a failed read leaves the current config intact.
        Properties loaded = new Properties();
        InputStream in = new FileInputStream(new File(PROPERTIES_FILE));
        try {
            loaded.load(in);
        } finally {
            in.close();
        }
        // Only the key names are printed: the values include passwords.
        System.out.println("props " + loaded.keySet());

        // Apply the file to THIS instance; previously it was parsed into a
        // separate Recovery object that was then discarded.
        this.props = loaded;
        retrieveProperties();

        for (int i = 0; i < domainCount; i++) {
            try {
                locator = new Locator(domainIDs[i], domainPasswords[i], props);
                recoverInvoke();
                recoverCallback();
            } catch (ServerException e) {
                System.out.println("AutomaticMsgRecovery: An error occured while recovering messages for domain: " +
                                   domainIDs[i]);
                e.printStackTrace();
            }
        }
        return "***  Recovery Completed ***";
    }

    /**
     * Logs every callback message in the list and, unless simulating, asks the
     * domain to recover them in one call.
     */
    private void recoverCallbackMessages(List<ICallbackMetaData> messages)
        throws Exception
    {
        String[] messageGuids = new String[messages.size()];
        for (int i = 0; i < messageGuids.length; i++) {
            ICallbackMetaData meta = messages.get(i);
            messageGuids[i] = meta.getMessageGUID();
            System.out.println("recovering callback message = " + messageGuids[i]
                               + " process [" + meta.getProcessId()
                               + "(" + meta.getRevisionTag()
                               + ")] domain [" + meta.getDomainId() + "]");
        }
        if (!simulate) {
            IBPELDomainHandle domainHandle = locator.lookupDomain();
            domainHandle.recoverCallbackMessages(messageGuids);
        }
    }

    /**
     * Logs every invoke message in the list and, unless simulating, asks the
     * domain to recover them in one call.
     */
    private void recoverInvokeMessages(List<IInvokeMetaData> messages)
        throws Exception
    {
        String[] messageGuids = new String[messages.size()];
        for (int i = 0; i < messageGuids.length; i++) {
            IInvokeMetaData meta = messages.get(i);
            messageGuids[i] = meta.getMessageGUID();
            System.out.println("recovering invoke message = " + messageGuids[i]
                               + " process [" + meta.getProcessId()
                               + "(" + meta.getRevisionTag()
                               + ")] domain [" + meta.getDomainId() + "]");
        }
        if (!simulate) {
            IBPELDomainHandle domainHandle = locator.lookupDomain();
            domainHandle.recoverInvokeMessages(messageGuids);
        }
    }

    /**
     * Extends a "recoverable messages" condition with the age cut-off, the
     * optional process-ID filter and the ordering, then logs the result.
     *
     * @param recoverable base condition from {@link WhereConditionHelper}
     * @param caller      method name used as prefix in the log output
     * @param kind        "Invoke" or "Callback", used in the log output
     * @return the same condition object, extended
     */
    private WhereCondition buildRecoveryCondition(WhereCondition recoverable, String caller, String kind) {
        recoverable.append("AND");
        // NOTE(review): the IM_* column is used for callbacks as well, exactly as
        // in the original code - confirm against SQLDefs that this is intended.
        recoverable.append(getWhereConditionWithTimestamp(SQLDefs.IM_receive_date));

        if (!ALL_PROCESSES.equalsIgnoreCase(recoveryProcessID)) {
            System.out.println(caller + " only include instances for Process ID: " + recoveryProcessID);
            WhereCondition processFilter =
                new WhereCondition(" AND " + SQLDefs.AL_ci_process_id + " = ? ");
            processFilter.setString(1, recoveryProcessID);
            recoverable.append(processFilter);
        }

        // ORDER BY goes last so that no AND condition can end up after it.
        recoverable.appendOrderBy(SQLDefs.IM_receive_date);

        System.out.println(caller + " looking for " + kind + " instances where: " + recoverable.getClause());
        System.out.println("process_id = " + recoveryProcessID);
        System.out.println("receive_date = " + receiveDate.toString() + " " + receiveDate.getTime());
        return recoverable;
    }

    /**
     * Finds recoverable callback messages for the current locator and hands
     * them to {@link #recoverCallbackMessages(List)}. Failures are logged and
     * not propagated.
     */
    private void recoverCallback() throws Exception {
        try {
            WhereCondition wc = buildRecoveryCondition(
                WhereConditionHelper.whereCallbackMessagesRecoverable(), "recoverCallback()", "Callback");

            List<ICallbackMetaData> found = Arrays.asList(locator.listCallbackMessages(wc));
            System.out.println("recoverCallback() number of Callback instances to be recovered: " + found.size());
            if (!found.isEmpty()) {
                recoverCallbackMessages(found);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Finds recoverable invoke messages for the current locator and hands them
     * to {@link #recoverInvokeMessages(List)}. Failures are logged and not
     * propagated.
     *
     * @return always <code>"done"</code>
     */
    public String recoverInvoke() throws Exception {
        try {
            WhereCondition wc = buildRecoveryCondition(
                WhereConditionHelper.whereInvokeMessagesRecoverable(), "recoverInvoke()", "Invoke");

            List<IInvokeMetaData> found = Arrays.asList(locator.listInvokeMessages(wc));
            System.out.println("recoverInvoke() number of Invoke instances to be recovered: " + found.size());
            if (!found.isEmpty()) {
                recoverInvokeMessages(found);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "done";
    }

    /**
     * Builds the condition <code>dateColumn &lt; now - age</code>, where the age
     * comes from <code>message.age</code> / <code>message.age.type</code>.
     * Restricting to older messages avoids picking up messages that are still
     * being processed but whose state has not been saved yet.
     *
     * <p>Side effect: stores the computed cut-off in {@link #receiveDate}.
     *
     * @param dateColumn column to compare against the cut-off
     */
    private WhereCondition getWhereConditionWithTimestamp(String dateColumn) {
        WhereCondition wc = new WhereCondition();
        wc.append(dateColumn + " < ? ");

        // Computed in long throughout: int arithmetic overflows for large ages.
        long seconds = msgAge;
        if (msgAgeType == MSG_AGE_TYPE_MINUTE) {
            seconds *= 60L;
        } else if (msgAgeType == MSG_AGE_TYPE_HOUR) {
            seconds *= 60L * 60L;
        } else if (msgAgeType == MSG_AGE_TYPE_DAY) {
            seconds *= 24L * 60L * 60L;
        }
        long millis = seconds * 1000L;

        receiveDate = new Date(System.currentTimeMillis() - millis);
        wc.setTimestamp(1, receiveDate);
        System.out.println(" *** getWhereConditionWithTimestamp(): looking for process instances older than " + millis/1000 + " seconds ");
        return wc;
    }

    /**
     * Copies the configuration from {@link #props} into the fields.
     *
     * <p>Recognised keys: <code>processID</code> (default ALL),
     * <code>message.age.type</code> (second, minute, hour or day; default second),
     * <code>message.age</code> (&gt;= 0, default 5), <code>verbose</code>,
     * <code>simulate</code> (both default false), <code>domains.number</code>
     * (&gt;= 1, default 1) and <code>domain.N.ID</code> /
     * <code>domain.N.password</code> for N = 1..domains.number.
     * Missing, malformed or out-of-range numbers fall back to the default.
     */
    private void retrieveProperties() {
        String processId = props.getProperty("processID", ALL_PROCESSES).trim();
        this.recoveryProcessID = processId.length() == 0 ? ALL_PROCESSES : processId;

        String ageType = props.getProperty("message.age.type", "second").trim();
        if ("day".equalsIgnoreCase(ageType)) {
            this.msgAgeType = MSG_AGE_TYPE_DAY;
        } else if ("minute".equalsIgnoreCase(ageType)) {
            this.msgAgeType = MSG_AGE_TYPE_MINUTE;
        } else if ("hour".equalsIgnoreCase(ageType)) {
            this.msgAgeType = MSG_AGE_TYPE_HOUR;
        } else {
            this.msgAgeType = MSG_AGE_TYPE_SECOND;
        }

        this.msgAge = readInt("message.age", DEFAULT_MSG_AGE, 0);
        this.verbose = readBoolean("verbose");
        this.simulate = readBoolean("simulate");

        this.domainCount = readInt("domains.number", DEFAULT_DOMAIN_COUNT, 1);
        this.domainIDs = new String[domainCount];
        this.domainPasswords = new String[domainCount];
        for (int i = 1; i <= domainCount; i++) {
            domainIDs[i - 1] = props.getProperty("domain." + i + ".ID", "default").trim();
            // Passwords are taken verbatim; trimming could alter a valid secret.
            domainPasswords[i - 1] = props.getProperty("domain." + i + ".password");
        }

        if (verbose) {
            printProperties();
        }
    }

    /** Reads an int property; returns fallback if absent, malformed or below minimum. */
    private int readInt(String key, int fallback, int minimum) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().length() == 0) {
            return fallback;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            if (value >= minimum) {
                return value;
            }
            System.out.println("Ignoring " + key + "=" + value + " (must be >= " + minimum
                               + "); using " + fallback);
        } catch (NumberFormatException e) {
            System.out.println("Ignoring " + key + "=" + raw + " (not a number); using " + fallback);
        }
        return fallback;
    }

    /** Reads a boolean property; absent or anything other than "true" is false. */
    private boolean readBoolean(String key) {
        return Boolean.parseBoolean(props.getProperty(key, "false").trim());
    }

    /** Returns a placeholder for a secret so that its value never reaches the log. */
    private static String mask(String secret) {
        return secret == null ? "<not set>" : "********";
    }

    /** Prints the effective configuration; passwords are masked. */
    private void printProperties() {
        System.out.println("Properties used by: " + this.getClass().getName());
        System.out.println("##############################################");
        System.out.println("orabpel.platform: " + props.getProperty("orabpel.platform"));
        System.out.println("java.naming.provider.url: " + props.getProperty("java.naming.provider.url"));
        System.out.println("java.naming.factory.initial: " + props.getProperty("java.naming.factory.initial"));
        System.out.println("java.naming.security.principal: " + props.getProperty("java.naming.security.principal"));
        System.out.println("java.naming.security.credentials: " + mask(props.getProperty("java.naming.security.credentials")));

        for (int i = 1; i <= domainCount; i++) {
            System.out.print("domain." + i + ".ID: " + domainIDs[i - 1]);
            System.out.println(" \tdomain." + i + ".password: " + mask(domainPasswords[i - 1]));
        }

        String ageTypeName;
        if (msgAgeType == MSG_AGE_TYPE_MINUTE) {
            ageTypeName = "minute";
        } else if (msgAgeType == MSG_AGE_TYPE_HOUR) {
            ageTypeName = "hour";
        } else if (msgAgeType == MSG_AGE_TYPE_DAY) {
            ageTypeName = "day";
        } else {
            ageTypeName = "second";
        }
        System.out.println("message.age.type: " + ageTypeName);
        System.out.println("message.age: " + msgAge);
        System.out.println("process ID: " + recoveryProcessID);
        System.out.println("verbose: " + verbose);
        System.out.println("simulate: " + simulate);
        System.out.println();
    }

}
In my scenario this needs to run as a daemon, so the idea is to call this class
from the init method of a Servlet:
    /**
     * Starts the periodic BPEL message recovery when the servlet is initialised.
     *
     * <p>The recovery loop runs on its own daemon thread so that this method
     * returns and the container can finish initialising the servlet. Each cycle
     * calls {@code Recovery.doRecover()} and then pauses for 5 seconds. A failed
     * cycle is logged and the loop carries on; interrupting the thread ends it.
     *
     * @param config servlet configuration, passed on to the superclass
     * @throws ServletException if the superclass initialisation fails
     */
    public void init(ServletConfig config) throws ServletException {
        super.init(config);

        Thread worker = new Thread(new Runnable() {
            public void run() {
                Recovery rec = new Recovery();
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        rec.doRecover();
                    } catch (Exception e) {
                        // One bad cycle (e.g. server briefly unreachable) must not stop the daemon.
                        e.printStackTrace();
                    }
                    try {
                        // sleep for 5 seconds
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition sees it and exits.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, "bpel-recovery");
        // Daemon, so the thread never keeps the JVM alive at shutdown.
        worker.setDaemon(true);
        worker.start();
    }
No comments:
Post a Comment