1
# -*- encoding: utf-8 -*-
2
3
require 'socket'
4
require 'timeout'
5
require 'io/wait'
6
require 'digest/sha1'
7
8
module Stomp
9
10
  # Low level connection which maps commands and supports
11
  # synchronous receives
12
  class Connection
13
    attr_reader :connection_frame
14
    attr_reader :disconnect_receipt
15
    attr_reader :protocol
16
    attr_reader :session
17
    attr_reader :hb_received # Heartbeat received on time
18
    attr_reader :hb_sent # Heartbeat sent successfully
19
    #alias :obj_send :send
20
21
    # Conventional STOMP port for the requested transport.
    #
    # ssl - truthy selects the SSL port, falsy/nil the plain TCP port.
    #
    # Returns 61612 for SSL, 61613 otherwise.
    def self.default_port(ssl)
      ssl ? 61612 : 61613
    end
24
25
    # A new Connection object accepts the following parameters:
26
    #
27
    #   login             (String,  default : '')
28
    #   passcode          (String,  default : '')
29
    #   host              (String,  default : 'localhost')
30
    #   port              (Integer, default : 61613)
31
    #   reliable          (Boolean, default : false)
32
    #   reconnect_delay   (Integer, default : 5)
33
    #
34
    #   e.g. c = Connection.new("username", "password", "localhost", 61613, true)
35
    #
36
    # Hash:
37
    #
38
    #   hash = {
39
    #     :hosts => [
40
    #       {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
41
    #       {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
42
    #     ],
43
    #     :reliable => true,
44
    #     :initial_reconnect_delay => 0.01,
45
    #     :max_reconnect_delay => 30.0,
46
    #     :use_exponential_back_off => true,
47
    #     :back_off_multiplier => 2,
48
    #     :max_reconnect_attempts => 0,
49
    #     :randomize => false,
50
    #     :backup => false,
51
    #     :connect_timeout => 0,
52
    #     :connect_headers => {},
53
    #     :parse_timeout => 5,
54
    #     :logger => nil,
55
    #   }
56
    #
57
    #   e.g. c = Connection.new(hash)
58
    #
59
    # TODO
60
    # Stomp URL :
61
    #   A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
62
    #
63
    #   stomp://host:port
64
    #   stomp://host.domain.tld:port
65
    #   stomp://user:pass@host:port
66
    #   stomp://user:pass@host.domain.tld:port
67
    #
68
    # Build a connection and open it immediately (the final call to
    # +socket+ connects, or raises when not reliable).
    #
    # login           - login String, or a Hash of parameters; when a Hash
    #                   is given every other argument is ignored and
    #                   hashed_initialize is used instead.
    # passcode        - passcode String.
    # host            - broker host name.
    # port            - broker port.
    # reliable        - when truthy, connection failures are retried.
    # reconnect_delay - seconds slept between reconnect attempts.
    # connect_headers - extra headers sent with the CONNECT frame.
    def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
      @received_messages = []
      @protocol = Stomp::SPL_10 # Assumed at first
      @hb_received = true # Assumed at first
      @hb_sent = true # Assumed at first
      @hbs = @hbr = false # Sending/Receiving heartbeats. Assume no for now.

      if login.is_a?(Hash)
        hashed_initialize(login)
      else
        @host = host
        @port = port
        @login = login
        @passcode = passcode
        @reliable = reliable
        @reconnect_delay = reconnect_delay
        @connect_headers = connect_headers
        @ssl = false
        @parameters = nil
        @parse_timeout = 5    # To override, use hashed parameters
        @connect_timeout = 0  # To override, use hashed parameters
        @logger = nil         # To override, use hashed parameters
        warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\//
      end

      # One mutex per concern: frame writes, frame reads, and socket
      # (re)establishment.
      @transmit_semaphore = Mutex.new
      @read_semaphore = Mutex.new
      @socket_semaphore = Mutex.new

      @subscriptions = {}
      @failure = nil
      @connection_attempts = 0

      # Connect now.
      socket
    end
105
106
    # Configure the connection from a parameter Hash (see the comment on
    # +initialize+ for the accepted keys).  Defaults are merged in by
    # refine_params, then the first host of the list becomes current.
    #
    # params - Hash of connection parameters; must contain :hosts.
    def hashed_initialize(params)
      @parameters = refine_params(params)
      @reliable = @parameters[:reliable]
      @reconnect_delay = @parameters[:initial_reconnect_delay]
      @connect_headers = @parameters[:connect_headers]
      @parse_timeout = @parameters[:parse_timeout]
      @connect_timeout = @parameters[:connect_timeout]
      @logger = @parameters[:logger]
      # Sets the first host to connect to.
      change_host
    end
118
119
    # Syntactic sugar for 'Connection.new' See 'initialize' for usage.
120
    # Alias for Connection.new; takes exactly the same arguments and
    # returns the new Connection.
    def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
      Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
    end
123
124
    # Return the current socket, (re)connecting first when there is no
    # usable socket or a previous operation recorded a failure.
    #
    # Non-reliable connections re-raise the first connect error.  Reliable
    # connections report the failure (logger callback or $stderr), sleep
    # @reconnect_delay seconds, move to the next host / delay when hash
    # parameters are in use, and try again.
    #
    # Raises Stomp::Error::MaxReconnectAttempts once the configured limit
    # is reached, and Stomp::Error::LoggerConnectionError when the logger
    # asks for the retry loop to stop.
    def socket
      @socket_semaphore.synchronize do
        used_socket = @socket
        used_socket = nil if closed?

        while used_socket.nil? || !@failure.nil?
          @failure = nil
          begin
            used_socket = open_socket
            # Open complete

            connect(used_socket)
            if @logger && @logger.respond_to?(:on_connected)
              @logger.on_connected(log_params)
            end
            @connection_attempts = 0
          rescue
            @failure = $!
            used_socket = nil
            raise unless @reliable
            raise if @failure.is_a?(Stomp::Error::LoggerConnectionError)
            if @logger && @logger.respond_to?(:on_connectfail)
              # on_connectfail may raise; only LoggerConnectionError is
              # allowed to escape, anything else is ignored.
              begin
                @logger.on_connectfail(log_params)
              rescue Exception => aex
                raise if aex.is_a?(Stomp::Error::LoggerConnectionError)
              end
            else
              $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
            end
            raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts?

            sleep(@reconnect_delay)

            @connection_attempts += 1

            # Failover settings only exist for hash-style connections.
            if @parameters
              change_host
              increase_reconnect_delay
            end
          end
        end
        # Value of the block, and therefore of the method.
        @socket = used_socket
      end
    end
170
171
    # Normalize a user supplied parameter Hash and fill in defaults.
    #
    # params - Hash of connection parameters.  Keys are normalized through
    #          uncamelize_and_symbolize_keys (defined outside this section;
    #          presumably a Hash extension shipped with the library).
    #
    # Returns a new Hash of defaults overlaid with params.  When :dmh is
    # truthy the :hosts list is additionally run through _expand_hosts.
    def refine_params(params)
      params = params.uncamelize_and_symbolize_keys
      default_params = {
        :connect_headers => {},
        :reliable => true,
        # Failover parameters
        :initial_reconnect_delay => 0.01,
        :max_reconnect_delay => 30.0,
        :use_exponential_back_off => true,
        :back_off_multiplier => 2,
        :max_reconnect_attempts => 0,
        :randomize => false,
        :backup => false,
        :connect_timeout => 0,
        # Parse Timeout
        :parse_timeout => 5,
        :dmh => false,
      }

      res_params = default_params.merge(params)
      res_params = _expand_hosts(res_params) if res_params[:dmh]
      res_params
    end
196
197
    # Rotate to the next host of @parameters[:hosts] and make it current.
    #
    # The list is shuffled first when :randomize is set.  The head of the
    # list is moved to the tail, and its settings are copied into @ssl,
    # @host, @port, @login and @passcode.  A missing port falls back to
    # the default port for the host's ssl setting; a missing login or
    # passcode becomes "".
    def change_host
      @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]

      # Take the first host as current and send it to the end of the array.
      current_host = @parameters[:hosts].shift
      @parameters[:hosts] << current_host

      @ssl = current_host[:ssl]
      @host = current_host[:host]
      @port = current_host[:port] || Connection::default_port(@ssl)
      @login = current_host[:login] || ""
      @passcode = current_host[:passcode] || ""
    end
211
212
    # Has the configured reconnect limit been reached?
    #
    # Returns false when there are no hash parameters, or when
    # :max_reconnect_attempts is nil or 0 (no limit).  Otherwise returns
    # true once @connection_attempts has reached the limit.
    def max_reconnect_attempts?
      return false if @parameters.nil?
      limit = @parameters[:max_reconnect_attempts]
      return false if limit.nil? || limit == 0
      @connection_attempts >= limit
    end
215
216
    # Apply the back-off policy to @reconnect_delay.
    #
    # With :use_exponential_back_off set the delay is multiplied by
    # :back_off_multiplier; in every case it is capped at
    # :max_reconnect_delay.
    #
    # Returns the updated delay.
    def increase_reconnect_delay
      @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off]
      @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]

      @reconnect_delay
    end
223
224
    # Is this connection open?
225
    # Returns true unless the connection has been marked closed.
    def open?
      !@closed
    end
228
229
    # Is this connection closed?
230
    # Returns true when the connection has been marked closed, false
    # otherwise (including before the flag has ever been set).
    def closed?
      @closed ? true : false
    end
233
234
    # Begin a transaction, requires a name for the transaction
235
    # Send a BEGIN frame.
    #
    # name    - transaction name, sent as the :transaction header
    #           (overrides any :transaction entry in headers).
    # headers - additional frame headers.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed.
    def begin(name, headers = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      headers = headers.symbolize_keys
      headers[:transaction] = name
      _headerCheck(headers)
      transmit(Stomp::CMD_BEGIN, headers)
    end
242
243
    # Acknowledge a message, used when a subscription has specified
244
    # client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )
245
    #
246
    # Accepts a transaction header ( :transaction => 'some_transaction_id' )
247
    # Send an ACK frame for message_id.
    #
    # message_id - id of the message being acknowledged; sent as the
    #              'message-id' header.
    # headers    - additional frame headers.  On protocol 1.1+ a
    #              :subscription header is mandatory.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed,
    # Stomp::Error::MessageIDRequiredError for a nil or empty id, and
    # Stomp::Error::SubscriptionRequiredError for a missing subscription
    # on 1.1+.
    def ack(message_id, headers = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
      headers = headers.symbolize_keys
      headers[:'message-id'] = message_id
      if @protocol >= Stomp::SPL_11
        raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
      end
      _headerCheck(headers)
      transmit(Stomp::CMD_ACK, headers)
    end
258
259
    # STOMP 1.1+ NACK
260
    # Send a NACK frame for message_id (not available on protocol 1.0).
    #
    # message_id - id of the message being refused; sent as the
    #              'message-id' header.
    # headers    - additional frame headers; :subscription is mandatory.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed,
    # Stomp::Error::UnsupportedProtocolError on a 1.0 connection,
    # Stomp::Error::MessageIDRequiredError for a nil or empty id, and
    # Stomp::Error::SubscriptionRequiredError for a missing subscription.
    def nack(message_id, headers = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10
      raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
      headers = headers.symbolize_keys
      headers[:'message-id'] = message_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
      _headerCheck(headers)
      transmit(Stomp::CMD_NACK, headers)
    end
270
271
    # Commit a transaction by name
272
    # Send a COMMIT frame.
    #
    # name    - transaction name, sent as the :transaction header.
    # headers - additional frame headers.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed.
    def commit(name, headers = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      headers = headers.symbolize_keys
      headers[:transaction] = name
      _headerCheck(headers)
      transmit(Stomp::CMD_COMMIT, headers)
    end
279
280
    # Abort a transaction by name
281
    # Send an ABORT frame.
    #
    # name    - transaction name, sent as the :transaction header.
    # headers - additional frame headers.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed.
    def abort(name, headers = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      headers = headers.symbolize_keys
      headers[:transaction] = name
      _headerCheck(headers)
      transmit(Stomp::CMD_ABORT, headers)
    end
288
289
    # Subscribe to a destination, must specify a name
290
    # Send a SUBSCRIBE frame.
    #
    # name    - destination, sent as the :destination header.
    # headers - additional frame headers.
    # subId   - subscription id.  On protocol 1.1+ an id is mandatory and
    #           is taken from headers[:id] or, failing that, from subId.
    #
    # On reliable connections the headers are remembered (keyed by subId,
    # or by name when subId is nil) so the subscription can be replayed
    # after a reconnect.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed,
    # Stomp::Error::SubscriptionRequiredError when 1.1+ has no id, and
    # Stomp::Error::DuplicateSubscription when the key is already stored.
    def subscribe(name, headers = {}, subId = nil)
      raise Stomp::Error::NoCurrentConnection if closed?
      headers = headers.symbolize_keys
      headers[:destination] = name
      if @protocol >= Stomp::SPL_11
        raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
        headers[:id] = subId if headers[:id].nil?
      end
      _headerCheck(headers)
      if @logger && @logger.respond_to?(:on_subscribe)
        @logger.on_subscribe(log_params, headers)
      end

      # Store the sub so that we can replay if we reconnect.
      if @reliable
        subId = name if subId.nil?
        raise Stomp::Error::DuplicateSubscription if @subscriptions[subId]
        @subscriptions[subId] = headers
      end

      transmit(Stomp::CMD_SUBSCRIBE, headers)
    end
312
313
    # Unsubscribe from a destination, which must be specified
314
    # Send an UNSUBSCRIBE frame.
    #
    # dest    - destination, sent as the :destination header.
    # headers - additional frame headers.
    # subId   - subscription id.  On protocol 1.1+ an id is mandatory and
    #           is taken from headers[:id] or, failing that, from subId
    #           (the same rule subscribe applies).
    #
    # On reliable connections the remembered subscription (keyed by subId,
    # or by dest when subId is nil) is forgotten after the frame is sent.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed and
    # Stomp::Error::SubscriptionRequiredError when 1.1+ has no id.
    def unsubscribe(dest, headers = {}, subId = nil)
      raise Stomp::Error::NoCurrentConnection if closed?
      headers = headers.symbolize_keys
      headers[:destination] = dest
      if @protocol >= Stomp::SPL_11
        raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
        # Without this the frame would be sent with no id at all when the
        # caller supplied it only through subId.
        headers[:id] = subId if headers[:id].nil?
      end
      _headerCheck(headers)
      transmit(Stomp::CMD_UNSUBSCRIBE, headers)
      if @reliable
        subId = dest if subId.nil?
        @subscriptions.delete(subId)
      end
    end
328
329
    # Publish message to destination
330
    #
331
    # To disable content length header ( :suppress_content_length => true )
332
    # Accepts a transaction header ( :transaction => 'some_transaction_id' )
333
    # Send a SEND frame.
    #
    # destination - sent as the :destination header.
    # message     - frame body.
    # headers     - additional frame headers.
    #
    # The logger's on_publish callback, when present, runs before the
    # frame is transmitted.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed.
    def publish(destination, message, headers = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      headers = headers.symbolize_keys
      headers[:destination] = destination
      _headerCheck(headers)
      if @logger && @logger.respond_to?(:on_publish)
        @logger.on_publish(log_params, message, headers)
      end
      transmit(Stomp::CMD_SEND, headers, message)
    end
343
344
    # Forward to Object#__send__: args[0] is the method name, the rest are
    # its arguments.  Returns whatever the invoked method returns.
    def obj_send(*args)
      __send__(*args)
    end
347
348
    # Send a message back to the source or to the dead letter queue
349
    #
350
    # Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" )
351
    # Accepts a limit number of redeliveries option ( :max_redeliveries => 6 )
352
    # Accepts a force client acknowledgement option (:force_client_ack => true)
353
    # Re-queue a received message inside a transaction, or divert it to a
    # dead letter queue once it has been redelivered too often.
    #
    # message - received message; its headers are replaced by a
    #           symbol-keyed copy, :retry_count is incremented and
    #           'message-id' is removed.
    # options - :dead_letter_queue (default "/queue/DLQ"),
    #           :max_redeliveries  (default 6),
    #           :force_client_ack  (ack even if the subscription is not
    #                               known to use client acks).
    #
    # Any exception aborts the transaction and is re-raised.
    def unreceive(message, options = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge options
      # Lets make sure all keys are symbols
      message.headers = message.headers.symbolize_keys

      # nil.to_i is 0, so a missing header counts as "never retried".
      retry_count = message.headers[:retry_count].to_i
      message.headers[:retry_count] = retry_count + 1
      transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
      message_id = message.headers.delete(:'message-id')

      begin
        self.begin transaction_id

        if client_ack?(message) || options[:force_client_ack]
          self.ack(message_id, :transaction => transaction_id)
        end

        if retry_count <= options[:max_redeliveries]
          self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id))
        else
          # Poison ack, sending the message to the DLQ
          self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true))
        end
        self.commit transaction_id
      rescue Exception => exception
        # Roll back on anything, then hand the original error to the caller.
        self.abort transaction_id
        raise exception
      end
    end
383
384
    # Was the subscription remembered for this message's destination
    # created with :ack => "client"?
    #
    # message - object whose headers Hash has a :destination entry.
    #
    # Returns true or false; false when no subscription is remembered
    # under that destination.
    def client_ack?(message)
      headers = @subscriptions[message.headers[:destination]]
      !headers.nil? && headers[:ack] == "client"
    end
388
389
    # Close this connection
390
    # Send a DISCONNECT frame and close the socket.
    #
    # headers - additional frame headers.  When :receipt is present the
    #           next received frame is stored in @disconnect_receipt.
    #
    # On protocol 1.1+ the @st and @rt threads (heartbeat tickers, per the
    # original comments) are killed before the frame is sent.
    #
    # Raises Stomp::Error::NoCurrentConnection when already closed.
    def disconnect(headers = {})
      raise Stomp::Error::NoCurrentConnection if closed?
      headers = headers.symbolize_keys
      _headerCheck(headers)
      if @protocol >= Stomp::SPL_11
        @st.kill if @st # Kill ticker thread if any
        @rt.kill if @rt # Kill ticker thread if any
      end
      transmit(Stomp::CMD_DISCONNECT, headers)
      @disconnect_receipt = receive if headers[:receipt]
      if @logger && @logger.respond_to?(:on_disconnect)
        @logger.on_disconnect(log_params)
      end
      close_socket
    end
405
406
    # Return a pending message if one is available, otherwise
407
    # return nil
408
    # Non-blocking receive.
    #
    # Returns nil when there is no socket or the socket reports nothing
    # ready to read; otherwise the result of +receive+.
    #
    # Raises Stomp::Error::NoCurrentConnection when closed.
    def poll
      raise Stomp::Error::NoCurrentConnection if closed?
      # No need for a read lock here.  The receive method eventually fulfills
      # that requirement.
      return nil if @socket.nil? || !@socket.ready?
      receive
    end
415
416
    # Receive a frame, block until the frame is received
417
    # Read one frame, retrying on failure when the connection is reliable.
    #
    # Returns whatever _receive returns (nil on EOF).
    #
    # Non-reliable connections re-raise the first error.  Reliable ones
    # record the error in @failure (which makes +socket+ reconnect),
    # report it and loop.  Stomp::Error::MaxReconnectAttempts is always
    # re-raised, as in +transmit+, so the reconnect limit is honoured
    # instead of being retried forever.
    def __old_receive
      # The receive may fail so we may need to retry.
      while true
        begin
          used_socket = socket
          return _receive(used_socket)
        rescue Stomp::Error::MaxReconnectAttempts
          raise
        rescue
          @failure = $!
          raise unless @reliable
          errstr = "receive failed: #{$!}"
          if @logger && @logger.respond_to?(:on_miscerr)
            @logger.on_miscerr(log_params, errstr)
          else
            $stderr.print errstr
          end
        end
      end
    end
435
436
    # Receive a frame, blocking until one arrives.
    #
    # A nil result on a reliable, still-open connection is treated as EOF:
    # it is reported, the socket is dropped so that it gets re-opened, and
    # the read is attempted once more.  The logger's on_receive callback,
    # when present, is given the final result.
    #
    # Returns the received frame, or nil.
    # Raises Stomp::Error::NoCurrentConnection when closed.
    def receive
      raise Stomp::Error::NoCurrentConnection if closed?
      super_result = __old_receive
      if super_result.nil? && @reliable && !closed?
        errstr = "connection.receive returning EOF as nil - resetting connection.\n"
        if @logger && @logger.respond_to?(:on_miscerr)
          @logger.on_miscerr(log_params, errstr)
        else
          $stderr.print errstr
        end
        # Force +socket+ to reconnect on the next call.
        @socket = nil
        super_result = __old_receive
      end
      if @logger && @logger.respond_to?(:on_receive)
        @logger.on_receive(log_params, super_result)
      end
      super_result
    end
455
456
    # Convenience method
457
    # Install (or replace) the callback logger.
    #
    # logger - object whose on_* methods are invoked when it responds to
    #          them; nil disables callbacks.
    #
    # Returns the logger.
    def set_logger(logger)
      @logger = logger
    end
460
461
    # Convenience method
462
    # Is +s+ valid UTF-8?
    #
    # On Ruby 1.8 (no String#encoding) the check is delegated to
    # _valid_utf8?.  Elsewhere the string must be tagged with the
    # Stomp::UTF8 encoding name and be valid in that encoding.
    #
    # The version test is anchored so that releases such as 2.1.8 are not
    # mistaken for 1.8.
    #
    # Returns true or false.
    def valid_utf8?(s)
      case RUBY_VERSION
      when /\A1\.8/
        _valid_utf8?(s)
      else
        s.encoding.name == Stomp::UTF8 && s.valid_encoding?
      end
    end
471
472
    # Convenience method for clients, return a SHA1 digest for arbitrary data
473
    # Returns the SHA1 digest of +data+ as a 40 character lowercase hex
    # String.
    def sha1(data)
      Digest::SHA1.hexdigest(data)
    end
476
477
    # Convenience method for clients, return a type 4 UUID.
478
    # Returns a random (version 4) UUID in the canonical 8-4-4-4-12
    # lowercase hex layout, e.g. "3f2b8c1e-9a4d-4c7e-b1a2-0d5e6f708192".
    #
    # Uses Kernel#rand, so the value is not suitable where unpredictability
    # matters.
    def uuid()
      # rand(256) so that every byte value, 0xff included, is possible.
      b = Array.new(16) { rand(256) }
      b[6] = (b[6] & 0x0F) | 0x40 # version nibble = 4
      b[8] = (b[8] & 0x3F) | 0x80 # variant bits = 10
      sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x", *b)
    end
490
491
    private
492
493
      # Replace each entry of hash[:hosts] by one entry per resolved
      # address family: the first IPv6 address found (if any) followed by
      # the first IPv4 address found (if any).  Every other key of the
      # host entry is carried over unchanged.
      #
      # hash - parameter Hash containing :hosts.  It is not modified.
      #
      # Returns a copy of hash with the expanded :hosts list and the
      # original list saved under :hosts_cloned.
      def _expand_hosts(hash)
        new_hash = hash.clone
        new_hash[:hosts_cloned] = hash[:hosts].clone
        new_hash[:hosts] = []

        hash[:hosts].each do |host_parms|
          ai = Socket.getaddrinfo(host_parms[:host], nil, nil, Socket::SOCK_STREAM)
          next if ai.nil? || ai.size == 0
          # Element 4 of an addrinfo row is the numeric address family,
          # element 3 the address string.
          info6 = ai.detect { |info| info[4] == Socket::AF_INET6 }
          info4 = ai.detect { |info| info[4] == Socket::AF_INET }
          [info6, info4].each do |info|
            next unless info
            new_hostp = host_parms.clone
            new_hostp[:host] = info[3]
            new_hash[:hosts] << new_hostp
          end
        end
        new_hash
      end
516
517
      # Read and parse one frame from read_socket while holding the read
      # lock.
      #
      # Returns nil when the very first read hits EOF, otherwise a Message.
      #
      # Raises Stomp::Error::PacketParsingTimeout when parsing takes longer
      # than @parse_timeout seconds, Stomp::Error::StompServerError on EOF
      # inside the header section, and Stomp::Error::InvalidMessageLength
      # when a content-length body is not followed by a NUL byte.
      def _receive( read_socket )
        @read_semaphore.synchronize do
          line = read_socket.gets
          return nil if line.nil?
          # If the reading hangs for more than X seconds, abort the parsing process.
          # X defaults to 5.  Override allowed in connection hash parameters.
          Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do
            # Reads the beginning of the message until it runs into an empty line
            message_header = ''
            begin
              message_header += line
              line = read_socket.gets
              raise Stomp::Error::StompServerError if line.nil?
            end until line =~ /^\s?\n$/

            # Checks if it includes content_length header
            content_length = message_header.match(/content-length\s?:\s?(\d+)\s?\n/)
            message_body = ''

            if content_length
              # Read exactly the announced number of bytes, then the
              # terminating NUL.
              message_body = read_socket.read content_length[1].to_i
              raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
            else
              # Read up to and including the first NUL, then drop it.
              message_body = read_socket.readline("\0")
              message_body.chop!
            end

            # If the buffer isn't empty, reads trailing new lines.
            #
            # Note: experiments with JRuby seem to show that .ready? never
            # returns true.  This means that this code to drain trailing new
            # lines never runs using JRuby.
            #
            # Note 2: the draining of new lines must be done _after_ a message
            # is read.  Do _not_ leave them on the wire and attempt to drain them
            # at the start of the next read.  Attempting to do that breaks the
            # asynchronous nature of the 'poll' method.
            while read_socket.ready?
              last_char = read_socket.getc
              break unless last_char
              if parse_char(last_char) != "\n"
                # Not ours: push it back for the next frame.
                read_socket.ungetc(last_char)
                break
              end
            end
            # And so, a JRuby hack.  Remove any new lines at the start of the
            # next buffer.
            message_header.gsub!(/^\n?/, "")

            if @protocol >= Stomp::SPL_11
              # Time of last read, kept only while receiving heartbeats.
              @lr = Time.now.to_f if @hbr
            end
            # Adds the excluded \n and \0 and tries to create a new message with it
            msg = Message.new(message_header + "\n" + message_body + "\0", @protocol >= Stomp::SPL_11)

            if @protocol >= Stomp::SPL_11 && msg.command != Stomp::CMD_CONNECTED
              msg.headers = _decodeHeaders(msg.headers)
            end
            msg
          end
        end
      end
581
582
      # Normalize the result of IO#getc to a one character String.
      #
      # On Rubies newer than 1.9 the value is returned untouched; on older
      # ones, where getc yields an Integer, it is converted with #chr.
      def parse_char(char)
        RUBY_VERSION > '1.9' ? char : char.chr
      end
585
586
      # Send one frame, retrying on failure when the connection is
      # reliable.
      #
      # command - STOMP command String.
      # headers - frame headers.
      # body    - frame body.
      #
      # Non-reliable connections re-raise the first error.  Reliable ones
      # record the error in @failure (which makes +socket+ reconnect),
      # report it and try again.  Stomp::Error::MaxReconnectAttempts is
      # always re-raised.
      def transmit(command, headers = {}, body = '')
        # The transmit may fail so we may need to retry.
        while true
          begin
            used_socket = socket
            _transmit(used_socket, command, headers, body)
            return
          rescue Stomp::Error::MaxReconnectAttempts
            raise
          rescue
            @failure = $!
            raise unless @reliable
            errstr = "transmit to #{@host} failed: #{$!}\n"
            if @logger && @logger.respond_to?(:on_miscerr)
              @logger.on_miscerr(log_params, errstr)
            else
              $stderr.print errstr
            end
          end
        end
      end
607
608
      # Write one frame to used_socket while holding the transmit lock.
      #
      # used_socket - IO-like object responding to puts and write.
      # command     - STOMP command String.
      # headers     - frame headers; an Array value is written as one
      #               header line per element.  The Hash receives
      #               'content-length' (unless :suppress_content_length is
      #               set) and a default 'content-type'.
      # body        - frame body; nil is treated as ''.
      #
      # On protocol 1.1+ every frame except CONNECT has its headers passed
      # through _encodeHeaders first.
      def _transmit(used_socket, command, headers = {}, body = '')
        if @protocol >= Stomp::SPL_11 && command != Stomp::CMD_CONNECT
          headers = _encodeHeaders(headers)
        end
        @transmit_semaphore.synchronize do
          # Handle nil body
          body = '' if body.nil?
          # The content-length should be expressed in bytes.
          # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters
          # With Unicode strings, # of bytes != # of characters.  So, use String#bytesize when available.
          body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length

          # ActiveMQ interprets every message as a BinaryMessage
          # if content_length header is included.
          # Using :suppress_content_length => true will suppress this behaviour
          # and ActiveMQ will interpret the message as a TextMessage.
          # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/
          # Lets send this header in the message, so it can maintain state when using unreceive
          headers[:'content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length]
          headers[:'content-type'] = "text/plain; charset=UTF-8" unless headers[:'content-type']
          used_socket.puts command
          headers.each do |k,v|
            if v.is_a?(Array)
              v.each do |e|
                used_socket.puts "#{k}:#{e}"
              end
            else
              used_socket.puts "#{k}:#{v}"
            end
          end
          # Blank line ends the headers, NUL ends the frame.
          used_socket.puts
          used_socket.write body
          used_socket.write "\0"

          if @protocol >= Stomp::SPL_11
            # Time of last send, kept only while sending heartbeats.
            @ls = Time.now.to_f if @hbs
          end
        end
      end
648
649
      # Open a plain TCP connection to @host:@port.
      #
      # The logger's on_connecting callback, when present, runs first.
      #
      # Returns the TCPSocket.
      # Raises Stomp::Error::SocketOpenTimeout when the connect takes
      # longer than @connect_timeout seconds.
      def open_tcp_socket
        tcp_socket = nil

        if @logger && @logger.respond_to?(:on_connecting)
          @logger.on_connecting(log_params)
        end

        Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
          tcp_socket = TCPSocket.open @host, @port
        end

        tcp_socket
      end
662
663
      # Open an SSL connection to @host:@port.
      #
      # @ssl is either true (no verification, default Ruby ciphers) or a
      # Stomp::SSLParams-like object describing trust stores, client
      # certificate/key and cipher list; in the latter case the SSLContext,
      # verify result and peer certificate are written back to it.
      #
      # Returns the connected OpenSSL::SSL::SSLSocket, extended with a
      # ready? method.
      #
      # Any exception is reported through the logger's on_ssl_connectfail
      # callback (when present) and then re-raised.  Parameter problems
      # raise the matching Stomp::Error::SSL* error.
      def open_ssl_socket
        require 'openssl' unless defined?(OpenSSL)
        begin # Any raised SSL exceptions
          ctx = OpenSSL::SSL::SSLContext.new
          ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE # Assume for now
          #
          # Note: if a client uses :ssl => true this results in the gem using
          # the _default_ Ruby ciphers list.  This is _known_ to fail in later
          # Ruby releases.  The gem provides a default cipher list that may
          # function in these cases.  To use this connect with:
          # * :ssl => Stomp::SSLParams.new
          # * :ssl => Stomp::SSLParams.new(..., :ciphers => Stomp::DEFAULT_CIPHERS)
          #
          # If connecting with an SSLParams instance, and the _default_ Ruby
          # ciphers list is required, use:
          # * :ssl => Stomp::SSLParams.new(..., :use_ruby_ciphers => true)
          #
          # If a custom ciphers list is required, connect with:
          # * :ssl => Stomp::SSLParams.new(..., :ciphers => custom_ciphers_list)
          #
          if @ssl != true
            #
            # Here @ssl is:
            # * an instance of Stomp::SSLParams
            # Control would not be here if @ssl == false or @ssl.nil?.
            #

            # Back reference the SSLContext
            @ssl.ctx = ctx

            # Server authentication parameters if required
            if @ssl.ts_files
              ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
              truststores = OpenSSL::X509::Store.new
              fl = @ssl.ts_files.split(",")
              fl.each do |fn|
                # Add next cert file listed
                raise Stomp::Error::SSLNoTruststoreFileError if !File.exist?(fn)
                raise Stomp::Error::SSLUnreadableTruststoreFileError if !File.readable?(fn)
                truststores.add_file(fn)
              end
              ctx.cert_store = truststores
            end

            # Client authentication parameters
            # Both cert file and key file must be present or not, it can not be a mix
            raise Stomp::Error::SSLClientParamsError if @ssl.cert_file.nil? && !@ssl.key_file.nil?
            raise Stomp::Error::SSLClientParamsError if !@ssl.cert_file.nil? && @ssl.key_file.nil?
            if @ssl.cert_file # Any check will do here
              raise Stomp::Error::SSLNoCertFileError if !File.exist?(@ssl.cert_file)
              raise Stomp::Error::SSLUnreadableCertFileError if !File.readable?(@ssl.cert_file)
              ctx.cert = OpenSSL::X509::Certificate.new(File.read(@ssl.cert_file))
              raise Stomp::Error::SSLNoKeyFileError if !File.exist?(@ssl.key_file)
              raise Stomp::Error::SSLUnreadableKeyFileError if !File.readable?(@ssl.key_file)
              ctx.key  = OpenSSL::PKey::RSA.new(File.read(@ssl.key_file))
            end

            # Cipher list
            if !@ssl.use_ruby_ciphers # No Ruby ciphers (the default)
              if @ssl.ciphers # User ciphers list?
                ctx.ciphers = @ssl.ciphers # Accept user supplied ciphers
              else
                ctx.ciphers = Stomp::DEFAULT_CIPHERS # Just use Stomp defaults
              end
            end
          end

          #
          ssl = nil
          if @logger && @logger.respond_to?(:on_ssl_connecting)
            @logger.on_ssl_connecting(log_params)
          end

          Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
            ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx)
          end
          # Give the SSL socket the ready? query that poll and _receive
          # rely on: buffered data or a readable underlying IO.
          def ssl.ready?
            ! @rbuffer.empty? || @io.ready?
          end
          ssl.connect
          if @ssl != true
            # Pass back results if possible
            if RUBY_VERSION =~ /1\.8\.[56]/
              @ssl.verify_result = "N/A for Ruby #{RUBY_VERSION}"
            else
              @ssl.verify_result = ssl.verify_result
            end
            @ssl.peer_cert = ssl.peer_cert
          end
          if @logger && @logger.respond_to?(:on_ssl_connected)
            @logger.on_ssl_connected(log_params)
          end
          ssl
        rescue Exception => ex
          if @logger && @logger.respond_to?(:on_ssl_connectfail)
            lp = log_params.clone
            lp[:ssl_exception] = ex
            @logger.on_ssl_connectfail(lp)
          end
          #
          raise # Reraise
        end
      end
766
767
      # Mark the connection closed and close the current socket, if any.
      #
      # Errors raised while closing (for instance because the socket is
      # already closed) are deliberately ignored.
      #
      # Returns true.
      def close_socket
        begin
          # Need to set @closed = true before closing the socket
          # within the @read_semaphore thread
          @closed = true
          @read_semaphore.synchronize do
            @socket.close if @socket
          end
        rescue
          # Ignoring if already closed
        end
        @closed
      end
780
781
      # Open a fresh socket (SSL when @ssl is truthy, plain TCP otherwise),
      # close the previous one, clear the closed flag and enable
      # SO_KEEPALIVE on the new socket.
      #
      # Returns the new socket.
      def open_socket
        used_socket = @ssl ? open_ssl_socket : open_tcp_socket
        # try to close the old connection if any
        close_socket

        # close_socket set the flag; the connection is open again.
        @closed = false
        # Use keepalive
        used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
        used_socket
      end
791
792
      # Perform the STOMP handshake on used_socket.
      #
      # Sends CONNECT with a copy of @connect_headers plus login and
      # passcode, stores the reply in @connection_frame, records the
      # session id and replays every remembered subscription.
      # _pre_connect and _post_connect run around the exchange.
      def connect(used_socket)
        @connect_headers = {} unless @connect_headers # Caller said nil/false
        headers = @connect_headers.clone
        headers[:login] = @login
        headers[:passcode] = @passcode
        _pre_connect
        _transmit(used_socket, "CONNECT", headers)
        @connection_frame = _receive(used_socket)
        _post_connect
        @disconnect_receipt = nil
        @session = @connection_frame.headers["session"] if @connection_frame
        # replay any subscriptions.
        @subscriptions.each { |k,v| _transmit(used_socket, Stomp::CMD_SUBSCRIBE, v) }
      end
806
807
      # Build the Hash handed to logger callbacks.
      #
      # Returns a shallow copy of @parameters (or an empty Hash when there
      # are none) extended with the current host, port, login, passcode,
      # ssl setting, reconnect delay, parse timeout and attempt count
      # under :cur_* keys.  Note that this includes the passcode.
      def log_params
        lparms = @parameters ? @parameters.clone : {}
        lparms[:cur_host] = @host
        lparms[:cur_port] = @port
        lparms[:cur_login] = @login
        lparms[:cur_passcode] = @passcode
        lparms[:cur_ssl] = @ssl
        lparms[:cur_recondelay] = @reconnect_delay
        lparms[:cur_parseto] = @parse_timeout
        lparms[:cur_conattempts] = @connection_attempts
        lparms
      end
821
822
      # Validates and normalizes @connect_headers before CONNECT is sent.
      #
      # * keys are symbolized
      # * accept-version and host must be given together, otherwise
      #   Stomp::Error::ProtocolErrorConnect is raised
      # * with neither of them present (a 1.0 connect) nothing more is done
      # * accept-version is reduced to the entries found in Stomp::SUPPORTED;
      #   Stomp::Error::UnsupportedProtocolError is raised if none is left
      # * a heart-beat header, when present, is checked by _validate_hbheader
      def _pre_connect
        @connect_headers = @connect_headers.symbolize_keys
        requested_versions = @connect_headers[:"accept-version"]
        virtual_host = @connect_headers[:host]
        if requested_versions
          raise Stomp::Error::ProtocolErrorConnect unless virtual_host
        else
          raise Stomp::Error::ProtocolErrorConnect if virtual_host
          return # 1.0
        end
        # Try 1.1 or greater
        usable = requested_versions.split(",").select { |ver| Stomp::SUPPORTED.index(ver) }
        raise Stomp::Error::UnsupportedProtocolError if usable.empty?
        @connect_headers[:"accept-version"] = usable.join(",") # This goes to server
        # Heartbeats - pre connect
        _validate_hbheader() if @connect_headers[:"heart-beat"]
      end
841
842
      # Evaluates the server's reply to CONNECT for 1.1+ connections.
      #
      # Does nothing unless both accept-version and host were sent, or when
      # the reply is an ERROR frame.  Otherwise @protocol is set from the
      # reply's version header, Stomp::Error::UnsupportedProtocolError is
      # raised if that version is not in Stomp::SUPPORTED, and heartbeats are
      # initialized when a heart-beat header was sent.
      def _post_connect
        negotiated = @connect_headers[:"accept-version"] && @connect_headers[:host]
        return unless negotiated
        return if @connection_frame.command == Stomp::CMD_ERROR
        @protocol = @connection_frame.headers.symbolize_keys[:version]
        # Should not happen, but check anyway
        unless Stomp::SUPPORTED.index(@protocol)
          raise Stomp::Error::UnsupportedProtocolError
        end
        # Heartbeats
        _init_heartbeats() if @connect_headers[:"heart-beat"]
      end
853
854
      # Checks the client's heart-beat header in @connect_headers.
      #
      # "0,0" is accepted as is.  Anything else must consist of exactly two
      # comma separated fields, each being the canonical decimal text of an
      # Integer; otherwise Stomp::Error::InvalidHeartBeatHeaderError is raised.
      def _validate_hbheader()
        requested = @connect_headers[:"heart-beat"]
        return if requested == "0,0" # Caller does not want heartbeats.  OK.
        fields = requested.split(",")
        well_formed = fields.size == 2 && fields.all? { |field| field == field.to_i.to_s }
        raise Stomp::Error::InvalidHeartBeatHeaderError unless well_formed
      end
861
862
      # Sets up heartbeat state after CONNECT and starts the ticker threads
      # for the directions both sides agreed on.
      #
      # The client's request is read from @connect_headers[:"heart-beat"], the
      # server's answer from the heart-beat header of @connection_frame.
      # Nothing is started when
      # * the client asked for "0,0"
      # * the server answered "0,0" or sent no heart-beat header at all
      # * the numbers rule out both directions
      #
      # State written (names as in the spec):
      #   @cx, @cy    client values, ms     @sx, @sy    server values, ms
      #   @sti, @rti  send / receive ticker interval, microseconds
      #   @ls, @lr    last send / receive time (Time.now.to_f)
      #   @st, @rt    ticker threads (set by the _start_*_ticker methods)
      #   @hbs, @hbr  true when sending / receiving heartbeats
      def _init_heartbeats()
        return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats.  OK.
        #
        # NB: no trailing comma here, otherwise the right hand side becomes an
        # Array and these four are not Integers.
        @cx = @cy = @sx = @sy = 0 # Variable names as in spec
        #
        @sti = @rti = 0.0 # Send/Receive ticker interval.
        #
        @ls = @lr = -1.0 # Last send/receive time (from Time.now.to_f)
        #
        @st = @rt = nil # Send/receive ticker thread
        #
        cfh = @connection_frame.headers.symbolize_keys
        server_hb = cfh[:"heart-beat"]
        # A missing header is to be treated like "0,0".
        return if server_hb.nil? || server_hb == "0,0" # Server does not want heartbeats
        #
        parts = @connect_headers[:"heart-beat"].split(",")
        @cx = parts[0].to_i
        @cy = parts[1].to_i
        #
        parts = server_hb.split(",")
        @sx = parts[0].to_i
        @sy = parts[1].to_i
        # Catch odd situations like someone has used => heart-beat:000,00000
        return if (@cx == 0 && @cy == 0) || (@sx == 0 && @sy == 0)
        #
        @hbs = @hbr = true # Sending/Receiving heartbeats. Assume yes at first.
        # Check for sending
        @hbs = false if @cx == 0 || @sy == 0
        # Check for receiving
        @hbr = false if @sx == 0 || @cy == 0
        # Should not do heartbeats at all
        return if (!@hbs && !@hbr)
        # If sending
        if @hbs
          sm = [@cx, @sy].max # ticker interval, ms
          @sti = 1000.0 * sm # ticker interval, microseconds
          @ls = Time.now.to_f # best guess at start
          _start_send_ticker
        end

        # If receiving
        if @hbr
          rm = [@sx, @cy].max # ticker interval, ms
          @rti = 1000.0 * rm # ticker interval, microseconds
          @lr = Time.now.to_f # best guess at start
          _start_receive_ticker
        end

      end
910
911
      # Starts the send side heartbeat ticker thread and keeps it in @st.
      #
      # The thread wakes up every @sti microseconds.  When more than 4/5 of
      # that interval has passed since the last send (@ls) it writes an empty
      # line to @socket while holding @transmit_semaphore, records the send
      # time in @ls and sets @hb_sent to true.  A failing write sets @hb_sent
      # to false, is reported through the logger's on_hbwrite_fail hook (when
      # the logger has one) and is re-raised inside the ticker thread.
      #
      # Returns the new Thread.
      def _start_send_ticker
        pause = @sti / 1000000.0 # Sleep time secs
        @st = Thread.new do
          while true
            sleep pause
            now = Time.now.to_f
            @logger.on_hbfire(log_params, "send_fire", now) if @logger && @logger.respond_to?(:on_hbfire)
            since_last_send = now - @ls
            # Be tolerant (minus)
            if since_last_send > (@sti - (@sti/5.0)) / 1000000.0
              @logger.on_hbfire(log_params, "send_heartbeat", now) if @logger && @logger.respond_to?(:on_hbfire)
              # Send a heartbeat
              @transmit_semaphore.synchronize do
                begin
                  @socket.puts
                  @ls = now # Update last send
                  @hb_sent = true # Reset if necessary
                rescue Exception => write_error
                  @hb_sent = false # Set the warning flag
                  if @logger && @logger.respond_to?(:on_hbwrite_fail)
                    failure = {"ticker_interval" => @sti, "exception" => write_error}
                    @logger.on_hbwrite_fail(log_params, failure)
                  end
                  raise # Re-raise.  What else could be done here?
                end
              end
            end
            Thread.pass
          end
        end
      end
945
946
      # Starts the receive side heartbeat ticker thread and keeps it in @rt.
      #
      # The thread wakes up every @rti microseconds.  When more than 6/5 of
      # that interval has passed since the last receive (@lr) it tries to take
      # @read_semaphore without blocking:
      # * lock obtained: one character is read from @socket.  A "\n" counts as
      #   a server heartbeat and updates @lr; anything else is pushed back.
      #   The lock is released again even if reading raises.
      # * lock not obtained: @hb_received is set to false and the logger's
      #   on_hbread_fail hook (when the logger has one) is called.
      # When the last receive is recent enough @hb_received is set to true.
      #
      # Returns the new Thread.
      def _start_receive_ticker
        sleeptime = @rti / 1000000.0 # Sleep time secs
        @rt = Thread.new {
          while true do
            sleep sleeptime
            curt = Time.now.to_f
            if @logger && @logger.respond_to?(:on_hbfire)
              @logger.on_hbfire(log_params, "receive_fire", curt)
            end
            delta = curt - @lr
            if delta > ((@rti + (@rti/5.0)) / 1000000.0) # Be tolerant (plus)
              if @logger && @logger.respond_to?(:on_hbfire)
                @logger.on_hbfire(log_params, "receive_heartbeat", curt)
              end
              # Client code could be off doing something else (that is, no reading of
              # the socket has been requested by the caller).  Try to  handle that case.
              if @read_semaphore.try_lock
                begin
                  last_char = @socket.getc
                  plc = parse_char(last_char)
                  if plc == "\n" # Server Heartbeat
                    @lr = Time.now.to_f
                  else
                    @socket.ungetc(last_char)
                  end
                ensure
                  # Never leave the read lock held when reading fails.
                  @read_semaphore.unlock
                end
              else
                # Shrug.  Have not received one.  Just set warning flag.
                @hb_received = false
                if @logger && @logger.respond_to?(:on_hbread_fail)
                  @logger.on_hbread_fail(log_params, {"ticker_interval" => @rti})
                end
              end
            else
              @hb_received = true # Reset if necessary
            end
            Thread.pass
          end
        }
      end
986
987
    # Ref:
    # http://unicode.org/mail-arch/unicode-ml/y2003-m02/att-0467/01-The_Algorithm_to_Valide_an_UTF-8_String
    #
990
    # Byte-wise check whether string is well formed UTF-8.
    #
    # Implemented as a state machine over the bytes of string:
    #
    #   start  between characters.  0x00-0x7F stays here (ASCII); a lead byte
    #          selects the state for the first continuation byte; any other
    #          byte (0x80-0xBF, 0xC0-0xC1, 0xF5-0xFF) is an error
    #   a      last continuation byte of a 2, 3 or 4 byte character
    #   b      first continuation byte after lead bytes 0xE1-0xEC, 0xEE-0xEF,
    #          and second continuation byte of 4 byte characters
    #   c      first continuation byte after 0xE0 (0xA0-0xBF only)
    #   d      first continuation byte after 0xED (0x80-0x9F only)
    #   e      first continuation byte after 0xF1-0xF3
    #   f      first continuation byte after 0xF0 (0x90-0xBF only)
    #   g      first continuation byte after 0xF4 (0x80-0x8F only)
    #
    # string - the String to check.
    #
    # Returns true when every byte is accepted and the input does not end in
    # the middle of a character, false otherwise.
    def _valid_utf8?(string)
      bytes = case RUBY_VERSION
        when /\A1\.8\.[56]/
          # No String#bytes there; String#[] with an Integer yields the byte.
          (0...string.length).map { |i| string[i] }
        else
          string.bytes
      end

      # state => list of [accepted byte range, next state]
      transitions = {
        "start" => [
          [(0x00..0x7f), "start"],
          [(0xc2..0xdf), "a"],
          [(0xe1..0xec), "b"],
          [(0xee..0xef), "b"],
          [(0xe0..0xe0), "c"],
          [(0xed..0xed), "d"],
          [(0xf1..0xf3), "e"],
          [(0xf0..0xf0), "f"],
          [(0xf4..0xf4), "g"]
        ],
        "a" => [[(0x80..0xbf), "start"]],
        "b" => [[(0x80..0xbf), "a"]],
        "c" => [[(0xa0..0xbf), "a"]],
        "d" => [[(0x80..0x9f), "a"]],
        "e" => [[(0x80..0xbf), "b"]],
        "f" => [[(0x90..0xbf), "b"]],
        "g" => [[(0x80..0x8f), "b"]]
      }

      state = "start"
      bytes.each do |next_byte|
        rule = transitions[state].find { |range, _| range.include?(next_byte) }
        return false unless rule # byte not allowed in this state
        state = rule[1]
      end
      # Catch truncation at end of string
      state == "start"
    end # of _valid_utf8?
1197
1198
    # Verifies that all header keys and values are valid UTF-8.
    #
    # Nothing is checked when @protocol is Stomp::SPL_10.  Otherwise every
    # key, every scalar value and every element of an Array value is
    # converted to a String copy, tagged as Stomp::UTF8 (where Strings
    # support force_encoding) and handed to valid_utf8?.
    #
    # Only copies are re-tagged: the caller's Strings keep their encoding and
    # may be frozen.
    #
    # h - the headers Hash (keys are expected to be symbolized).
    #
    # Raises Stomp::Error::UTF8ValidationError on the first key or value that
    # fails the check.
    def _headerCheck(h)
      return if @protocol == Stomp::SPL_10 # Do nothing for this environment
      #
      h.each_pair do |k,v|
        # Key first, then the value(s).  Values are usually Strings, but
        # could be TrueClass or Symbol, hence the to_s.
        candidates = [k] + (v.is_a?(Array) ? v : [v])
        candidates.each do |item|
          text = item.to_s + "" # The + "" yields a fresh, unfrozen copy
          text.force_encoding(Stomp::UTF8) if text.respond_to?(:force_encoding)
          raise Stomp::Error::UTF8ValidationError unless valid_utf8?(text)
        end
      end
    end
1220
1221
    #
1222
    # Encodes header keys and values with Stomp::HeaderCodec.
    #
    # h - the headers Hash (keys are symbolized).
    #
    # Keys and scalar values are converted with to_s before encoding.  An
    # Array value is encoded element by element (elements are passed to the
    # codec as they are).
    #
    # Returns a new Hash with encoded String keys; h is not modified.
    def _encodeHeaders(h)
      eh = {}
      h.each_pair do |k,v|
        kenc = Stomp::HeaderCodec::encode(k.to_s)
        eh[kenc] = if v.is_a?(Array)
          v.map { |e| Stomp::HeaderCodec::encode(e) }
        else
          Stomp::HeaderCodec::encode(v.to_s)
        end
      end
      eh
    end
1240
1241
    #
1242
    # Decodes header keys and values with Stomp::HeaderCodec.
    #
    # h - the headers Hash as received (keys are NOT symbolized).
    #
    # Keys are handed to the codec as they are; scalar values are converted
    # with to_s first.  An Array value is decoded element by element.
    #
    # Returns a new Hash with the decoded keys and values; h is not modified.
    def _decodeHeaders(h)
      dh = {}
      h.each_pair do |k,v|
        kdec = Stomp::HeaderCodec::decode(k)
        dh[kdec] = if v.is_a?(Array)
          v.map { |e| Stomp::HeaderCodec::decode(e) }
        else
          Stomp::HeaderCodec::decode(v.to_s)
        end
      end
      dh
    end
1259
1260
  end # class
1261
1262
end # module