mux.rb (4380B)
1 # Derived from libmux, available in Plan 9 under /sys/src/libmux 2 # under the following terms: 3 # 4 # Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology 5 # 6 # Permission is hereby granted, free of charge, to any person obtaining 7 # a copy of this software and associated documentation files (the 8 # "Software"), to deal in the Software without restriction, including 9 # without limitation the rights to use, copy, modify, merge, publish, 10 # distribute, sublicense, and/or sell copies of the Software, and to 11 # permit persons to whom the Software is furnished to do so, subject to 12 # the following conditions: 13 # 14 # The above copyright notice and this permission notice shall be 15 # included in all copies or substantial portions of the Software. 16 require 'socket' 17 18 module R9P 19 class Mux 20 include Qlock 21 include Rendez 22 23 attr_reader :fd 24 25 def initialize(con, opts = {}, &block) 26 qlinit 27 @queue = [] 28 @lock = self 29 @outlock = Mutex.new 30 @inlock = Mutex.new 31 @deserialize = block 32 @freetag = 0 33 @wait = [] 34 @mwait = 0 35 @nwait = 0 36 @mintag = opts[:mintag] || 0 37 @maxtag = opts[:maxtag] || 1<<16-1 38 39 case con 40 when IO 41 @fd = con 42 when String 43 spec = con.split '!' 44 case spec.shift 45 when 'unix' 46 print "#{self.class.inspect}: init UNIXSocket: #{spec[0]}\n" if $debug > 1 47 @fd = UNIXSocket.new spec[0] 48 when 'tcp' 49 @fd = TCPSocket.new *spec 50 else 51 throw "Invalid connection spec" 52 end 53 else 54 throw "Invalid connection spec" 55 end 56 if @fd == nil 57 throw "No connection" 58 end 59 end 60 61 def rpc(dat) 62 r = newrpc dat 63 64 begin 65 qlock 66 while @muxer && @muxer != r && r.data == nil 67 r.sleep 68 end 69 70 if r.data == nil 71 if @muxer && @muxer != r 72 fail 73 end 74 @muxer = r 75 qunlock 76 begin 77 while r.data == nil 78 data = recv 79 if data == nil 80 qlock 81 @queue.delete r 82 throw "unexpected eof" 83 end 84 dispatch data 85 end 86 qlock 87 ensure 88 electmuxer 89 end 90 end 91 92 ensure 93 puttag(r) 94 qunlock 95 end 96 r.data 97 end 98 99 def electmuxer 100 @queue.each { |rpc| 101 if @muxer != rpc && rpc.async == false 102 @muxer = rpc 103 rpc.wakeup 104 return 105 end 106 } 107 @muxer = nil 108 end 109 110 def dispatch(dat) 111 tag = dat.tag - @mintag 112 r = nil 113 qsync { 114 if tag < 0 || tag > @mwait 115 printf "#{$0}: bad rpc tag: %ux\n", dat.tag 116 return 117 end 118 r = @wait[tag] 119 if r == nil || @queue.delete(r) == nil 120 printf "#{$0}: bad rpc tag: %ux (noone waiting on it)\n", dat.tag 121 return 122 end 123 } 124 r.data = dat 125 r.wakeup 126 end 127 128 def gettag(r) 129 tag = 0 130 131 while @nwait == @mwait 132 if @mwait < @maxtag - @mintag 133 mw = if @mwait == 0 134 1 135 else 136 @mwait << 1 137 end 138 @wait[mw] = nil 139 @freetag = @mwait 140 @mwait = mw 141 break 142 end 143 sleep 144 end 145 146 tag = @freetag 147 if @wait[tag] 148 tag.upto(@mwait) { |n| 149 if @wait[n] == nil 150 tag = n 151 break 152 end 153 } 154 0.upto(@freetag) { |n| 155 if @wait[n] == nil 156 tag = n 157 break 158 end 159 } 160 end 161 162 if @wait[tag] 163 throw "nwait botch" 164 end 165 166 @nwait += 1 167 @wait[tag] = r 168 r.tag = tag + @mintag 169 r.data.tag = r.tag 170 r.data = nil 171 r.tag 172 end 173 def puttag(r) 174 t = r.tag 175 if @wait[t] != r 176 fail 177 end 178 @wait[t] = nil 179 @nwait -= 1 180 @freetag = t 181 wakeup 182 end 183 184 def send(dat) 185 dat.serialize 186 len = @fd.syswrite dat.serial 187 print "send(#{dat.class.inspect}): len=#{len} data=#{dat.serial.inspect}" if $debug > 1 188 189 if len < dat.serial.length 190 return false 191 end 192 true 193 end 194 def recv 195 data = @fd.read 4 196 if data 197 len, = data.unpack "V" 198 data << @fd.read(len-4) 199 print "#{self.class.inspect}: recv: data=#{data.inspect}" if $debug > 1 200 @deserialize.call data 201 end 202 end 203 204 def newrpc(dat) 205 rpc = Rpc.new self 206 tag = nil 207 208 rpc.data = dat 209 qsync { 210 gettag rpc 211 @queue << rpc 212 } 213 214 if rpc.tag < 0 || send(dat) == false 215 qsync { 216 @queue.delete rpc 217 puttag rpc 218 } 219 return nil 220 end 221 222 rpc 223 end 224 private :gettag, :puttag,:send, :recv, :newrpc, :electmuxer, :dispatch 225 226 class Rpc 227 include Rendez 228 attr_accessor :data, :waiting, :async, :tag 229 230 def initialize(mux) 231 @lock = mux 232 @mux = mux 233 @data = nil 234 @waiting = true 235 @async = false 236 end 237 end 238 end 239 end