Class Stomp::Client
In: lib/stomp.rb
Parent: Object

Typical Stomp client class. A dedicated listener thread receives frames from the server; any thread can send.

All receives happen in that one listener thread, so keep the processing done in subscription callbacks short if message volume is high.
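One way to keep the listener thread free is to have the callback do nothing but enqueue the message, and let a separate worker thread do the slow part. The sketch below is self-contained and does not talk to a broker: on_message is a hypothetical stand-in for the block passed to subscribe, and the strings stand in for received messages.

```ruby
# Hypothetical stand-in for a subscription callback: it only enqueues,
# so the thread that calls it returns immediately.
work_queue = Queue.new
on_message = lambda { |msg| work_queue << msg }

# A separate worker thread drains the queue and does the slow processing.
processed = []
worker = Thread.new do
  while (msg = work_queue.pop) != :done
    processed << msg.upcase   # placeholder for real, slow work
  end
end

# Simulate the listener thread delivering three messages, then stop the worker.
%w[a b c].each { |body| on_message.call(body) }
work_queue << :done
worker.join
```

With a real client the same idea would presumably be written as client.subscribe("/queue/a") { |msg| work_queue << msg }.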

Methods

abort   acknowledge   begin   close   commit   join   new   open   open?   register_receipt_listener   send   subscribe   unsubscribe  

Public Class methods

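Creates a client and starts its listener thread. Accepts a username (default ""), password (default ""), host (default localhost), port (default 61613), and a reliable flag (default false). A URL of the form stomp://host:port or stomp://user:pass@host:port may be passed in place of the username.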

     # File lib/stomp.rb, line 269
     def initialize user="", pass="", host="localhost", port=61613, reliable=false
       # The first argument may be a URL instead of a username:
       # stomp://host:port or stomp://user:pass@host:port
       if user =~ /stomp:\/\/(\w+):(\d+)/
         user = ""
         pass = ""
         host = $1
         port = $2
         reliable = false
       elsif user =~ /stomp:\/\/(\w+):(\w+)@(\w+):(\d+)/
         user = $1
         pass = $2
         host = $3
         port = $4
         reliable = false
       end

       @id_mutex = Mutex.new
       @ids = 1
       @connection = Connection.open user, pass, host, port, reliable
       @listeners = {}
       @receipt_listeners = {}
       @running = true
       @replay_messages_by_txn = Hash.new
       # Single listener thread: dispatches each received frame to its callback
       @listener_thread = Thread.start do
         while @running
           message = @connection.receive
           case
           when message.nil?
             break
           when message.command == 'MESSAGE'
             if listener = @listeners[message.headers['destination']]
               listener.call(message)
             end
           when message.command == 'RECEIPT'
             if listener = @receipt_listeners[message.headers['receipt-id']]
               listener.call(message)
             end
           end
         end
       end
     end
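As a rough illustration of the URL handling in initialize, the sketch below applies the same two patterns outside the class. parse_stomp_url is a hypothetical helper, not part of lib/stomp.rb. Two things it makes visible: the port is captured as a String, and \w+ does not match dots, so a dotted host name matches neither pattern.

```ruby
# Hypothetical helper that mirrors the two patterns used in initialize.
# Returns nil when the argument matches neither URL form.
def parse_stomp_url(url)
  if url =~ /stomp:\/\/(\w+):(\d+)/
    { user: "", pass: "", host: $1, port: $2 }
  elsif url =~ /stomp:\/\/(\w+):(\w+)@(\w+):(\d+)/
    { user: $1, pass: $2, host: $3, port: $4 }
  end
end

plain  = parse_stomp_url("stomp://localhost:61613")
auth   = parse_stomp_url("stomp://guest:secret@localhost:61613")
dotted = parse_stomp_url("stomp://my.host:61613")  # "." is not matched by \w
```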

Shorthand for Client.new. Accepts the same arguments: a username (default ""), password (default ""), host (default localhost), port (default 61613), and a reliable flag (default false).

     # File lib/stomp.rb, line 318
     def self.open user="", pass="", host="localhost", port=61613, reliable=false
       Client.new user, pass, host, port, reliable
     end

Public Instance methods

Abort a transaction by name

     # File lib/stomp.rb, line 328
     def abort name, headers={}
       @connection.abort name, headers

       # lets replay any ack'd messages in this transaction
       replay_list = @replay_messages_by_txn[name]
       if replay_list
         replay_list.each do |message|
           if listener = @listeners[message.headers['destination']]
             listener.call(message)
           end
         end
       end
     end

Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )

If a block is given, a receipt will be requested and passed to the block on receipt.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

     # File lib/stomp.rb, line 369
     def acknowledge message, headers={}
       txn_id = headers[:transaction]
       if txn_id
         # lets keep around messages ack'd in this transaction in case we rollback
         replay_list = @replay_messages_by_txn[txn_id]
         if replay_list == nil
           replay_list = []
           @replay_messages_by_txn[txn_id] = replay_list
         end
         replay_list << message
       end
       if block_given?
         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
       end
       @connection.ack message.headers['message-id'], headers
     end
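The interplay between acknowledge and abort can be pictured as a small table keyed by transaction id: messages acknowledged inside a transaction are remembered, and an abort hands them back to the listener. The sketch below is a simplified model under that reading, with no broker involved. ReplayLog and its method names are hypothetical and are not part of lib/stomp.rb; messages are plain strings here.

```ruby
# Hypothetical minimal model of the replay bookkeeping only.
class ReplayLog
  def initialize
    @by_txn = Hash.new
  end

  # Remember a message acknowledged inside transaction txn_id.
  def record(txn_id, message)
    (@by_txn[txn_id] ||= []) << message
  end

  # On abort, pass every remembered message back to the given listener block.
  def abort(txn_id, &listener)
    (@by_txn[txn_id] || []).each(&listener)
  end

  # On commit, the acknowledgements are final and the list can be dropped.
  def commit(txn_id)
    @by_txn.delete(txn_id)
  end
end

log = ReplayLog.new
log.record("tx1", "m1")
log.record("tx1", "m2")

redelivered = []
log.abort("tx1") { |m| redelivered << m }
```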

Begin a transaction by name

     # File lib/stomp.rb, line 323
     def begin name, headers={}
       @connection.begin name, headers
     end

Close out resources in use by this client

     # File lib/stomp.rb, line 405
     def close
       @connection.disconnect
       @running = false
     end

Commit a transaction by name

     # File lib/stomp.rb, line 343
     def commit name, headers={}
       # Replay lists are keyed by transaction name (see abort and acknowledge),
       # so drop the list for this transaction once it is committed.
       @replay_messages_by_txn.delete(name)
       @connection.commit name, headers
     end

Join the listener thread for this client, generally used to wait for a quit signal

     # File lib/stomp.rb, line 312
     def join
       @listener_thread.join
     end

Is this client open?

     # File lib/stomp.rb, line 400
     def open?
       @connection.open?
     end

Send a message to a destination.

If a block is given, a receipt will be requested and passed to the block on receipt.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

     # File lib/stomp.rb, line 392
     def send destination, message, headers = {}
       if block_given?
         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
       end
       @connection.send destination, message, headers
     end

Subscribe to a destination. A block must be given; it is used as the callback listener for messages arriving from that destination.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

     # File lib/stomp.rb, line 353
     def subscribe destination, headers={}
       raise "No listener given" unless block_given?
       @listeners[destination] = lambda {|msg| yield msg}
       @connection.subscribe destination, headers
     end

Unsubscribe from a channel

     # File lib/stomp.rb, line 360
     def unsubscribe name, headers={}
       @connection.unsubscribe name, headers
       @listeners[name] = nil
     end
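Because listeners are stored in a hash keyed by destination, a second subscribe to the same destination replaces the earlier callback, and unsubscribe clears it. The sketch below models just that table. ListenerTable and its dispatch method are hypothetical names for illustration and message bodies are plain strings; nothing here talks to a broker.

```ruby
# Hypothetical model of a destination-keyed listener table.
class ListenerTable
  def initialize
    @listeners = {}
  end

  # One listener per destination: a later subscribe overwrites an earlier one.
  def subscribe(destination, &listener)
    raise "No listener given" unless listener
    @listeners[destination] = listener
  end

  def unsubscribe(destination)
    @listeners[destination] = nil
  end

  # Deliver to the listener registered for the destination, if there is one.
  def dispatch(destination, body)
    listener = @listeners[destination]
    listener.call(body) if listener
  end
end

table = ListenerTable.new
seen = []
table.subscribe("/queue/a") { |body| seen << "first:#{body}" }
table.subscribe("/queue/a") { |body| seen << "second:#{body}" }  # replaces the first
table.dispatch("/queue/a", "hello")
table.unsubscribe("/queue/a")
table.dispatch("/queue/a", "ignored")   # no listener any more, silently dropped
```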

Private Instance methods

     # File lib/stomp.rb, line 411
     def register_receipt_listener listener
       # Allocate a unique receipt id under the mutex, since any thread may send
       id = -1
       @id_mutex.synchronize do
         id = @ids.to_s
         @ids = @ids.succ
       end
       # The listener thread looks this up when the matching RECEIPT frame arrives
       @receipt_listeners[id] = listener
       id
     end
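The receipt mechanism amounts to handing out unique ids under a mutex and looking the listener up again when a frame carrying that id comes back. A minimal sketch of that pattern follows. ReceiptRegistry, register and dispatch are hypothetical names, not part of lib/stomp.rb, and frames are plain strings.

```ruby
# Hypothetical stand-alone model of receipt-id allocation and dispatch.
class ReceiptRegistry
  def initialize
    @id_mutex = Mutex.new
    @ids = 1
    @listeners = {}
  end

  # Allocate the next id (as a String) and remember the listener under it.
  def register(&listener)
    id = @id_mutex.synchronize do
      current = @ids.to_s
      @ids += 1
      current
    end
    @listeners[id] = listener
    id
  end

  # Call the listener registered under receipt_id, if any.
  def dispatch(receipt_id, frame)
    listener = @listeners[receipt_id]
    listener.call(frame) if listener
  end
end

registry = ReceiptRegistry.new
got = []
id = registry.register { |frame| got << frame }
registry.dispatch(id, "RECEIPT for #{id}")
```

With the real client this bookkeeping is hidden behind the block form of send and acknowledge, for example client.send("/queue/a", "hello") { |receipt| ... }.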
