Hi Support Team,
I’ve been trying for months to get this lab to work and keep getting the same error message.
Low-priority message was processed too soon.
See the screenshots and yaml file below:
topicarn=$(aws sns list-topics --query “Topics[?contains(TopicArn, ‘xfusion-Priority-Queues-Topic’)].TopicArn” --output text)
aws sns publish --topic-arn $topicarn --message ‘High Priority message 1’ --message-attributes ‘{“priority” : { “DataType”:“String”, “StringValue”:“high”}}’
aws sns publish --topic-arn $topicarn --message ‘High Priority message 2’ --message-attributes ‘{“priority” : { “DataType”:“String”, “StringValue”:“high”}}’
aws sns publish --topic-arn $topicarn --message ‘Low Priority message 1’ --message-attributes ‘{“priority” : { “DataType”:“String”, “StringValue”:“low”}}’
aws sns publish --topic-arn $topicarn --message ‘Low Priority message 2’ --message-attributes ‘{“priority” : { “DataType”:“String”, “StringValue”:“low”}}’
AWSTemplateFormatVersion: ‘2010-09-09’
Description: SQS priority queues template
Resources:
SQSHighPriorityQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 180
QueueName: xfusion-High-Priority-Queue
SQSLowPriorityQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 180
DelaySeconds: 20
QueueName: xfusion-Low-Priority-Queue
PriorityQueuesTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: xfusion-Priority-Queues-Topic
SQSHighQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
Queues:
- !Ref SQSHighPriorityQueue
PolicyDocument:
Id: AllowIncomingMessageFromSNS
Statement:
-
Effect: Allow
Principal: ‘*’
Action:
- sqs:SendMessage
Resource:
- !GetAtt SQSHighPriorityQueue.Arn
Condition:
ArnEquals:
aws:SourceArn: !Ref PriorityQueuesTopic
SQSLowQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
Queues:
- !Ref SQSLowPriorityQueue
PolicyDocument:
Id: AllowIncomingMessageFromSNS
Statement:
-
Effect: Allow
Principal: ‘*’
Action:
- sqs:SendMessage
Resource:
- !GetAtt SQSLowPriorityQueue.Arn
Condition:
ArnEquals:
aws:SourceArn: !Ref PriorityQueuesTopic
SNSHighSubscription:
Type: AWS::SNS::Subscription
Properties:
TopicArn: !Ref PriorityQueuesTopic
Endpoint: !GetAtt SQSHighPriorityQueue.Arn
Protocol: sqs
RawMessageDelivery: true
FilterPolicy: {“priority”: [“high”]}
SNSLowSubscription:
Type: AWS::SNS::Subscription
Properties:
TopicArn: !Ref PriorityQueuesTopic
Endpoint: !GetAtt SQSLowPriorityQueue.Arn
Protocol: sqs
RawMessageDelivery: true
FilterPolicy: {“priority”: [“low”]}
LambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName: lambda_execution_role
AssumeRolePolicyDocument:
Version: ‘2012-10-17’
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonSQSFullAccess
- arn:aws:iam::aws:policy/AmazonSNSFullAccess
- arn:aws:iam::aws:policy/AWSLambda_FullAccess
LambdaFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName:
Fn::Sub: xfusion-priorities-queue-function
Description: Priority queue function
Runtime: python3.9
Code:
ZipFile: >
import boto3
import os
sqs = boto3.client(‘sqs’)
def delete_message(queue_url, receipt_handle, message):
response = sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
return “Message " + “'” + message + “'” + " deleted”
def poll_messages(queue_url):
QueueUrl=queue_url
response = sqs.receive_message(
QueueUrl=QueueUrl,
AttributeNames=[],
MaxNumberOfMessages=1,
MessageAttributeNames=['All'],
WaitTimeSeconds=3
)
if "Messages" in response:
receipt_handle=response['Messages'][0]['ReceiptHandle']
message = response['Messages'][0]['Body']
delete_response = delete_message(QueueUrl,receipt_handle,message)
return delete_response
else:
return "No more messages to poll"
def lambda_handler(event, context):
response = poll_messages(os.environ['high_priority_queue'])
if response == "No more messages to poll":
response = poll_messages(os.environ['low_priority_queue'])
return response
Handler: index.lambda_handler
MemorySize: 128
Timeout: 10
Role:
Fn::GetAtt:
- LambdaRole
- Arn
Environment:
Variables:
high_priority_queue: !Ref SQSHighPriorityQueue
low_priority_queue: !Ref SQSLowPriorityQueue
Outputs:
SNSTopicARN:
Value: !Ref PriorityQueuesTopic
I then invoke it from the cli
aws lambda invoke
–function-name xfusion-priorities-queue-function
–payload ‘{}’
response1.json && cat response1.json
aws lambda invoke
–function-name xfusion-priorities-queue-function
–payload ‘{}’
response1.json && cat response2.json
aws lambda invoke
–function-name xfusion-priorities-queue-function
–payload ‘{}’
response1.json && cat response3.json
aws lambda invoke
–function-name xfusion-priorities-queue-function
–payload ‘{}’
response1.json && cat response4.json