Class SQS::QueueService
In: lib/openwfe/extras/util/sqs.rb
Parent: Object

Constants

AWS_VERSION = "2006-04-01"
DEFAULT_QUEUE_HOST = "queue.amazonaws.com"

Public Class methods

    # File lib/openwfe/extras/util/sqs.rb, line 99
    def initialize(queue_host=nil)

        # Fall back to the default SQS endpoint when no host is given
        @queue_host = queue_host || DEFAULT_QUEUE_HOST
    end

Public Instance methods

Creates a queue.

If the queue name does not meet the SQS naming requirements, an error is raised.

    # File lib/openwfe/extras/util/sqs.rb, line 132
    def create_queue(queue_name)

        doc = do_action :post, @queue_host, "/?QueueName=#{queue_name}"

        # Return the URL of the first (and normally only) queue in the reply
        doc.elements.each("//QueueUrl") do |e|
            return e.text.to_s
        end
    end

Deletes a given message.

The queue may be given as a queue name (String) or a Queue instance.

    # File lib/openwfe/extras/util/sqs.rb, line 233
    def delete_message(queue, message_id)

        queue = resolve_queue(queue)

        path = "#{queue.path}/#{message_id}"
        #path = "#{queue.path}/#{CGI::escape(message_id)}"

        doc = do_action :delete, queue.host, path

        SQS::get_element_text(doc, "//StatusCode") == "Success"
    end

Deletes the queue. Returns true if the delete was successful. You can empty a queue by calling flush_queue.

If 'force' is set to true, the queue is flushed before the actual delete operation, which should ensure that the queue is removed successfully.

    # File lib/openwfe/extras/util/sqs.rb, line 283
    def delete_queue(queue, force=false)

        queue = resolve_queue(queue)

        flush_queue(queue) if force

        begin

            doc = do_action :delete, @queue_host, queue.path

        rescue StandardError => e

            # A 400 reply means SQS refused the delete
            return false if e.message.match "^400 .*$"

            # Any other failure is propagated instead of falling
            # through with doc left unset
            raise e
        end

        SQS::get_element_text(doc, "//StatusCode") == "Success"
    end

Use with care!

Attempts to delete all the messages in a queue. Returns the total count of messages deleted.

A call to this method may take some time, as it has to delete each message individually. AWS may add a proper 'flush_queue' operation later.

The queue may be given as a queue name (String) or a Queue instance.

    # File lib/openwfe/extras/util/sqs.rb, line 257
    def flush_queue(queue)

        count = 0

        loop do

            # Fetch the next batch; an empty batch means the queue is drained
            l = get_messages(queue, :timeout => 0, :count => 255)
            break if l.length < 1

            l.each do |m|
                m.delete
                count += 1
            end
        end

        return count
    end
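The batch-and-delete loop can be tried without any network access. The following is a minimal sketch, assuming an in-memory stand-in for the queue: `FakeQueue` and `drain` are hypothetical names used only for illustration and are not part of the library.

```ruby
# In-memory stand-in for a queue. FakeQueue is hypothetical: it only
# mimics "fetch up to `count` messages" and "delete one message".
class FakeQueue
  def initialize(bodies)
    @bodies = bodies.dup
  end

  # Returns up to `count` pending messages without removing them.
  def get_messages(count)
    @bodies.first(count)
  end

  # Removes a single message from the queue.
  def delete(body)
    @bodies.delete_at(@bodies.index(body))
  end
end

# Same loop shape as flush_queue: fetch a batch, delete each message,
# and stop as soon as a fetch comes back empty.
def drain(queue, batch_size = 255)
  count = 0
  loop do
    batch = queue.get_messages(batch_size)
    break if batch.empty?
    batch.each do |m|
      queue.delete(m)
      count += 1
    end
  end
  count
end

puts drain(FakeQueue.new(%w[a b c]), 2)
```

With a batch size of 2 and three messages, the loop needs two non-empty fetches plus one empty fetch before it stops, which is why flushing a large queue takes a while.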

Retrieves a single message from a queue. Returns an instance of Message.

The queue may be given as a queue name (String) or a Queue instance.

    # File lib/openwfe/extras/util/sqs.rb, line 212
    def get_message(queue, message_id)

        queue = resolve_queue(queue)

        path = "#{queue.path}/#{message_id}"

        begin
            doc = do_action :get, queue.host, path
            Message.new(queue, doc.root.elements[1])
        rescue StandardError => e
            # A 404 reply means there is no such message: return nil
            return nil if e.message.match "^404 .*$"
            raise e
        end
    end

Retrieves a batch of messages from a queue. Returns a list of Message instances.

This method understands two optional params:

  • :timeout the duration in seconds of the message visibility in the queue

  • :count the maximum number of messages to be returned by this call

The queue may be given as a queue name (String) or a Queue instance.

    # File lib/openwfe/extras/util/sqs.rb, line 180
    def get_messages(queue, params={})

        queue = resolve_queue(queue)

        path = "#{queue.path}/front"

        timeout = params[:timeout]
        count = params[:count]

        # Only append a query string when a known param is present, so
        # that unknown keys in params do not leave a dangling "?"
        query = []
        query << "VisibilityTimeout=#{timeout}" if timeout
        query << "NumberOfMessages=#{count}" if count
        path += "?#{query.join('&')}" unless query.empty?

        doc = do_action :get, queue.host, path

        messages = []

        doc.elements.each("//Message") do |me|
            messages << Message.new(queue, me)
        end

        messages
    end
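To see which request path a given combination of params produces, the path assembly can be isolated from the HTTP call. This is a small sketch only; `front_path` is a hypothetical helper and the queue path `/A1B2/myqueue` is a made-up value.

```ruby
# Hypothetical helper mirroring how get_messages assembles its path.
# Only :timeout and :count are turned into query parameters.
def front_path(queue_path, params = {})
  query = []
  query << "VisibilityTimeout=#{params[:timeout]}" if params[:timeout]
  query << "NumberOfMessages=#{params[:count]}" if params[:count]

  path = "#{queue_path}/front"
  path += "?#{query.join('&')}" unless query.empty?
  path
end

puts front_path("/A1B2/myqueue", :timeout => 0, :count => 10)
puts front_path("/A1B2/myqueue", :count => 5)
puts front_path("/A1B2/myqueue")
```

Note that a timeout of 0 is still sent, because 0 is truthy in Ruby; flush_queue relies on this when it passes `:timeout => 0`.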

Given a queue name, a Queue instance is returned.

    # File lib/openwfe/extras/util/sqs.rb, line 304
    def get_queue(queue_name)

        # list_queues filters by prefix, so look for the exact match
        l = list_queues(queue_name)

        l.each do |q|
            return q if q.name == queue_name
        end

        #return nil
        raise "found no queue named '#{queue_name}'"
    end

Lists the queues for the active AWS account. If 'prefix' is given, only queues whose names begin with that prefix are returned.

    # File lib/openwfe/extras/util/sqs.rb, line 110
    def list_queues(prefix=nil)

        queues = []

        path = "/"
        path = "#{path}?QueueNamePrefix=#{prefix}" if prefix

        doc = do_action :get, @queue_host, path

        # Wrap each <QueueUrl> element of the reply in a Queue instance
        doc.elements.each("//QueueUrl") do |e|
            queues << Queue.new(self, e)
        end

        return queues
    end

Given some content ('text/plain' content), sends it as a message to a queue. Returns the SQS message id (a String).

The queue may be given as a queue name (String) or a Queue instance.

    # File lib/openwfe/extras/util/sqs.rb, line 148
    def put_message(queue, content)

        queue = resolve_queue(queue)

        doc = do_action :put, queue.host, "#{queue.path}/back", content

        #puts doc.to_s

        #status_code = SQS::get_element_text(doc, '//StatusCode')
        #message_id = SQS::get_element_text(doc, '//MessageId')
        #request_id = SQS::get_element_text(doc, '//RequestId')
        #{ :status_code => status_code,
        #  :message_id => message_id,
        #  :request_id => request_id }

        SQS::get_element_text(doc, '//MessageId')
    end

send_message(queue, content)

Alias for put_message

Protected Instance methods

Performs the actual HTTP request and parses the XML response.

    # File lib/openwfe/extras/util/sqs.rb, line 331
    def do_action(action, host, path, content=nil)

        doc = nil

        http = Net::HTTP.new(host)
        http.start do

            date = Time.now.httpdate

            # Pick the request class matching the HTTP verb
            req = if action == :get
                Net::HTTP::Get.new(path)
            elsif action == :post
                Net::HTTP::Post.new(path)
            elsif action == :put
                Net::HTTP::Put.new(path)
            else # action == :delete
                Net::HTTP::Delete.new(path)
            end

            req['AWS-Version'] = AWS_VERSION
            req['Date'] = date
            req['Content-type'] = 'text/plain'

            # Only PUT and POST requests carry a body; the length
            # header is expressed in bytes, not characters
            if action == :put or action == :post
                req.body = content
                req['Content-length'] = content.bytesize.to_s if content
            end

            req['Authorization'] = generate_auth_header(
                action, path, date, "text/plain")

            res = http.request(req)

            case res
            when Net::HTTPSuccess, Net::HTTPRedirection
                doc = REXML::Document.new(res.read_body)
            else
                res.error!
            end
        end

        # Turn any <Error> element of the reply into an exception
        raise_errors(doc)
        return doc
    end
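The request-building step can be exercised on its own, since Net::HTTP request objects are created without opening a connection. The sketch below is an illustration rather than the library's code: `REQUEST_CLASSES` and `build_request` are hypothetical names, and the hash lookup is a swapped-in alternative to the if/elsif chain.

```ruby
require 'net/http'
require 'time' # provides Time#httpdate

# Hypothetical lookup table from action symbol to request class.
REQUEST_CLASSES = {
  :get    => Net::HTTP::Get,
  :post   => Net::HTTP::Post,
  :put    => Net::HTTP::Put,
  :delete => Net::HTTP::Delete
}

# Builds (but does not send) a request with the same headers that
# do_action sets, except for the AWS-specific ones.
def build_request(action, path, content = nil)
  # fetch raises KeyError for an unknown action
  req = REQUEST_CLASSES.fetch(action).new(path)
  req['Date'] = Time.now.httpdate
  req['Content-Type'] = 'text/plain'
  # Only PUT and POST requests carry a body
  req.body = content if content && [:put, :post].include?(action)
  req
end

req = build_request(:put, '/A1B2/myqueue/back', 'hello')
puts req.method
puts req.path
puts req['Content-Type']
```

One design difference is worth noting: the if/elsif chain treats every unrecognized action as a DELETE, whereas `fetch` raises a KeyError, which makes a mistyped action symbol fail early.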

Generates the 'AWS x:y' authorization header value.

    # File lib/openwfe/extras/util/sqs.rb, line 400
    def generate_auth_header(action, path, date, content_type)

        # String to sign: verb, Content-MD5 (left empty), content type,
        # date and path, separated by newlines
        s = ""
        s << action.to_s.upcase
        s << "\n"

        #s << Base64.encode64(Digest::MD5.digest(content)).strip \
        #    if content
            #
            # documented but not necessary (not working)
        s << "\n"

        s << content_type
        s << "\n"

        s << date
        s << "\n"

        # The query string is not part of the signed path
        i = path.index '?'
        path = path[0..i-1] if i
        s << path

        # OpenSSL::Digest::Digest is no longer available in current
        # versions of the openssl library
        digest = OpenSSL::Digest.new 'sha1'

        key = ENV['AMAZON_SECRET_ACCESS_KEY']

        raise "No $AMAZON_SECRET_ACCESS_KEY env variable found" \
            unless key

        sig = OpenSSL::HMAC.digest(digest, key, s)
        sig = Base64.encode64(sig).strip

        "AWS #{ENV['AMAZON_ACCESS_KEY_ID']}:#{sig}"
    end
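The signing step is easier to follow as a stand-alone function that takes the credentials as arguments instead of reading them from the environment. This is a sketch under that assumption: `aws_auth_header`, the key id, the secret and the date below are all made-up illustration values.

```ruby
require 'openssl'

# Hypothetical stand-alone version of the signing step. The string to
# sign has the same layout as in generate_auth_header.
def aws_auth_header(access_key_id, secret_key, action, path, date,
                    content_type = 'text/plain')
  # The query string is not part of the signed path
  signed_path = path.split('?', 2).first

  # The empty string stands for the Content-MD5 line, which is left blank
  string_to_sign =
    [action.to_s.upcase, '', content_type, date, signed_path].join("\n")

  hmac = OpenSSL::HMAC.digest(OpenSSL::Digest.new('sha1'),
                              secret_key, string_to_sign)

  # pack('m0') produces Base64 without line breaks
  "AWS #{access_key_id}:#{[hmac].pack('m0')}"
end

puts aws_auth_header('EXAMPLE_ID', 'example-secret', :get,
                     '/A1B2/myqueue/front?NumberOfMessages=1',
                     'Thu, 01 Jan 2009 00:00:00 GMT')
```

Because the query string is stripped before signing, two requests that differ only in their query parameters (and share the same date) produce the same signature.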

Scans the SQS XML reply for potential errors and raises an error if it encounters one.

    # File lib/openwfe/extras/util/sqs.rb, line 385
    def raise_errors(doc)

        doc.elements.each("//Error") do |e|

            code = SQS::get_element_text(e, "Code")
            return unless code

            # The listing referenced an undefined variable 'm' here;
            # the text fetched into 'message' is what gets reported
            message = SQS::get_element_text(e, "Message")
            raise "SQS::#{code} : #{message}"
        end
    end
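The error scan can be tried against a hand-written reply. The sketch below assumes a reply shaped like the XPath expressions above suggest (an `Error` element with `Code` and `Message` children); the sample XML and the helper name `first_error` are hypothetical, and the helper returns the message instead of raising so that it is easy to inspect.

```ruby
require 'rexml/document'

# Hypothetical error reply; only the element names Error, Code and
# Message are taken from the method above.
xml = <<~XML
  <Response>
    <Errors>
      <Error>
        <Code>InvalidParameterValue</Code>
        <Message>Queue name is not valid</Message>
      </Error>
    </Errors>
  </Response>
XML

# Returns a formatted message for the first error found, or nil when
# the reply contains no usable Error element.
def first_error(doc)
  doc.elements.each('//Error') do |e|
    code = e.elements['Code']
    next unless code
    message = e.elements['Message']
    return "SQS::#{code.text} : #{message ? message.text : ''}"
  end
  nil
end

puts first_error(REXML::Document.new(xml))
```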

'queue' may be a Queue instance or a queue name. If it's a Queue instance, it is returned immediately; otherwise the Queue instance is looked up by name and returned.

    # File lib/openwfe/extras/util/sqs.rb, line 323
    def resolve_queue(queue)
        return queue if queue.kind_of? Queue
        return get_queue(queue.to_s)
    end
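The "name or instance" convention used by most public methods can be illustrated with a local lookup table in place of the call to SQS. This is only a sketch: `NamedQueue`, `KNOWN_QUEUES` and `resolve` are hypothetical names, chosen to avoid clashing with Ruby's built-in `Queue` class.

```ruby
# Hypothetical stand-in for the library's Queue class.
NamedQueue = Struct.new(:name)

# Hypothetical registry replacing the remote lookup done by get_queue.
KNOWN_QUEUES = { 'orders' => NamedQueue.new('orders') }

# Accepts either a NamedQueue or anything whose to_s is a queue name.
def resolve(queue)
  return queue if queue.is_a?(NamedQueue)
  KNOWN_QUEUES.fetch(queue.to_s) do |name|
    raise "found no queue named '#{name}'"
  end
end

puts resolve('orders').name
puts resolve(:orders).name
puts resolve(NamedQueue.new('adhoc')).name
```

Passing an instance skips the lookup entirely, which matters for the real class because each lookup by name costs a list_queues request.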
