Handling Amazon S3 – SQS notification with the Advantco AWS adapter
A typical use case is an external application uploading a file to an S3 bucket. Once the upload is complete, Amazon S3 generates an event notification. This notification can be forwarded to an Amazon SQS queue, an Amazon SNS topic, or another supported destination. In this blog, we describe the steps required to handle the S3 – SQS event notification. Once the event is published to the SQS queue, SAP PO uses the Advantco AWS adapter to pull the event message from SQS. The event message contains all the information we need to query the S3 file. We use the Async-Sync bridge feature in this scenario to return the file for downstream processing.
Amazon S3 and SQS Configurations:
1. Create SQS queue and Configure SQS permissions
Create the SQS queue and note the queue URL.
Adjust the queue permissions to allow the S3 bucket to publish events to the queue.
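To make the permission step more concrete, here is a minimal sketch of an SQS access policy that lets a single S3 bucket send messages to the queue. It is an illustration only, not the exact policy used in this scenario: the queue ARN and account ID are made up, the helper name build_queue_policy is hypothetical, and the bucket name is taken from the sample event in this blog.

```python
import json


def build_queue_policy(queue_arn: str, bucket_name: str) -> dict:
    """Build an SQS access policy that lets one S3 bucket send messages."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowS3ToSendEvents",
                "Effect": "Allow",
                # The S3 service is the principal that delivers notifications.
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Restrict the permission to events coming from this bucket.
                "Condition": {
                    "ArnLike": {"aws:SourceArn": f"arn:aws:s3:::{bucket_name}"}
                },
            }
        ],
    }


# Hypothetical queue ARN; replace it with the ARN of your own queue.
policy = build_queue_policy(
    "arn:aws:sqs:us-west-2:123456789012:demo-queue", "buck-demo"
)
policy_json = json.dumps(policy, indent=2)
print(policy_json)
```

The printed JSON can be pasted into the access policy of the queue in the SQS console.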
2. Create S3 bucket and Configure S3 event notification with SQS
Create the bucket and go to the Advanced settings on the Properties tab.
Create an event notification and associate it with the SQS queue.
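If you prefer to script this step instead of using the console, the notification is described by a small configuration structure. The sketch below only builds that structure; it assumes you want every object-created event, and the queue ARN and the helper name build_notification_config are hypothetical. The configuration ID is the one that appears in the sample event in this blog.

```python
def build_notification_config(queue_arn: str, config_id: str) -> dict:
    """Describe an S3 event notification that targets an SQS queue."""
    return {
        "QueueConfigurations": [
            {
                "Id": config_id,
                "QueueArn": queue_arn,
                # Assumption: react to all object-created events,
                # including completed multipart uploads.
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }


# Hypothetical queue ARN; replace it with the ARN of your own queue.
notification_config = build_notification_config(
    "arn:aws:sqs:us-west-2:123456789012:demo-queue", "US_DEMO_S3_UPLOAD"
)
print(notification_config)
```

With boto3, a structure of this shape could be passed as the NotificationConfiguration argument of the S3 client's put_bucket_notification_configuration call, together with the bucket name.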
Whenever a new file is uploaded to the S3 bucket, a notification is sent to the SQS queue. Below is an example of a notification message. The key field contains the name of the file that was uploaded.
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "us-west-2",
"eventTime": "2020-06-02T20:16:46.533Z",
"eventName": "ObjectCreated:CompleteMultipartUpload",
"userIdentity": {
"principalId": "AWS:AIDAJCBSYIOFMZ3QMDZCI"
},
"requestParameters": {
"sourceIPAddress": "96.XX.XX.XX"
},
"responseElements": {
"x-amz-request-id": "263B0CA98470D5CF",
"x-amz-id-2": "zmjRZFRkMQ+op74Oi5U/WAgQf15JKGPx2WB06Mbf8wmy
XXXXXXXKYWDyHVX931Dnc3LNiNj1yMRxsXii6PPpVR"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "US_DEMO_S3_UPLOAD",
"bucket": {
"name": "buck-demo",
"ownerIdentity": {
"principalId": "A1GGXKXXXXXQ1O"
},
"arn": "arn:aws:s3:::buck-demo"
},
"object": {
"key": "FailedRecordsToBeFixed.csv",
"size": 2561,
"eTag": "071af75943e8d8f47cb3ee09136bafee-1",
"sequencer": "005ED6B3B0C2D4DF47"
}
}
}
]
}
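To show which fields matter for the rest of the scenario, here is a minimal Python sketch that reads a notification like the one above and extracts the region, bucket, and key. It runs outside SAP PO and is only meant as an illustration; the helper name extract_objects and the trimmed sample payload are assumptions. Object keys in S3 event notifications are URL-encoded, so the sketch decodes them.

```python
import json
from typing import Dict, List
from urllib.parse import unquote_plus

# Trimmed copy of the sample notification, keeping only the fields used here.
SAMPLE_EVENT = """
{
  "Records": [
    {
      "awsRegion": "us-west-2",
      "eventName": "ObjectCreated:CompleteMultipartUpload",
      "s3": {
        "bucket": {"name": "buck-demo"},
        "object": {"key": "FailedRecordsToBeFixed.csv", "size": 2561}
      }
    }
  ]
}
"""


def extract_objects(message_body: str) -> List[Dict]:
    """Return region, bucket, key, and size for each record in the event."""
    event = json.loads(message_body)
    objects = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        objects.append(
            {
                "region": record["awsRegion"],
                "bucket": s3["bucket"]["name"],
                # Keys arrive URL-encoded (for example, spaces become "+").
                "key": unquote_plus(s3["object"]["key"]),
                "size": s3["object"].get("size"),
            }
        )
    return objects


for obj in extract_objects(SAMPLE_EVENT):
    print(obj["region"], obj["bucket"], obj["key"], obj["size"])
```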
SAP PO configurations:
On SAP PO, these are the steps we have to perform.
- Pull S3 event message from the SQS queue, convert the message from JSON to XML format.
- Map the S3 event message to the S3 Query structure that is expected by the AWS adapter.
- Query and download the file from the S3 bucket and return the file. For this we use the Async-Sync bridge capability of SAP PO.
1. Define the SQS sender channel to pull S3 event message from SQS queue.
Specify the queue URL in the sender channel.
As the S3 event message is in JSON, we have to convert it to XML format.
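In the scenario the conversion is done by the channel configuration, so no code is needed. Purely to illustrate what the result looks like, the following Python sketch converts the JSON event into XML, producing one Records element per record in line with the S3 event XSD in the Appendix. The function names are hypothetical and the sketch does not reproduce the adapter's actual conversion logic.

```python
import json
import xml.etree.ElementTree as ET
from typing import List


def append_json(parent: ET.Element, tag: str, value) -> None:
    """Append a JSON value to parent as one or more child elements."""
    if isinstance(value, list):
        # A JSON array becomes repeated elements with the same tag.
        for item in value:
            append_json(parent, tag, item)
        return
    child = ET.SubElement(parent, tag)
    if isinstance(value, dict):
        for key, item in value.items():
            append_json(child, key, item)
    elif isinstance(value, bool):
        child.text = str(value).lower()
    elif value is not None:
        child.text = str(value)


def event_to_xml(message_body: str) -> List[str]:
    """Return one XML string per top-level element of the JSON event."""
    event = json.loads(message_body)
    # Temporary wrapper element; only its children are returned.
    wrapper = ET.Element("wrapper")
    for key, value in event.items():
        append_json(wrapper, key, value)
    return [ET.tostring(child, encoding="unicode") for child in wrapper]


sample = '{"Records": [{"eventSource": "aws:s3", "s3": {"bucket": {"name": "buck-demo"}}}]}'
for xml_text in event_to_xml(sample):
    print(xml_text)
```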
2. Define message mapping to convert event payload to the S3 Query structure.
We define the mapping from the S3 event message to the S3 Query structure. The XSD files can be found in the Appendix section below. We use a UDF to store the S3 filename in the ASMA (adapter-specific message attributes); this allows us to access the original S3 filename later if needed.
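The mapping itself is built graphically in SAP PO, but its logic can be sketched in a few lines of Python. The sketch below is illustrative only and rests on assumptions that are not confirmed by this blog: that the object key is mapped to the Prefix field of the Query structure, and that Region and Bucket are filled from awsRegion and the bucket name. The function name build_s3_query is hypothetical, and the returned filename stands in for the value the UDF would store in the ASMA.

```python
import xml.etree.ElementTree as ET
from typing import Tuple
from urllib.parse import unquote_plus


def build_s3_query(records_xml: str) -> Tuple[str, str]:
    """Map an S3 event (XML) to a Query document and return it with the filename."""
    record = ET.fromstring(records_xml)
    # Keys in S3 event notifications are URL-encoded, so decode them first.
    key = unquote_plus(record.findtext("s3/object/key", default=""))

    # Element order follows the sequence in the S3 Query XSD.
    query = ET.Element("Query")
    ET.SubElement(query, "Region").text = record.findtext("awsRegion", default="")
    ET.SubElement(query, "Bucket").text = record.findtext("s3/bucket/name", default="")
    # Assumption: the object key is passed as the Prefix of the query.
    ET.SubElement(query, "Prefix").text = key

    return ET.tostring(query, encoding="unicode"), key


sample_event_xml = (
    "<Records><awsRegion>us-west-2</awsRegion><s3>"
    "<bucket><name>buck-demo</name></bucket>"
    "<object><key>FailedRecordsToBeFixed.csv</key></object>"
    "</s3></Records>"
)
query_xml, filename = build_s3_query(sample_event_xml)
print(query_xml)
print(filename)
```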
3. Define the S3 Query receiver channel to download the file from the S3 bucket.
In the receiver S3 channel, we specify the bucket by using variable substitution. The bucket name is in the S3 Query structure.
To complete the S3 receiver channel configurations, we add the modules for the following:
- Copy the original S3 filename to the return message ASMA
- Return the S3 file to the response iflow.
Conclusion.
Reacting to an S3 event and processing the uploaded file accordingly is a very common use case. We hope this blog helps you understand the steps needed to implement this requirement.
https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
Appendix
S3 event xsd
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="Records">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:float" name="eventVersion"/>
<xs:element type="xs:string" name="eventSource"/>
<xs:element type="xs:string" name="awsRegion"/>
<xs:element type="xs:dateTime" name="eventTime"/>
<xs:element type="xs:string" name="eventName"/>
<xs:element name="userIdentity">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="principalId"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="requestParameters">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="sourceIPAddress"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="responseElements">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="x-amz-request-id"/>
<xs:element type="xs:string" name="x-amz-id-2"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="s3">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:float" name="s3SchemaVersion"/>
<xs:element type="xs:string" name="configurationId"/>
<xs:element name="bucket">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="name"/>
<xs:element name="ownerIdentity">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="principalId"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element type="xs:string" name="arn"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="object">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="key"/>
<xs:element type="xs:int" name="size"/>
<xs:element type="xs:string" name="eTag"/>
<xs:element type="xs:string" name="sequencer"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
S3 Query xsd
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="Query">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="Region" minOccurs="0" maxOccurs="1"/>
<xs:element type="xs:string" name="Bucket" minOccurs="0" maxOccurs="1"/>
<xs:element type="xs:string" name="Prefix" minOccurs="0" maxOccurs="1"/>
<xs:element type="xs:string" name="Delimiter" minOccurs="0" maxOccurs="1"/>
<xs:element type="xs:string" name="MaxKeys" minOccurs="0" maxOccurs="1"/>
<xs:element type="xs:string" name="EnableRequesterPays" minOccurs="0" maxOccurs="1"/>
<xs:element type="xs:string" name="Marker" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
Please reach out to our sales team at sales@advantco.com if you have any questions.