Currently, the notification mechanism supports email, HTTP(S) and SQS. The SQS support is attractive because it means you can subscribe an existing SQS queue to a topic in SNS, and every time information is published to that topic, a new message will be posted to SQS. That allows you to easily persist the notifications so that they can be logged or further processed at a later time.
Subscribing via the email protocol is very straightforward. You just provide an email address and SNS will send an email message to the address each time information is published to the topic (actually there is a confirmation step that happens first, also via email). Subscribing via HTTP(S) is also easy: you just provide the URL you want SNS to use, and each time information is published to the topic, SNS will POST a JSON payload containing the new information to your URL.
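To make the HTTP(S) case a little more concrete, here is a minimal sketch of what an endpoint might do with the body SNS POSTs to it. The helper name handle_sns_post is hypothetical, the sample bodies are made up, and the field names ("Type", "Message", "SubscribeURL") are my assumption about the SNS notification format rather than something shown in this post:

```python
import json

def handle_sns_post(body):
    """Interpret the JSON body of an SNS HTTP(S) POST (hypothetical helper)."""
    doc = json.loads(body)
    msg_type = doc.get("Type")
    if msg_type == "SubscriptionConfirmation":
        # Assumed: the endpoint confirms the subscription by visiting this URL.
        return ("confirm", doc.get("SubscribeURL"))
    if msg_type == "Notification":
        # The published information is carried under "Message".
        return ("message", doc.get("Message"))
    return ("ignored", None)

# Made-up sample body, for illustration only.
sample = json.dumps({"Type": "Notification", "Message": "hello from SNS"})
kind, value = handle_sns_post(sample)
```

A real endpoint would wrap this in whatever web framework you already use; the sketch only covers the parsing step.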
Subscribing an SQS queue, however, is a bit trickier. First, you have to be able to construct the ARN (Amazon Resource Name) of the SQS queue. Secondly, after subscribing the queue you have to set the ACL policy of the queue to allow SNS to send messages to the queue.
To make it easier, I added a new convenience method in the boto SNS module called subscribe_sqs_queue. You pass it the ARN of the SNS topic and the boto Queue object representing the queue and it does all of the hard work for you. You would call the method like this:
>>> import boto
>>> sns = boto.connect_sns()
>>> sqs = boto.connect_sqs()
>>> queue = sqs.lookup('TestSNSNotification')
>>> resp = sns.create_topic('TestSQSTopic')
>>> print resp
{u'CreateTopicResponse': {u'CreateTopicResult': {u'TopicArn': u'arn:aws:sns:us-east-1:963068290131:TestSQSTopic'},
u'ResponseMetadata': {u'RequestId': u'1b0462af-4c24-11df-85e6-1f98aa81cd11'}}}
>>> sns.subscribe_sqs_queue('arn:aws:sns:us-east-1:963068290131:TestSQSTopic', queue)
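The topic ARN passed to subscribe_sqs_queue does not have to be typed in by hand; it can be pulled out of the create_topic response. A small sketch, using the response printed in the session above as sample data (the helper name topic_arn_from_response is hypothetical, not part of boto):

```python
# Sample response copied from the session above.
resp = {
    u'CreateTopicResponse': {
        u'CreateTopicResult': {
            u'TopicArn': u'arn:aws:sns:us-east-1:963068290131:TestSQSTopic'},
        u'ResponseMetadata': {
            u'RequestId': u'1b0462af-4c24-11df-85e6-1f98aa81cd11'}}}

def topic_arn_from_response(resp):
    """Hypothetical helper: dig the TopicArn out of a create_topic response."""
    return resp['CreateTopicResponse']['CreateTopicResult']['TopicArn']

topic_arn = topic_arn_from_response(resp)
```

The resulting topic_arn could then be passed as the first argument to subscribe_sqs_queue.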
That should be all you have to do to subscribe your SQS queue to an SNS topic. The basic operations performed are:
- Construct the ARN for the SQS queue. In our example the URL for the queue is https://queue.amazonaws.com/963068290131/TestSNSNotification but the ARN would be "arn:aws:sqs:us-east-1:963068290131:TestSNSNotification"
- Subscribe the SQS queue to the SNS topic
- Construct a JSON policy that grants permission to SNS to perform a SendMessage operation on the queue. See below for an example of the JSON policy.
- Associate the new policy with the SQS queue by calling the set_attribute method of the Queue object with an attribute name of "Policy" and the attribute value being the JSON policy.
The actual policy looks like this:
{"Version": "2008-10-17", "Statement": [{"Resource": "arn:aws:sqs:us-east-1:963068290131:TestSNSNotification", "Effect": "Allow", "Sid": "ad279892-1597-46f8-922c-eb2b545a14a8", "Action": "SQS:SendMessage", "Condition": {"StringLike": {"aws:SourceArn": "arn:aws:sns:us-east-1:963068290131:TestSQSTopic"}}, "Principal": {"AWS": "*"}}]}
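The ARN and policy steps can be sketched with nothing but the standard library. This is an illustration of the idea, not the code inside subscribe_sqs_queue: the function names are hypothetical, the region is assumed to be us-east-1 because the queue URL in the example does not carry it, and using a random UUID for the Sid is my guess based on how the Sid above looks.

```python
import json
import uuid

def queue_arn_from_url(queue_url, region="us-east-1"):
    """Build an SQS ARN from a queue URL (region is an assumption)."""
    # URL looks like https://queue.amazonaws.com/<account-id>/<queue-name>
    account_id, queue_name = queue_url.rstrip("/").split("/")[-2:]
    return "arn:aws:sqs:%s:%s:%s" % (region, account_id, queue_name)

def build_send_policy(queue_arn, topic_arn):
    """Return a JSON policy letting the given topic send to the queue."""
    statement = {
        "Sid": str(uuid.uuid4()),  # assumed: any unique statement id
        "Effect": "Allow",
        "Principal": {"AWS": "*"},
        "Action": "SQS:SendMessage",
        "Resource": queue_arn,
        "Condition": {"StringLike": {"aws:SourceArn": topic_arn}},
    }
    return json.dumps({"Version": "2008-10-17", "Statement": [statement]})

queue_arn = queue_arn_from_url(
    "https://queue.amazonaws.com/963068290131/TestSNSNotification")
policy_json = build_send_policy(
    queue_arn, "arn:aws:sns:us-east-1:963068290131:TestSQSTopic")
```

The resulting policy_json string is what would be handed to the queue's set_attribute method under the attribute name "Policy", as described in the steps above.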
The new subscribe_sqs_queue method is available in the current SVN trunk. Check it out and let me know if you run into any problems or have any questions.
Once the queue is subscribed, each message arrives in the queue as a JSON string. The "Message" key of the decoded JSON (a dict) holds the value that was published.
I've done this...
import json
m = queue.read()
print json.loads(m.get_body()).get("Message")
Mitch,
I tried to subscribe a queue today without success. I don't know if Amazon changed something. Are you experiencing the same problems?
I've not had any problems today. What error are you receiving?
Traceback (most recent call last):
File "", line 1, in
File "/Users/director/dev/virtual/sns/src/svn/boto/sns/__init__.py", line 280, in subscribe_sqs_queue
resp = self.subscribe(topic, 'sqs', q_arn)
File "/Users/director/dev/virtual/sns/src/svn/boto/sns/__init__.py", line 253, in subscribe
raise self.ResponseError(response.status, response.reason, body)
boto.exception.BotoServerError: BotoServerError: 403 Forbidden
{"Error":{"Code":"AuthorizationError","Message":"Access denied by policy","Type":"Sender"},"RequestId":"f4243ba7-4cb9-11df-8254-35bf49fc28f1"}
Hey
I can't seem to subscribe a queue to an SNS topic. I would appreciate some help. I have been trying for 2 days now.
Here is the code I'm using:
q = conn.get_queue('Synctestq')
snsconn.subscribe_sqs_queue('arn:aws:sns:us-east-1:797755126197:Topictest1', q)
policy = {'Id':'SQSDefaultPolicy','Version': '2008-10-17','Statement':[{'Action':'SQS:SendMessage','Condition':{'StringLike':{'AWS:SourceArn':'arn:aws:sns:us-east-1:797755126197:SNStest'}},'Effect':'Allow','Principal':{'AWS':'*'},'Resource': 'arn:aws:sqs:us-east-1:797755126197:Synctestq','Sid': '1'}]}
q.set_attribute('Policy', policy)
One very important thing to note here is that the messages from SNS do NOT come through as base64 encoded, so you'll have to set the message class to "RawMessage" first:
from boto.sqs.message import RawMessage
q.set_message_class(RawMessage)
msg = q.read()
Also, the message "payload" is actually stored under "Message" in the provided JSON:
msg_data = json.loads(msg.get_body())['Message']
and the policy should be more like:
{"Version": "2008-10-17", "Statement": [{"Resource": "arn:aws:sqs:us-east-1:963068290131:TestSNSNotification", "Effect": "Allow", "Sid": "ad279892-1597-46f8-922c-eb2b545a14a8", "Action": "SQS:SendMessage", "Condition": {"SourceArn": {"aws:SourceArn": "arn:aws:sns:us-east-1:963068290131:TestSQSTopic"}}, "Principal": {"AWS": "*"}}]}
(notice the SourceArn instead of StringLike).
I may be late for the party, sorry Kurt...