| Class | SQS::QueueService |
| In: |
lib/openwfe/extras/util/sqs.rb
|
| Parent: | Object |
| AWS_VERSION | = | "2006-04-01" |
| DEFAULT_QUEUE_HOST | = | "queue.amazonaws.com" |
# File lib/openwfe/extras/util/sqs.rb, line 99
99: def initialize (queue_host=nil)
100:
101: @queue_host = queue_host
102: @queue_host = DEFAULT_QUEUE_HOST unless @queue_host
103: end
Creates a queue.
If the queue name doesn‘t comply with SQS requirements for it, an error will be raised.
# File lib/openwfe/extras/util/sqs.rb, line 132
132: def create_queue (queue_name)
133:
134: doc = do_action :post, @queue_host, "/?QueueName=#{queue_name}"
135:
136: doc.elements.each("//QueueUrl") do |e|
137: return e.text.to_s
138: end
139: end
Deletes a given message.
The queue might be a queue name (String) or a Queue instance.
# File lib/openwfe/extras/util/sqs.rb, line 233
233: def delete_message (queue, message_id)
234:
235: queue = resolve_queue(queue)
236:
237: path = "#{queue.path}/#{message_id}"
238: #path = "#{queue.path}/#{CGI::escape(message_id)}"
239:
240: doc = do_action :delete, queue.host, path
241:
242: SQS::get_element_text(doc, "//StatusCode") == "Success"
243: end
Deletes the queue. Returns true if the delete was successful. You can empty a queue by called the method flush_queue
If ‘force’ is set to true, a flush will be performed on the queue before the actual delete operation. It should ensure a successful removal of the queue.
# File lib/openwfe/extras/util/sqs.rb, line 283
283: def delete_queue (queue, force=false)
284:
285: queue = resolve_queue(queue)
286:
287: flush_queue(queue) if force
288:
289: begin
290:
291: doc = do_action :delete, @queue_host, queue.path
292:
293: rescue Exception => e
294:
295: return false if e.message.match "^400 .*$"
296: end
297:
298: SQS::get_element_text(doc, "//StatusCode") == "Success"
299: end
Use with care !
Attempts at deleting all the messages in a queue. Returns the total count of messages deleted.
A call on this method might take a certain time, as it has to delete each message individually. AWS will perhaps add a proper ‘flush_queue’ method later.
The queue might be a queue name (String) or a Queue instance.
# File lib/openwfe/extras/util/sqs.rb, line 257
257: def flush_queue (queue)
258:
259: count = 0
260:
261: loop do
262:
263: l = get_messages(queue, :timeout => 0, :count => 255)
264: break if l.length < 1
265:
266: l.each do |m|
267: m.delete
268: count += 1
269: end
270: end
271:
272: return count
273: end
Retrieves a single message from a queue. Returns an instance of Message.
The queue might be a queue name (String) or a Queue instance.
# File lib/openwfe/extras/util/sqs.rb, line 212
212: def get_message (queue, message_id)
213:
214: queue = resolve_queue(queue)
215:
216: path = "#{queue.path}/#{message_id}"
217:
218: begin
219: doc = do_action :get, queue.host, path
220: Message.new(queue, doc.root.elements[1])
221: rescue Exception => e
222: #puts e.message
223: return nil if e.message.match "^404 .*$"
224: raise e
225: end
226: end
Retrieves a bunch of messages from a queue. Returns a list of Message instances.
There are actually two optional params that this method understands :
queue
The queue might be a queue name (String) or a Queue instance.
# File lib/openwfe/extras/util/sqs.rb, line 180
180: def get_messages (queue, params={})
181:
182: queue = resolve_queue(queue)
183:
184: path = "#{queue.path}/front"
185:
186: path += "?" if params.size > 0
187:
188: timeout = params[:timeout]
189: count = params[:count]
190:
191: path += "VisibilityTimeout=#{timeout}" if timeout
192: path += "&" if timeout and count
193: path += "NumberOfMessages=#{count}" if count
194:
195: doc = do_action :get, queue.host, path
196:
197: messages = []
198:
199: doc.elements.each("//Message") do |me|
200: messages << Message.new(queue, me)
201: end
202:
203: messages
204: end
Given a queue name, a Queue instance is returned.
# File lib/openwfe/extras/util/sqs.rb, line 304
304: def get_queue (queue_name)
305:
306: l = list_queues(queue_name)
307:
308: l.each do |q|
309: return q if q.name == queue_name
310: end
311:
312: #return nil
313: raise "found no queue named '#{queue_name}'"
314: end
Lists the queues for the active AWS account. If ‘prefix’ is given, only queues whose name begin with that prefix will be returned.
# File lib/openwfe/extras/util/sqs.rb, line 110
110: def list_queues (prefix=nil)
111:
112: queues = []
113:
114: path = "/"
115: path = "#{path}?QueueNamePrefix=#{prefix}" if prefix
116:
117: doc = do_action :get, @queue_host, path
118:
119: doc.elements.each("//QueueUrl") do |e|
120: queues << Queue.new(self, e)
121: end
122:
123: return queues
124: end
Given some content (‘text/plain’ content), send it as a message to a queue. Returns the SQS message id (a String).
The queue might be a queue name (String) or a Queue instance.
# File lib/openwfe/extras/util/sqs.rb, line 148
148: def put_message (queue, content)
149:
150: queue = resolve_queue(queue)
151:
152: doc = do_action :put, queue.host, "#{queue.path}/back", content
153:
154: #puts doc.to_s
155:
156: #status_code = SQS::get_element_text(doc, '//StatusCode')
157: #message_id = SQS::get_element_text(doc, '//MessageId')
158: #request_id = SQS::get_element_text(doc, '//RequestId')
159: #{ :status_code => status_code,
160: # :message_id => message_id,
161: # :request_id => request_id }
162:
163: SQS::get_element_text(doc, '//MessageId')
164: end
The actual http request/response job is done here.
# File lib/openwfe/extras/util/sqs.rb, line 331
331: def do_action (action, host, path, content=nil)
332:
333: #puts "___path : #{path}"
334:
335: doc = nil
336:
337: http = Net::HTTP.new(host)
338: http.start do
339:
340: date = Time.now.httpdate
341:
342: req = if action == :get
343: Net::HTTP::Get.new(path)
344: elsif action == :post
345: Net::HTTP::Post.new(path)
346: elsif action == :put
347: Net::HTTP::Put.new(path)
348: else #action == :delete
349: Net::HTTP::Delete.new(path)
350: end
351:
352: req['AWS-Version'] = AWS_VERSION
353: req['Date'] = date
354: req['Content-type'] = 'text/plain'
355:
356: if action == :put or action == :post
357: req.body = content
358: req['Content-length'] = content.length.to_s if content
359: end
360:
361: req['Authorization'] = generate_auth_header(
362: action, path, date, "text/plain")
363:
364: #req.each_header do |k, v|
365: # puts " - '#{k}' => '#{v}'"
366: #end
367:
368: res = http.request(req)
369:
370: case res
371: when Net::HTTPSuccess, Net::HTTPRedirection
372: doc = REXML::Document.new(res.read_body)
373: else
374: res.error!
375: end
376: end
377: raise_errors(doc)
378: return doc
379: end
Generates the ‘AWS x:y" authorization header value.
# File lib/openwfe/extras/util/sqs.rb, line 400
400: def generate_auth_header (action, path, date, content_type)
401:
402: s = ""
403: s << action.to_s.upcase
404: s << "\n"
405:
406: #s << Base64.encode64(Digest::MD5.digest(content)).strip # if content
407: #
408: # documented but not necessary (not working)
409: s << "\n"
410:
411: s << content_type
412: s << "\n"
413:
414: s << date
415: s << "\n"
416:
417: i = path.index '?'
418: path = path[0..i-1] if i
419: s << path
420:
421: #puts ">>>#{s}<<<"
422:
423: digest = OpenSSL::Digest::Digest.new 'sha1'
424:
425: key = ENV['AMAZON_SECRET_ACCESS_KEY']
426:
427: raise "No $AMAZON_SECRET_ACCESS_KEY env variable found" \
428: unless key
429:
430: sig = OpenSSL::HMAC.digest(digest, key, s)
431: sig = Base64.encode64(sig).strip
432:
433: "AWS #{ENV['AMAZON_ACCESS_KEY_ID']}:#{sig}"
434: end
Scans the SQS XML reply for potential errors and raises an error if he encounters one.
# File lib/openwfe/extras/util/sqs.rb, line 385
385: def raise_errors (doc)
386:
387: doc.elements.each("//Error") do |e|
388:
389: code = get_element_text(e, "Code")
390: return unless code
391:
392: message = get_element_text(e, "Message")
393: raise "SQS::#{code} : #{m.text.to_s}"
394: end
395: end
‘queue’ might be a Queue instance or a queue name. If it‘s a Queue instance, it is immediately returned, else the Queue instance is looked up and returned.
# File lib/openwfe/extras/util/sqs.rb, line 323
323: def resolve_queue (queue)
324: return queue if queue.kind_of? Queue
325: return get_queue(queue.to_s)
326: end