class DBus::MessageQueue
Encapsulates a socket so that we can {#push} and {#pop} {Message}s.
Constants
- MSG_BUF_SIZE
The buffer size for messages.
Attributes
socket[R]
The socket that is used to connect with the bus.
Public Class Methods
new(address)
click to toggle source
# File lib/dbus/message_queue.rb, line 24
# Set up the queue's state and connect to the bus straight away.
#
# @param address [String] bus address; it is logged, stored, and then
#   handed to #connect via @address
def initialize(address)
  DBus.logger.debug "MessageQueue: #{address}"
  @address = address
  @mutex = Mutex.new
  @is_tcp = false
  @buffer = ""
  # One preallocated read buffer is reused to keep allocations down.
  @read_buffer = String.new(capacity: MSG_BUF_SIZE)
  connect
end
Public Instance Methods
pop(blocking: true)
click to toggle source
@param blocking [Boolean]
true: wait to return a {Message};
false: may return `nil`
@return [Message,nil] one message, or nil if none is available
@raise EOFError
@todo failure modes
# File lib/dbus/message_queue.rb, line 41
# Take the next message off the connection.
#
# @param blocking [Boolean]
#   true: keep waiting on the socket until a {Message} can be returned;
#   false: make a single attempt, which may return `nil`
# @return [Message,nil] one message or nil if unavailable
# @raise EOFError
# @todo failure modes
def pop(blocking: true)
  # FIXME: this is not enough, the R/W test deadlocks on shared connections
  @mutex.synchronize do
    buffer_from_socket_nonblock
    message = message_from_buffer_nonblock
    # Only a blocking caller loops; every pass waits for the socket to
    # become readable before refilling the buffer and parsing again.
    while blocking && message.nil?
      readable, = IO.select([@socket])
      next unless readable && readable[0] == @socket

      buffer_from_socket_nonblock
      message = message_from_buffer_nonblock
    end
    message
  end
end
push(message)
click to toggle source
# File lib/dbus/message_queue.rb, line 60
# Marshall a message and write it to the socket while holding the mutex.
#
# @param message [#marshall] object whose `marshall` result is written
# @return whatever `@socket.write` returns
def push(message)
  @mutex.synchronize { @socket.write(message.marshall) }
end
Also aliased as: <<
Private Instance Methods
connect()
click to toggle source
Connect to the bus and initialize the connection.
# File lib/dbus/message_queue.rb, line 70
# Connect to the bus and initialize the connection.
#
# @address is split on ";" and the entries are tried in order; each entry
# is "transport:key=value,key=value" with %XX escapes in the values.
#
# @return [String,nil] the address entry whose connect method returned a
#   truthy value, or nil if none did (how to report failure is still open)
def connect
  # connect to first one that succeeds
  @address.split(";").find do |candidate|
    transport, raw_params = candidate.split(":")
    params = raw_params.split(",").each_with_object({}) do |pair, decoded|
      key, escaped = pair.split("=")
      # Turn each %XX escape into the byte with that hex value.
      decoded[key] = escaped.gsub(/%(..)/) { [Regexp.last_match(1)].pack("H2") }
    end

    # Unknown transports fall through and yield nil, so `find` moves on.
    case transport
    when "unix" then connect_to_unix(params)
    when "tcp" then connect_to_tcp(params)
    when "launchd" then connect_to_launchd(params)
    end
  end
end
connect_to_launchd(params)
click to toggle source
# File lib/dbus/message_queue.rb, line 136
# Connect through launchd: ask `launchctl getenv` for the value of the
# variable named by the "env" parameter, then connect to that value as a
# unix socket path.
#
# The variable name comes from the bus address, so launchctl is run with
# an argument vector rather than an interpolated shell command line;
# shell metacharacters in the name are passed through literally.
#
# @param params [Hash{String => String}] address parameters; "env" names
#   the variable to look up
# @return the result of #connect_to_unix
def connect_to_launchd(params)
  socket_var = params["env"].to_s
  socket_path = IO.popen(["launchctl", "getenv", socket_var], &:read).chomp
  connect_to_unix "path" => socket_path
end
connect_to_tcp(params)
click to toggle source
Connect to a bus over tcp and initialize the connection.
# File lib/dbus/message_queue.rb, line 98
# Connect to a bus over tcp and initialize the connection.
#
# @param params [Hash{String => String}] address parameters; both "host"
#   and "port" are required
# @return [true,nil] true once the socket is connected and
#   #init_connection has returned; nil when "host" or "port" is missing
#   (a message is printed instead)
def connect_to_tcp(params)
  host = params["host"]
  port = params["port"]
  unless host && port
    # Danger, Will Robinson: the supplied parameters are not usable.
    # Interpolate the argument itself so the message shows what was rejected.
    puts "Error: supplied params: #{params}, unusable! sorry."
    return
  end

  begin
    # initialize the tcp socket
    @socket = TCPSocket.new(host, port.to_i)
    @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    init_connection
    @is_tcp = true
  rescue StandardError => e
    # StandardError only: signals, SystemExit and the like must propagate
    # untouched instead of being reported as a connection failure.
    puts "Oops:", e
    puts "Error: Could not establish connection to: #{host}:#{port}, will now exit."
    exit(1) # a little harsh
  end
end
connect_to_unix(params)
click to toggle source
Connect to a bus over a unix socket (abstract name or filesystem path) and initialize the connection.
# File lib/dbus/message_queue.rb, line 120
# Connect to a bus over a unix socket and initialize the connection.
#
# @param params [Hash{String => String}] address parameters; either
#   "abstract" or "path" must be present ("abstract" is used if both are)
# @return the result of #init_connection
# @raise [ArgumentError] if neither "abstract" nor "path" is given
def connect_to_unix(params)
  abstract_name = params["abstract"]
  path = params["path"]
  # Validate and build the address before opening the socket, so an
  # unusable address cannot leave an open descriptor behind.
  if abstract_name.nil? && path.nil?
    raise ArgumentError, "unix address needs \"abstract\" or \"path\": #{params.inspect}"
  end

  sockaddr =
    if !abstract_name.nil?
      # The two leading bytes are ordered by host endianness; presumably
      # they are the address family, followed by the NUL that marks an
      # abstract name -- confirm against the platform's sockaddr_un.
      HOST_END == LIL_END ? "\1\0\0#{abstract_name}" : "\0\1\0#{abstract_name}"
    else
      Socket.pack_sockaddr_un(path)
    end

  @socket = Socket.new(Socket::Constants::PF_UNIX, Socket::Constants::SOCK_STREAM, 0)
  @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  @socket.connect(sockaddr)
  init_connection
end
init_connection()
click to toggle source
Initialize the connection to the bus.
# File lib/dbus/message_queue.rb, line 143
# Initialize the connection to the bus by running the authentication
# client over the current socket.
#
# @return the result of `Authentication::Client#authenticate`
def init_connection
  Authentication::Client.new(@socket).authenticate
end