The previous post was rather basic, but it gives the general idea of what needs to be done.
I've augmented the code, using an example from my colleagues (thanks, Silviu!), as follows:
- externalised the connection info etc. into a properties file
- specified the process ID and time constraints in the WHERE clause
The properties file is as follows:
#orabpel.platform=ias_10g
#java.naming.provider.url=opmn:ormi://localhost:6003:home/orabpel
# JNDI Properties required for Web Client
orabpel.platform=oc4j_10g
java.naming.provider.url=ormi://localhost:6010/orabpel
java.naming.factory.initial=oracle.j2ee.rmi.RMIInitialContextFactory
java.naming.security.principal=oc4jadmin
java.naming.security.credentials=welcome1
# JNDI Properties required for Remote Client
#orabpel.platform=oc4j_10g
#java.naming.provider.url=opmn:ormi://ncommisk-de:6010:oc4j_soa/orabpel
#java.naming.factory.initial=oracle.j2ee.rmi.RMIInitialContextFactory
#java.naming.security.principal=oc4jadmin
#java.naming.security.credentials=welcome1
# how many bpel domains must be recovered (usually just one)
domains.number=1
# ID of the bpel domain to be recovered
domain.1.ID=default
#domain.2.ID=oc4j_soa
#domain.3.ID=mydomain
# password of the bpel domain to be recovered
domain.1.password=welcome1
#domain.2.password=oc4j_soa
#domain.3.password=mydomain
# ID of the Process to be recovered
processID=AIADemoOrderEBF
#
# throttling - number of messages to be recovered in batch mode
# if -1, then all the recoverable messages will be scheduled in a single batch
#batch.size = 10
# time between two consecutive recovery batches, in milliseconds
#batch.frequency = 5000
#
# can be one of the 4: second, minute, hour or day (case insensitive)
# default is second
message.age.type=second
# minimum age a message must have to be a candidate for manual recovery.
# the unit of this number is given by the message.age.type property
message.age = 5
# if true, it will display the summary about all the messages (callback, invoke, activity) in bpel engine.
# for debugging purpose only, not for production mode
verbose = true
# if true, does not actually recover any message. Default is false
simulate = true
The Recovery class is as follows -
package com.oracle;
import com.oracle.bpel.client.*;
import com.oracle.bpel.client.util.SQLDefs;
import com.oracle.bpel.client.util.WhereCondition;
import com.oracle.bpel.client.util.WhereConditionHelper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * Looks up recoverable BPEL invoke and callback messages that are older than a
 * configurable age and hands them to the BPEL domain for recovery.
 *
 * <p>Settings are read from a {@link Properties} object:
 * <ul>
 *   <li>{@code processID} - only recover messages of this process; {@code ALL}
 *       (the default) disables the filter</li>
 *   <li>{@code message.age} / {@code message.age.type} - minimum message age and
 *       its unit ({@code second}, {@code minute}, {@code hour} or {@code day});
 *       defaults are {@code 5} and {@code second}</li>
 *   <li>{@code domains.number}, {@code domain.N.ID}, {@code domain.N.password} -
 *       the BPEL domains to process; defaults are one domain named
 *       {@code default}</li>
 *   <li>{@code verbose} - print the effective settings (default {@code false})</li>
 *   <li>{@code simulate} - list the messages but do not recover them (default
 *       {@code false})</li>
 * </ul>
 * The remaining entries (JNDI settings etc.) are passed on unchanged to the
 * {@code Locator}.
 */
public class Recovery{
    private static final int MSG_AGE_TYPE_SECOND = 0;
    private static final int MSG_AGE_TYPE_MINUTE = 1;
    private static final int MSG_AGE_TYPE_HOUR = 2;
    private static final int MSG_AGE_TYPE_DAY = 3;

    /** File read by {@link #doRecover()}, resolved against the working directory. */
    private static final String PROPERTIES_FILE = "context.properties";
    /** Value of {@code processID} that means "do not filter by process". */
    private static final String ALL_PROCESSES = "ALL";
    private static final int DEFAULT_MSG_AGE = 5;
    /** Printed instead of a password so that secrets never reach the log. */
    private static final String MASK = "********";

    /** Unit of {@link #msgAge}; one of the MSG_AGE_TYPE_* constants. */
    private int msgAgeType = MSG_AGE_TYPE_SECOND;
    /** Cut-off computed by the last call to getWhereConditionWithTimestamp(). */
    private Date receiveDate;
    private int msgAge = DEFAULT_MSG_AGE;
    private String recoveryProcessID = ALL_PROCESSES;
    /** Properties handed to the constructor; they win over the properties file. */
    private final Properties overrides;
    /** Effective properties currently in use. */
    private Properties props;
    private int nrOfDomains = 1;
    private boolean verbose;
    private boolean simulate;
    private String[] domainIDs;
    private String[] domainPasswords;
    private Locator locator;

    /** Creates an instance that uses the built-in defaults until {@link #doRecover()} loads the file. */
    public Recovery() {
        this(new Properties());
    }

    /**
     * Creates an instance configured from the given properties.
     *
     * @param props settings to use; {@code null} is treated as empty
     */
    public Recovery(Properties props) {
        this.overrides = (props != null) ? props : new Properties();
        this.props = this.overrides;
        retrieveProperties();
    }

    /**
     * Reloads {@code context.properties}, applies it to this instance and then
     * recovers invoke and callback messages for every configured domain.
     *
     * <p>Properties passed to the constructor take precedence over entries of
     * the file. A {@code ServerException} for one domain is reported and the
     * remaining domains are still processed.
     *
     * @return the fixed text {@code "*** Recovery Completed ***"}
     * @throws Exception if the properties file cannot be read, or for any
     *         non-{@code ServerException} failure while creating the locator
     */
    public String doRecover() throws Exception{
        // Apply the file to THIS instance; previously it was loaded into a
        // throw-away object, so processID, simulate etc. were never honoured.
        Properties effective = loadPropertiesFile();
        effective.putAll(overrides);
        this.props = effective;
        retrieveProperties();

        for (int i = 0; i < nrOfDomains; i++) {
            try {
                locator = new Locator(domainIDs[i], domainPasswords[i], props);
                recoverInvoke();
                recoverCallback();
            } catch (ServerException e) {
                System.out.println("AutomaticMsgRecovery: An error occured while recovering messages for domain: " +
                        domainIDs[i]);
                e.printStackTrace();
            }
        }
        return "*** Recovery Completed ***";
    }

    /**
     * Finds recoverable invoke messages matching the configured age and process
     * filter and recovers them. Any failure is printed and swallowed so that
     * callback recovery can still run afterwards.
     *
     * @return the fixed text {@code "done"}
     * @throws Exception declared for backward compatibility only
     */
    public String recoverInvoke() throws Exception{
        try {
            WhereCondition wc = WhereConditionHelper.whereInvokeMessagesRecoverable();
            addAgeAndProcessConditions(wc, "recoverInvoke()");
            System.out.println("recoverInvoke() looking for Invoke instances where: " + wc.getClause());
            System.out.println("process_id = " + recoveryProcessID);
            System.out.println("receive_date = " + receiveDate.toString() + " " + receiveDate.getTime());

            IInvokeMetaData[] found = locator.listInvokeMessages(wc);
            System.out.println("recoverInvoke() number of Invoke instances to be recovered: " + found.length);
            if (found.length > 0) {
                recoverInvokeMessages(found);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "done";
    }

    /**
     * Callback counterpart of {@link #recoverInvoke()}: finds recoverable
     * callback messages and recovers them; failures are printed and swallowed.
     */
    private void recoverCallback() throws Exception{
        try {
            WhereCondition wc = WhereConditionHelper.whereCallbackMessagesRecoverable();
            addAgeAndProcessConditions(wc, "recoverCallback()");
            System.out.println("recoverCallback() looking for Callback instances where: " + wc.getClause());
            System.out.println("process_id = " + recoveryProcessID);
            System.out.println("receive_date = " + receiveDate.toString() + " " + receiveDate.getTime());

            ICallbackMetaData[] found = locator.listCallbackMessages(wc);
            System.out.println("recoverCallback() number of Callback instances to be recovered: " + found.length);
            if (found.length > 0) {
                recoverCallbackMessages(found);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Adds the "older than" predicate, the optional process-ID predicate and
     * the ordering to {@code wc}.
     *
     * @param wc     condition to extend
     * @param caller method name used as prefix of the progress message
     */
    private void addAgeAndProcessConditions(WhereCondition wc, String caller) throws Exception {
        // NOTE(review): the IM_* column constant is used for callback messages
        // as well, and AL_ci_process_id for both tables - confirm against SQLDefs.
        wc.append("AND");
        wc.append(getWhereConditionWithTimestamp(SQLDefs.IM_receive_date));

        if (!ALL_PROCESSES.equalsIgnoreCase(recoveryProcessID)) {
            System.out.println(caller + " only include instances for Process ID: " + recoveryProcessID);
            WhereCondition processFilter =
                    new WhereCondition(" AND " + SQLDefs.AL_ci_process_id + " = ? ");
            processFilter.setString(1, recoveryProcessID);
            wc.append(processFilter);
        }
        // Ordering is added last, after every predicate.
        wc.appendOrderBy(SQLDefs.IM_receive_date);
    }

    /** Logs every callback message and, unless simulating, asks the domain to recover them. */
    private void recoverCallbackMessages(ICallbackMetaData[] messages) throws Exception {
        String[] guids = new String[messages.length];
        for (int i = 0; i < messages.length; i++) {
            ICallbackMetaData meta = messages[i];
            guids[i] = meta.getMessageGUID();
            System.out.println("recovering callback message = " + guids[i]
                    + " process [" + meta.getProcessId() + "(" + meta.getRevisionTag() + ")]"
                    + " domain [" + meta.getDomainId() + "]");
        }
        if (!simulate) {
            IBPELDomainHandle domainHandle = locator.lookupDomain();
            domainHandle.recoverCallbackMessages(guids);
        }
    }

    /** Logs every invoke message and, unless simulating, asks the domain to recover them. */
    private void recoverInvokeMessages(IInvokeMetaData[] messages) throws Exception {
        String[] guids = new String[messages.length];
        for (int i = 0; i < messages.length; i++) {
            IInvokeMetaData meta = messages[i];
            guids[i] = meta.getMessageGUID();
            System.out.println("recovering invoke message = " + guids[i]
                    + " process [" + meta.getProcessId() + "(" + meta.getRevisionTag() + ")]"
                    + " domain [" + meta.getDomainId() + "]");
        }
        if (!simulate) {
            IBPELDomainHandle domainHandle = locator.lookupDomain();
            domainHandle.recoverInvokeMessages(guids);
        }
    }

    /**
     * Builds the predicate {@code <dateColumn> < ?} bound to "now minus the
     * configured message age" and remembers the cut-off in {@link #receiveDate}.
     *
     * <p>Only sufficiently old messages are selected so that messages which are
     * still being processed are not picked up.
     *
     * @param dateColumn name of the column holding the receive date
     * @return the new condition
     */
    private WhereCondition getWhereConditionWithTimestamp(String dateColumn) {
        WhereCondition wc = new WhereCondition();
        wc.append(dateColumn + " < ? ");

        // 1000L forces long arithmetic; int multiplication overflowed for large ages.
        long millis = msgAge * 1000L;
        if (msgAgeType == MSG_AGE_TYPE_MINUTE) {
            millis *= 60L;
        } else if (msgAgeType == MSG_AGE_TYPE_HOUR) {
            millis *= 60L * 60L;
        } else if (msgAgeType == MSG_AGE_TYPE_DAY) {
            millis *= 24L * 60L * 60L;
        }

        receiveDate = new Date(System.currentTimeMillis() - millis);
        wc.setTimestamp(1, receiveDate);
        System.out.println(" *** getWhereConditionWithTimestamp(): looking for process instances older than " + millis/1000 + " seconds ");
        return wc;
    }

    /** Reads {@code context.properties} and always closes the stream. */
    private static Properties loadPropertiesFile() throws IOException {
        Properties loaded = new Properties();
        InputStream in = new FileInputStream(new File(PROPERTIES_FILE));
        try {
            loaded.load(in);
        } finally {
            in.close();
        }
        return loaded;
    }

    /**
     * Copies the settings from {@link #props} into the fields, falling back to
     * the documented default for every missing or invalid entry.
     */
    private void retrieveProperties() {
        String processId = textProperty("processID", ALL_PROCESSES);
        recoveryProcessID = (processId.length() == 0) ? ALL_PROCESSES : processId;

        String ageType = textProperty("message.age.type", "second");
        if ("day".equalsIgnoreCase(ageType)) {
            msgAgeType = MSG_AGE_TYPE_DAY;
        } else if ("minute".equalsIgnoreCase(ageType)) {
            msgAgeType = MSG_AGE_TYPE_MINUTE;
        } else if ("hour".equalsIgnoreCase(ageType)) {
            msgAgeType = MSG_AGE_TYPE_HOUR;
        } else {
            msgAgeType = MSG_AGE_TYPE_SECOND;
        }

        msgAge = intProperty("message.age", DEFAULT_MSG_AGE, 0);
        verbose = Boolean.parseBoolean(textProperty("verbose", "false"));
        simulate = Boolean.parseBoolean(textProperty("simulate", "false"));

        nrOfDomains = intProperty("domains.number", 1, 1);
        domainIDs = new String[nrOfDomains];
        domainPasswords = new String[nrOfDomains];
        for (int i = 1; i <= nrOfDomains; i++) {
            domainIDs[i - 1] = textProperty("domain." + i + ".ID", "default");
            // Passwords are taken verbatim: whitespace may be significant.
            domainPasswords[i - 1] = props.getProperty("domain." + i + ".password");
        }

        if (verbose) {
            printProperties();
        }
    }

    /** Returns the trimmed value of {@code key}, or {@code defaultValue} if absent. */
    private String textProperty(String key, String defaultValue) {
        String value = props.getProperty(key, defaultValue);
        return (value == null) ? null : value.trim();
    }

    /**
     * Returns the integer value of {@code key}. A missing entry yields
     * {@code defaultValue}; a non-numeric entry or one below {@code minimum}
     * also yields {@code defaultValue} and is reported on standard output.
     */
    private int intProperty(String key, int defaultValue, int minimum) {
        String raw = textProperty(key, null);
        if (raw == null || raw.length() == 0) {
            return defaultValue;
        }
        try {
            int parsed = Integer.parseInt(raw);
            if (parsed >= minimum) {
                return parsed;
            }
        } catch (NumberFormatException ignored) {
            // reported below together with out-of-range values
        }
        System.out.println("Recovery: ignoring invalid value '" + raw + "' for " + key
                + ", using " + defaultValue);
        return defaultValue;
    }

    /** Returns a placeholder for a secret so that it can be logged safely. */
    private static String mask(String secret) {
        return (secret == null) ? "<not set>" : MASK;
    }

    /** Prints the effective settings; passwords are masked. */
    private void printProperties () {
        System.out.println("Properties used by: " + this.getClass().getName());
        System.out.println("##############################################");
        System.out.println("orabpel.platform: " + props.getProperty("orabpel.platform"));
        System.out.println("java.naming.provider.url: " + props.getProperty("java.naming.provider.url"));
        System.out.println("java.naming.factory.initial: " + props.getProperty("java.naming.factory.initial"));
        System.out.println("java.naming.security.principal: " + props.getProperty("java.naming.security.principal"));
        System.out.println("java.naming.security.credentials: "
                + mask(props.getProperty("java.naming.security.credentials")));
        for (int i = 1; i <= nrOfDomains; i++) {
            System.out.print("domain." + i + ".ID: " + domainIDs[i - 1]);
            System.out.println(" \tdomain." + i + ".password: " + mask(domainPasswords[i - 1]));
        }
        System.out.print("message.age.type: ");
        if (msgAgeType == MSG_AGE_TYPE_MINUTE) {
            System.out.println("minute");
        } else if (msgAgeType == MSG_AGE_TYPE_HOUR) {
            System.out.println("hour");
        } else if (msgAgeType == MSG_AGE_TYPE_DAY) {
            System.out.println("day");
        } else {
            System.out.println("second");
        }
        System.out.println("message.age: " + msgAge);
        System.out.println("process ID: " + recoveryProcessID);
        System.out.println("verbose: " + verbose);
        System.out.println("simulate: " + simulate);
        System.out.println();
    }
}
In my scenario this needs to run as a daemon, so the idea is to call this class from
a servlet's init method:
/**
 * Initialises the servlet and starts the periodic message recovery.
 *
 * <p>The recovery loop runs on a separate daemon thread so that this method
 * returns; running the endless loop directly in {@code init} would keep the
 * container from ever putting the servlet into service.
 *
 * @param config servlet configuration passed on to the superclass
 * @throws ServletException if the superclass initialisation fails
 */
public void init(ServletConfig config) throws ServletException {
    super.init(config);
    Thread worker = new Thread(new Runnable() {
        public void run() {
            try {
                Recovery rec = new Recovery();
                while (!Thread.currentThread().isInterrupted()) {
                    rec.doRecover();
                    // sleep for 5 seconds
                    Thread.sleep(5000);
                }
            } catch (InterruptedException e) {
                // Interrupted while sleeping: restore the flag and stop polling.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // As before, any other failure ends the polling loop.
                e.printStackTrace();
            }
        }
    }, "bpel-recovery");
    // NOTE(review): nothing stops this thread on undeploy; consider
    // interrupting it from destroy().
    worker.setDaemon(true);
    worker.start();
}
No comments:
Post a Comment