r9p

git clone git://oldgit.suckless.org/r9p/
Log | Files | Refs

mux.rb (4380B)


      1 # Derived from libmux, available in Plan 9 under /sys/src/libmux
      2 # under the following terms:
      3 #
      4 # Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology
      5 # 
      6 # Permission is hereby granted, free of charge, to any person obtaining
      7 # a copy of this software and associated documentation files (the
      8 # "Software"), to deal in the Software without restriction, including
      9 # without limitation the rights to use, copy, modify, merge, publish,
     10 # distribute, sublicense, and/or sell copies of the Software, and to
     11 # permit persons to whom the Software is furnished to do so, subject to
     12 # the following conditions:
     13 # 
     14 # The above copyright notice and this permission notice shall be
     15 # included in all copies or substantial portions of the Software.
     16 require 'socket'
     17 
     18 module R9P
     19 	class Mux
     20 		include Qlock
     21 		include Rendez
     22 
     23 		attr_reader :fd
     24 
     25 		def initialize(con, opts = {}, &block)
     26 			qlinit
     27 			@queue = []
     28 			@lock = self
     29 			@outlock = Mutex.new
     30 			@inlock = Mutex.new
     31 			@deserialize = block
     32 			@freetag = 0
     33 			@wait = []
     34 			@mwait = 0
     35 			@nwait = 0
     36 			@mintag = opts[:mintag] || 0
     37 			@maxtag = opts[:maxtag] || 1<<16-1
     38 
     39 			case con
     40 			when IO
     41 				@fd = con
     42 			when String
     43 				spec = con.split '!'
     44 				case spec.shift
     45 				when 'unix'
     46 					print "#{self.class.inspect}: init UNIXSocket: #{spec[0]}\n" if $debug > 1
     47 					@fd = UNIXSocket.new spec[0]
     48 				when 'tcp'
     49 					@fd = TCPSocket.new *spec
     50 				else
     51 					throw "Invalid connection spec"
     52 				end
     53 			else
     54 				throw "Invalid connection spec"
     55 			end
     56 			if @fd == nil
     57 				throw "No connection"
     58 			end
     59 		end
     60 
     61 		def rpc(dat)
     62 			r = newrpc dat
     63 
     64 			begin
     65 				qlock
     66 				while @muxer && @muxer != r && r.data == nil
     67 					r.sleep
     68 				end
     69 
     70 				if r.data == nil
     71 					if @muxer && @muxer != r
     72 						fail
     73 					end
     74 					@muxer = r
     75 					qunlock
     76 					begin
     77 						while r.data == nil
     78 							data = recv
     79 							if data == nil
     80 								qlock
     81 								@queue.delete r
     82 								throw "unexpected eof"
     83 							end
     84 							dispatch data
     85 						end
     86 						qlock
     87 					ensure
     88 						electmuxer
     89 					end
     90 				end
     91 
     92 			ensure
     93 				puttag(r)
     94 				qunlock
     95 			end
     96 			r.data
     97 		end
     98 
     99 		def electmuxer
    100 			@queue.each { |rpc|
    101 				if @muxer != rpc && rpc.async == false
    102 					@muxer = rpc
    103 					rpc.wakeup
    104 					return
    105 				end
    106 			}
    107 			@muxer = nil
    108 		end
    109 
    110 		def dispatch(dat)
    111 			tag = dat.tag - @mintag
    112 			r = nil
    113 			qsync {
    114 				if tag < 0 || tag > @mwait
    115 					printf "#{$0}: bad rpc tag: %ux\n", dat.tag
    116 					return
    117 				end
    118 				r = @wait[tag]
    119 				if r == nil || @queue.delete(r) == nil
    120 					printf "#{$0}: bad rpc tag: %ux (noone waiting on it)\n", dat.tag
    121 					return
    122 				end
    123 			}
    124 			r.data = dat
    125 			r.wakeup
    126 		end
    127 
    128 		def gettag(r)
    129 			tag = 0
    130 
    131 			while @nwait == @mwait
    132 				if @mwait < @maxtag - @mintag
    133 					mw =	if @mwait == 0
    134 							1
    135 						else
    136 							@mwait << 1
    137 						end
    138 					@wait[mw] = nil
    139 					@freetag = @mwait
    140 					@mwait = mw
    141 					break
    142 				end
    143 				sleep
    144 			end
    145 
    146 			tag = @freetag
    147 			if @wait[tag]
    148 				tag.upto(@mwait) { |n|
    149 					if @wait[n] == nil
    150 						tag = n
    151 						break
    152 					end
    153 				}
    154 				0.upto(@freetag) { |n|
    155 					if @wait[n] == nil
    156 						tag = n
    157 						break
    158 					end
    159 				}
    160 			end
    161 
    162 			if @wait[tag]
    163 				throw "nwait botch"
    164 			end
    165 
    166 			@nwait += 1
    167 			@wait[tag] = r
    168 			r.tag = tag + @mintag
    169 			r.data.tag = r.tag
    170 			r.data = nil
    171 			r.tag
    172 		end
    173 		def puttag(r)
    174 			t = r.tag
    175 			if @wait[t] != r
    176 				fail
    177 			end
    178 			@wait[t] = nil
    179 			@nwait -= 1
    180 			@freetag = t
    181 			wakeup
    182 		end
    183 
    184 		def send(dat)
    185 			dat.serialize
    186 			len = @fd.syswrite dat.serial
    187 			print "send(#{dat.class.inspect}): len=#{len} data=#{dat.serial.inspect}" if $debug > 1
    188 
    189 			if len < dat.serial.length
    190 				return false
    191 			end
    192 			true
    193 		end
    194 		def recv
    195 			data = @fd.read 4
    196 			if data
    197 				len, = data.unpack "V"
    198 				data << @fd.read(len-4)
    199 				print "#{self.class.inspect}: recv: data=#{data.inspect}" if $debug > 1
    200 				@deserialize.call data
    201 			end
    202 		end
    203 
    204 		def newrpc(dat)
    205 			rpc = Rpc.new self
    206 			tag = nil
    207 
    208 			rpc.data = dat
    209 			qsync {
    210 				gettag rpc
    211 				@queue << rpc
    212 			}
    213 
    214 			if rpc.tag < 0 || send(dat) == false
    215 				qsync {
    216 					@queue.delete rpc
    217 					puttag rpc
    218 				}
    219 				return nil
    220 			end
    221 
    222 			rpc
    223 		end
    224 		private :gettag, :puttag,:send, :recv, :newrpc, :electmuxer, :dispatch
    225 
    226 		class Rpc
    227 			include Rendez
    228 			attr_accessor :data, :waiting, :async, :tag
    229 
    230 			def initialize(mux)
    231 				@lock = mux
    232 				@mux = mux
    233 				@data = nil
    234 				@waiting = true
    235 				@async = false
    236 			end
    237 		end
    238 	end
    239 end