AWS Level 4: Integrating AWS SQS and SNS for Reliable Messaging

Hi Support Team,

I’ve been trying for months to get this lab to work, and I keep getting the same error message:
"Low-priority message was processed too soon."
See the screenshots, the commands I ran, and the YAML file below:

topicarn=$(aws sns list-topics --query "Topics[?contains(TopicArn, 'xfusion-Priority-Queues-Topic')].TopicArn" --output text)
aws sns publish --topic-arn "$topicarn" --message 'High Priority message 1' --message-attributes '{"priority" : { "DataType":"String", "StringValue":"high"}}'
aws sns publish --topic-arn "$topicarn" --message 'High Priority message 2' --message-attributes '{"priority" : { "DataType":"String", "StringValue":"high"}}'
aws sns publish --topic-arn "$topicarn" --message 'Low Priority message 1' --message-attributes '{"priority" : { "DataType":"String", "StringValue":"low"}}'
aws sns publish --topic-arn "$topicarn" --message 'Low Priority message 2' --message-attributes '{"priority" : { "DataType":"String", "StringValue":"low"}}'

AWSTemplateFormatVersion: '2010-09-09'
Description: SQS priority queues template

Resources:
  SQSHighPriorityQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 180
      QueueName: xfusion-High-Priority-Queue

  SQSLowPriorityQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 180
      DelaySeconds: 20
      QueueName: xfusion-Low-Priority-Queue

  PriorityQueuesTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: xfusion-Priority-Queues-Topic

  SQSHighQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref SQSHighPriorityQueue
      PolicyDocument:
        Id: AllowIncomingMessageFromSNS
        Statement:
          - Effect: Allow
            Principal: '*'
            Action:
              - sqs:SendMessage
            Resource:
              - !GetAtt SQSHighPriorityQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref PriorityQueuesTopic

  SQSLowQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref SQSLowPriorityQueue
      PolicyDocument:
        Id: AllowIncomingMessageFromSNS
        Statement:
          - Effect: Allow
            Principal: '*'
            Action:
              - sqs:SendMessage
            Resource:
              - !GetAtt SQSLowPriorityQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref PriorityQueuesTopic

  SNSHighSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref PriorityQueuesTopic
      Endpoint: !GetAtt SQSHighPriorityQueue.Arn
      Protocol: sqs
      RawMessageDelivery: true
      FilterPolicy: {"priority": ["high"]}

  SNSLowSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref PriorityQueuesTopic
      Endpoint: !GetAtt SQSLowPriorityQueue.Arn
      Protocol: sqs
      RawMessageDelivery: true
      FilterPolicy: {"priority": ["low"]}

  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: lambda_execution_role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess
        - arn:aws:iam::aws:policy/AmazonSNSFullAccess
        - arn:aws:iam::aws:policy/AWSLambda_FullAccess

  LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName:
        Fn::Sub: xfusion-priorities-queue-function
      Description: Priority queue function
      Runtime: python3.9
      Code:
        # Literal block scalar (|) so the Python line breaks are kept as written
        ZipFile: |
          import boto3
          import os

          sqs = boto3.client('sqs')

          def delete_message(queue_url, receipt_handle, message):
              # Remove the message from the queue once it has been read
              sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
              return "Message '" + message + "' deleted"

          def poll_messages(queue_url):
              # Long-poll for at most one message, waiting up to 3 seconds
              response = sqs.receive_message(
                  QueueUrl=queue_url,
                  AttributeNames=[],
                  MaxNumberOfMessages=1,
                  MessageAttributeNames=['All'],
                  WaitTimeSeconds=3
              )
              if "Messages" in response:
                  receipt_handle = response['Messages'][0]['ReceiptHandle']
                  message = response['Messages'][0]['Body']
                  return delete_message(queue_url, receipt_handle, message)
              else:
                  return "No more messages to poll"

          def lambda_handler(event, context):
              # Try the high-priority queue first; fall back to the low-priority queue
              response = poll_messages(os.environ['high_priority_queue'])
              if response == "No more messages to poll":
                  response = poll_messages(os.environ['low_priority_queue'])
              return response

      Handler: index.lambda_handler
      MemorySize: 128
      Timeout: 10
      Role:
        Fn::GetAtt:
          - LambdaRole
          - Arn
      Environment:
        Variables:
          high_priority_queue: !Ref SQSHighPriorityQueue
          low_priority_queue: !Ref SQSLowPriorityQueue

Outputs:
  SNSTopicARN:
    Value: !Ref PriorityQueuesTopic
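
The polling order the function in the template is meant to produce can be sketched locally, without AWS. This is only a minimal simulation under stated assumptions: in-memory deques stand in for the two SQS queues, and `poll_messages`, `handler`, and `drain` here are hypothetical local stand-ins, not the deployed code. Real SQS standard queues only offer best-effort ordering, so the real run may not be this tidy.

```python
from collections import deque

NO_MESSAGES = "No more messages to poll"


def poll_messages(queue):
    """Take one message off an in-memory queue, mimicking receive + delete."""
    if queue:
        message = queue.popleft()
        return "Message '" + message + "' deleted"
    return NO_MESSAGES


def handler(high_queue, low_queue):
    """One invocation: try the high-priority queue, then the low-priority one."""
    response = poll_messages(high_queue)
    if response == NO_MESSAGES:
        response = poll_messages(low_queue)
    return response


def drain(high_messages, low_messages, invocations):
    """Run the handler a fixed number of times and collect the responses."""
    high, low = deque(high_messages), deque(low_messages)
    return [handler(high, low) for _ in range(invocations)]


if __name__ == "__main__":
    results = drain(
        ["High Priority message 1", "High Priority message 2"],
        ["Low Priority message 1", "Low Priority message 2"],
        invocations=5,
    )
    for line in results:
        print(line)
```

In this simulation both high-priority messages are deleted before any low-priority one, and the fifth invocation finds nothing left in either queue.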

I then invoke it from the CLI:

aws lambda invoke \
  --function-name xfusion-priorities-queue-function \
  --payload '{}' \
  response1.json && cat response1.json

aws lambda invoke \
  --function-name xfusion-priorities-queue-function \
  --payload '{}' \
  response2.json && cat response2.json

aws lambda invoke \
  --function-name xfusion-priorities-queue-function \
  --payload '{}' \
  response3.json && cat response3.json

aws lambda invoke \
  --function-name xfusion-priorities-queue-function \
  --payload '{}' \
  response4.json && cat response4.json
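
To check the four responses for the ordering the error message complains about, something like the following might help. It is a rough sketch that assumes the lab's check is about the order in which messages are deleted (a guess, since the lab does not say how it validates) and that the response strings contain the message bodies published above. `low_processed_too_soon` is a hypothetical helper, not part of the lab.

```python
def low_processed_too_soon(responses):
    """Return the index of the first low-priority response that comes before
    a later high-priority response, or None if the order looks fine."""
    # Position of the last high-priority response (-1 if there is none)
    last_high = max(
        (i for i, text in enumerate(responses) if "high priority" in text.lower()),
        default=-1,
    )
    for i, text in enumerate(responses):
        if "low priority" in text.lower() and i < last_high:
            return i
    return None


if __name__ == "__main__":
    # Paste the contents of response1.json .. response4.json here, in order
    responses = [
        "Message 'High Priority message 1' deleted",
        "Message 'High Priority message 2' deleted",
        "Message 'Low Priority message 1' deleted",
        "Message 'Low Priority message 2' deleted",
    ]
    print(low_processed_too_soon(responses))
```

A result of `None` means no low-priority message was handled ahead of a high-priority one; any integer is the position of the response that jumped the queue.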