Tuesday, August 11, 2020

#790 OIC - OCI Streaming Adapter - August Release New feature


Introduction

As usual, text in italics from the ORCL docs.

So what is OCI Streaming Service?

The Oracle Cloud Infrastructure Streaming service provides a fully managed, scalable, and durable storage solution for ingesting continuous, high-volume streams of data that you can consume and process in real time. Streaming can be used for messaging, ingesting high-volume data such as application logs, operational telemetry, web click-stream data, or other use cases in which data is produced and processed continually and sequentially in a publish-subscribe messaging model.

More details here

OIC can now leverage the OCI Streaming service via the new adapter.
Here is a very simple example, based on a use case implemented by my colleague, Raghav.




Here I will have 2 integrations -

1. Publishes to the Streaming Service
2. Consumes messages

Publishing to the OCI Streaming Service

I begin by creating the OCI Streaming connection -

It's a good idea to be somewhat familiar with Kafka, so that you can appreciate what you need to specify when creating the connection.

For example, bootstrap servers is a comma-separated list of host/port pairs - the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.
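
To make the format concrete, here is a minimal Python sketch that splits such a list into host/port pairs. The function name and the sample broker addresses are my own illustrations, not values taken from the adapter.

```python
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a comma-separated "host:port" list into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        # Split on the last colon so the port is always the final segment.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


# Hypothetical broker addresses, for illustration only.
print(parse_bootstrap_servers("broker1.example.com:9092, broker2.example.com:9092"))
```

The client only needs one reachable entry from this list to discover the rest of the cluster, which is why several addresses are usually given.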

































SASL = Simple Authentication and Security Layer
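
If you are coming from Kafka, the connection fields map roughly onto standard Kafka client properties. The sketch below builds such a property set in Python. It assumes SASL_SSL with the PLAIN mechanism, which is my assumption about how this kind of connection is typically set up rather than something read from the adapter itself. The helper name is hypothetical and every value passed in is a placeholder.

```python
def build_kafka_properties(bootstrap_servers, username, auth_token):
    """Return standard Kafka client properties for SASL/PLAIN over TLS."""

    def quote(text):
        # Escape backslashes and double quotes so the JAAS string stays valid.
        return text.replace("\\", "\\\\").replace('"', '\\"')

    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{quote(username)}" password="{quote(auth_token)}";'
    )
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",  # assumption: TLS plus SASL
        "sasl.mechanism": "PLAIN",        # assumption: user name / token login
        "sasl.jaas.config": jaas,
    }


# Placeholder values only; substitute your own connection details.
props = build_kafka_properties("broker.example.com:9092", "my-user", "my-token")
for key in ("bootstrap.servers", "security.protocol", "sasl.mechanism"):
    print(key, "=", props[key])
```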




















I create an App Driven Integration with a REST trigger.

Request Payload -














Response payload is just a string - rtc.

I drop the OCI Streaming connection and configure as follows -


















Naturally, Stream / Partition must exist beforehand.

I specify the message structure - essentially the same as my input -













































The Mapping is as follows -














I activate the integration and test -
















Consuming Stream

The second integration leverages the same OCI Streaming connection.
It is configured as follows -





































Let's look at the 2 options for consuming messages -

1. Read Latest - to leverage this, your integration flow already needs to be running, e.g. as a scheduled job that polls. Read Latest looks for messages within a 60-second time window.

2. Read from Beginning - Here we consume all messages since the last invoke.

Note, one can also consume based on the partition.
Remember, if you produce a message to partition "0", you cannot see this message in partition "1". Ergo - messages are not replicated across partitions.
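
A toy in-memory model makes the point. This is only a sketch of the concept, not how the Streaming service is implemented; the class and method names are invented for illustration.

```python
class Stream:
    """Toy stream: each partition is an independent append-only list."""

    def __init__(self, partitions):
        self._logs = {p: [] for p in range(partitions)}

    def produce(self, partition, message):
        """Append a message to one partition and return its offset."""
        log = self._logs[partition]
        log.append(message)
        return len(log) - 1

    def consume(self, partition):
        """Return a copy of everything stored in one partition."""
        return list(self._logs[partition])


stream = Stream(partitions=2)
stream.produce(0, "hello")
print(stream.consume(0))  # ['hello']
print(stream.consume(1))  # []
```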

Consumer Group - A consumer group is a set of instances which coordinates messages from all of the partitions in a stream. Instances in a consumer group maintain group membership through interaction; lack of interaction for a period of time results in a timeout, removing the instance from the group.
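
The timeout behaviour can be sketched in a few lines of Python. This is a simplified model under my own assumptions: the class name, the 30-second timeout, and the explicit `now` argument (used instead of a real clock so the example is deterministic) are all illustrative.

```python
class ConsumerGroup:
    """Toy group membership: instances that stop interacting are dropped."""

    def __init__(self, session_timeout):
        self.session_timeout = session_timeout
        self._last_seen = {}

    def heartbeat(self, instance, now):
        """Record that an instance interacted with the group at time `now`."""
        self._last_seen[instance] = now

    def members(self, now):
        """Drop instances silent for longer than the timeout, return the rest."""
        self._last_seen = {
            name: seen
            for name, seen in self._last_seen.items()
            if now - seen <= self.session_timeout
        }
        return sorted(self._last_seen)


group = ConsumerGroup(session_timeout=30)  # hypothetical timeout in seconds
group.heartbeat("instance-a", now=0)
group.heartbeat("instance-b", now=0)
group.heartbeat("instance-a", now=25)      # only instance-a keeps interacting
print(group.members(now=40))               # ['instance-a']
```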

So my settings are as follows -







































I use the same xsd -









Now for the integration response mapping -
















My integration is as follows -

























I activate and test -




















Now to the other Consume option -
Consume Messages from Stream by specifying offset -

Firstly - what is offset?
The location of a message within a partition. You can use the offset to restart reading from a stream.
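
Here is a minimal Python sketch of the idea, with a plain list standing in for a partition. The function name and the `limit` parameter are hypothetical; the point is only that the caller keeps the next offset and passes it back in to resume reading.

```python
def read_from(partition_log, offset, limit=10):
    """Return ((offset, message) pairs starting at `offset`, next offset to read)."""
    if offset < 0:
        raise ValueError("offset must not be negative")
    # Pair every message with its position, then take the requested window.
    batch = list(enumerate(partition_log))[offset:offset + limit]
    return batch, offset + len(batch)


partition = ["m0", "m1", "m2"]

batch, next_offset = read_from(partition, offset=0, limit=2)
print(batch, next_offset)  # [(0, 'm0'), (1, 'm1')] 2

# Restart reading where the previous call stopped.
batch, next_offset = read_from(partition, offset=next_offset)
print(batch, next_offset)  # [(2, 'm2')] 3
```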

I run the Produce integration twice.
I then run the Consume integration.
The Activity Stream shows me what has been returned from the Stream Invoke -


























Here you see the use of offset -

Now to make the integration a bit more useful -
I add a FOR EACH activity - the repeating element in Message -















The Log Activity -



















I activate and test -
Producing 2 new employees - dino and romy -


















































Check out the adapter docs here

