Another new feature from the OIC August 2020 release -
Let's try out the Re-sequencer - useful for my simple use case of
requencing orders by orderNr, before processing them.
The job of the re-sequencer is to process the input messages, in my case - orders, based on a sequence id - e.g. orderNr, as opposed to processing them based on time of arrival.
In this scenario, each message will be "parked" in an ATP DB for a certain time period.
This allows out of sequence orders to arrive and be processed in the correct order.
e.g.
orderNr 3 for Lucia Inc.
orderNr 2 for Phillip Inc.
orderNr 1 for Lucia Inc.
orderNr 1 for Phillip Inc.
orderNr 2 for Lucia Inc.
orderNr 3 for Phillip Inc.
need to be re-sequenced as -
orderNr 1 for Lucia Inc.
orderNr 2 for Lucia Inc.
orderNr 3 for Lucia Inc.
orderNr 2 for Phillip Inc.
orderNr 3 for Phillip Inc.
The Re-sequencer has the concept of groups - e.g. the Orders group, the HCM employee update group etc, i.e. the type of messages to be resequenced.
There are other key parameters, which are discussed in the documentation, a link to which is provided below. There are also discussed in this post.
Before I actually start using the Accelerator, let's take a detailed look at it in the OIC Home Page -
Clicking on the documentation link brings you here -
I strongly suggest for read this, before continuing.
One click installs the package, which contains 7 integrations.
As already mentioned, the Resequencer leverages a DB to store in-flight messages for resequencing.
The DDL SQL script is available here.
Executing this script will create 3 DB tables - RSQ_CONFIG, RSQ_GROUP and RSQ_MESSAGE
and 3 PL/SQL stored procedures -
The 7 integrations in the package are the following -
- Oracle REST REST Order Service Front
- Oracle REST REST Employee Service Front
- Oracle REST DB RSQProducer
- Oracle REST DB Resequencer Manager
- Oracle Scheduled RSQGroupConsumer
- Oracle REST DB RSQMessageConsumer
- Oracle REST REST RSQDispatcher
They can be mapped 1:1 to the architecture diagram from the docs -
The Business Front End receive the messages, in my case the orders with fields - orderNr, customer, product, price etc.
The 2 business frontends in the Accelerator package are simply examples. I will create my own, as you will see in this post.
The Resequencer box contains the core of this technical accelerator -
The front end passes this on to the Producer which essentially stores the order in the DB. Again, this is the "parking lot" pattern and the parking time is another key parameter you can tweak.
Let's go back to my use case of processing orders according to the orderNr - I may want to wait 5 minutes before beginning processing the batch of orders.
The GroupConsumer invokes the MessageConsumer for active groups - for example, my orders group.
The MessageConsumer then processes the messages that are older than the parking time limit.
It then invokes the Dispatcher.
The Dispatcher calls the backend system e.g. my orders will be processed in Netsuite.
The backend system(s) are called by OIC integrations - shown here in the Business Integrations column. Ergo, these are NOT part of the Technical Accelerator. I will create my own, as you will see further on in this post.
The Manager integration, as the name suggests, manages the re-sequencer.
Details on the operations supported here can be found in the doc.
Again, these 5 integrations are the core of the resequencer, the business front end and the business integration I will create myself; the package just contains examples of such.
The following connections are used by the package -
I edit the ATP connection to point to my ATP instance -
Now let's look at the first integration - Oracle REST REST Order Service Front
Again, this is the front end that processes the incoming orders - in this case, the processing just involves calling the Producer -
Let's look at the mapping here -
The payload is set to get-content-as-string( Order)
That's the order taken care of!
Now let's look at the other target fields -
gid - is the id within the orders group. The input customer field is used for this. I want to re-sequence the orders coming in for each customer.
e.g. Lucia Inc.
gtype - The type of stream. Different message types can be sequenced in parallel, for example, account updates and personnel updates are different group types. So in my case this is set to "orders" as my orders are processed as the orders group.
id - is the message unique identifier, in this case, the orderNr.
sequenceId - A field in the request or a timestamp that is used to determine how to sequence the messages in a stream.
Here I can use my orderNr value again.
So let's go through each of these, starting with the Manager.
Manager
The manager integration supports 3 operations -
UpdateConfig - updates the config table - RSQ_CONFIG
RecoverResumeGroup -
Deletes stuck messages - DELETE FROM RSQ_MESSAGE WHERE GRP_ID=#GID AND GRP_TYPE=#GTYPE AND STATUS='P'
ActivateGroup - UPDATE RSQ_GROUP SET STATUS='N', STATUSTIME=SYSDATE WHERE ID=#GID AND TYPE=#GTYPE
GetConfigs - SELECT * FROM RSQ_CONFIG
Producer
addMessage invokes the ATP adapter -
Ok, so that's the message "parked".
Next invoke is - addGroup
This invokes the stored procedure -
Let's look at that stored procedure -
In my case, it checks whether there is already an "orders" group for my customer - "Lucia Inc".
If not, then a row is created for such in the RSQ_GROUP table.
If yes, the status for that group is set to "N" - signalling a NEW active message for an existing group.
addGroup invokes the ATP adapter to call the stored procedure PROC_RSQ_CONFIG.
The mapping here is as follows -
GTYPE will be set to the input gtype.
MAXCONCUR and TIMEWINDOW are both set to 10.
These 2 values could be externalised as integration properties.
Group Consumer
As already stated, the GroupConsumer invokes the MessageConsumer for active groups.
This is a scheduled job that can run every n minutes.
the first invoke is of ATP to run the following query -
It then loops (ProcessTypes Action) over all types found and invokes the ATP adapter (ActiveGroups Action)-
SELECT * FROM RSQ_GROUP WHERE TYPE=#TYPE AND STATUS='N' ORDER BY STATUSTIME ASC FETCH FIRST ((SELECT MAXCONCUR FROM RSQ_CONFIG WHERE TYPE=#TYPE)- (SELECT COUNT(*) FROM RSQ_GROUP WHERE TYPE=#TYPE AND STATUS='P')) ROWS ONLY
Now loop over the ActiveGroups (ProcessActiveGroups Action)
First step here is to lock the group (lockGroup Action) -
UPDATE RSQ_GROUP SET STATUS='P', STATUSTIME=SYSDATE WHERE ID=#GID AND TYPE=#GTYPE
and then invoke the Message Consumer for each active group.
Message Consumer
readMessages Action -
Invokes ATP - SELECT * FROM RSQ_MESSAGE WHERE GRP_ID=#GID AND GRP_TYPE=#GTYPE AND CREATIONTIME < SYSDATE - INTERVAL '30' SECOND ORDER BY SEQ_ID
So here we are retrieving the messages in the correct order.
Each message is then forwarded to the Dispatcher and then deleted from the DB.
Dispatcher
The dispatcher calls the relevant business service(s).
The example integration in the accelerator is as follows -
Naturally the Dispatcher will have to be cloned/amended to suit your particular use case(s).
So now to my use case -
Requencing my Orders
Here is the order json payload -
{"orderNr":
"1",
"customer":"Lucia
Inc.",
"product":
"iBike",
"quantity":
2
"orderValue":
1000}
gid will be set to the customer value e.g. Lucia Inc.
id will be set to orderNr
sequenceId will be set to orderNr
Step 1 - Activate the Producer integration.
I need to configure it's REST connection in order to do this.
Just set the REST connection URL to your OIC instance and Test/Save.
Step 2 - is to configure the imported ATP Connection to point to your ATP instance
unless you have already done this...
Step 3 - Create the Business FrontEnd which will call the Producer
I can now test this -
I check out the DB tables -
Looks good.
Step 4 - Create the Business Integration
This is my order processing integration - all mine does is log the order.
This will be called by the Dispatcher.
Now let's look at our Dispatcher -
Step 5 - Clone the Dispatcher and amend to call the Business Integration
Step 6 - Group and Message Consumer Integrations
Begin by configuring the Message consumer connection -
This integration calls the dispatcher, so as I've cloned the dispatcher, I also need to clone the Message Consumer, so that I can amend it.
Activate the Group Consumer Integration -
Step 7 - Test
I enter a couple of orders in the wrong sequence and validate they're in the DB
I check the DB -
I run the scheduled job -
I check the 4 instances of my business integration -
Footnote
I had a challenge while creating this example when it came to calling the business service from the dispatcher.Remember, the payload is stored in the DB as a CLOB -
The Dispatcher calls the business service -
So I have to convert the payload as follows -
oraext:parseEscapedXML (/nssrcmpr:execute/ns22:request-wrapper/ns22:payload )
note the value-of
I need to change this to copy-of
The easiest way to do this is copy the xsl from the Code page.
Edit this in your editor of choice -
and then import into your OIC integration -
thanks to my esteemed colleague Viswanatha B. for this!
I should also be able to do the above with the Data Stitch feature of OIC - I will try this out and update this post accordingly.
No comments:
Post a Comment