class DBus::MessageQueue

Encapsulates a socket so that we can {#push} and {#pop} {Message}s.

Constants

MSG_BUF_SIZE

The buffer size for messages.

Attributes

socket[R]

The socket that is used to connect with the bus.

Public Class Methods

new(address) click to toggle source
   # File lib/dbus/message_queue.rb
24 def initialize(address)
25   DBus.logger.debug "MessageQueue: #{address}"
26   @address = address
27   @buffer = ""
28   # Reduce allocations by using a single buffer for our socket
29   @read_buffer = String.new(capacity: MSG_BUF_SIZE)
30   @is_tcp = false
31   @mutex = Mutex.new
32   connect
33 end

Public Instance Methods

<<(message)
Alias for: push
pop(blocking: true) click to toggle source

@param blocking [Boolean]

true:  wait to return a {Message};
false: may return `nil`

@return [Message,nil] one message or nil if unavailable @raise EOFError @todo failure modes

   # File lib/dbus/message_queue.rb
41 def pop(blocking: true)
42   # FIXME: this is not enough, the R/W test deadlocks on shared connections
43   @mutex.synchronize do
44     buffer_from_socket_nonblock
45     message = message_from_buffer_nonblock
46     if blocking
47       # we can block
48       while message.nil?
49         r, _d, _d = IO.select([@socket])
50         if r && r[0] == @socket
51           buffer_from_socket_nonblock
52           message = message_from_buffer_nonblock
53         end
54       end
55     end
56     message
57   end
58 end
push(message) click to toggle source
   # File lib/dbus/message_queue.rb
60 def push(message)
61   @mutex.synchronize do
62     @socket.write(message.marshall)
63   end
64 end
Also aliased as: <<

Private Instance Methods

connect() click to toggle source

Connect to the bus and initialize the connection.

   # File lib/dbus/message_queue.rb
70 def connect
71   addresses = @address.split ";"
72   # connect to first one that succeeds
73   addresses.find do |a|
74     transport, keyvaluestring = a.split ":"
75     kv_list = keyvaluestring.split ","
76     kv_hash = {}
77     kv_list.each do |kv|
78       key, escaped_value = kv.split "="
79       value = escaped_value.gsub(/%(..)/) { |_m| [Regexp.last_match(1)].pack "H2" }
80       kv_hash[key] = value
81     end
82     case transport
83     when "unix"
84       connect_to_unix kv_hash
85     when "tcp"
86       connect_to_tcp kv_hash
87     when "launchd"
88       connect_to_launchd kv_hash
89     else
90       # ignore, report?
91     end
92   end
93   # returns the address that worked or nil.
94   # how to report failure?
95 end
connect_to_launchd(params) click to toggle source
    # File lib/dbus/message_queue.rb
136 def connect_to_launchd(params)
137   socket_var = params["env"]
138   socket = `launchctl getenv #{socket_var}`.chomp
139   connect_to_unix "path" => socket
140 end
connect_to_tcp(params) click to toggle source

Connect to a bus over tcp and initialize the connection.

    # File lib/dbus/message_queue.rb
 98 def connect_to_tcp(params)
 99   host = params["host"]
100   port = params["port"]
101   if host && port
102     begin
103       # initialize the tcp socket
104       @socket = TCPSocket.new(host, port.to_i)
105       @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
106       init_connection
107       @is_tcp = true
108     rescue Exception => e
109       puts "Oops:", e
110       puts "Error: Could not establish connection to: #{host}:#{port}, will now exit."
111       exit(1) # a little harsh
112     end
113   else
114     # Danger, Will Robinson: the specified "path" is not usable
115     puts "Error: supplied params: #{@params}, unusable! sorry."
116   end
117 end
connect_to_unix(params) click to toggle source

Connect to an abstract unix bus and initialize the connection.

    # File lib/dbus/message_queue.rb
120 def connect_to_unix(params)
121   @socket = Socket.new(Socket::Constants::PF_UNIX, Socket::Constants::SOCK_STREAM, 0)
122   @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
123   if !params["abstract"].nil?
124     sockaddr = if HOST_END == LIL_END
125                  "\1\0\0#{params["abstract"]}"
126                else
127                  "\0\1\0#{params["abstract"]}"
128                end
129   elsif !params["path"].nil?
130     sockaddr = Socket.pack_sockaddr_un(params["path"])
131   end
132   @socket.connect(sockaddr)
133   init_connection
134 end
init_connection() click to toggle source

Initialize the connection to the bus.

    # File lib/dbus/message_queue.rb
143 def init_connection
144   client = Authentication::Client.new(@socket)
145   client.authenticate
146 end