mux.py (6857B)
1 # Derived from libmux, available in Plan 9 under /sys/src/libmux 2 # under the following terms: 3 # 4 # Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology 5 # 6 # Permission is hereby granted, free of charge, to any person obtaining 7 # a copy of this software and associated documentation files (the 8 # "Software"), to deal in the Software without restriction, including 9 # without limitation the rights to use, copy, modify, merge, publish, 10 # distribute, sublicense, and/or sell copies of the Software, and to 11 # permit persons to whom the Software is furnished to do so, subject to 12 # the following conditions: 13 # 14 # The above copyright notice and this permission notice shall be 15 # included in all copies or substantial portions of the Software. 16 17 import os 18 import sys 19 import traceback 20 21 from pyxp import fields 22 from pyxp.dial import dial 23 from threading import * 24 Condition = Condition().__class__ 25 26 __all__ = 'Mux', 27 28 class Mux(object): 29 def __init__(self, con, process, flush=None, mintag=0, maxtag=1<<16 - 1): 30 self.lock = RLock() 31 self.tagcond = Condition(self.lock) 32 self.outlock = RLock() 33 self.inlock = RLock() 34 self.process = process 35 self.flush = flush 36 self.wait = {} 37 self.free = set(range(mintag, maxtag)) 38 self.mintag = mintag 39 self.maxtag = maxtag 40 self.muxer = None 41 42 self.async_mux = Queue(self.mux) 43 self.async_dispatch = Queue(self.async_dispatch) 44 45 if isinstance(con, basestring): 46 con = dial(con) 47 self.fd = con 48 49 if self.fd is None: 50 raise Exception("No connection") 51 52 def mux(self, rpc): 53 with self.lock: 54 try: 55 rpc.waiting = True 56 while self.muxer and self.muxer != rpc and rpc.data is None: 57 rpc.wait() 58 59 if rpc.data is None: 60 assert self.muxer in (rpc, None) 61 self.muxer = rpc 62 try: 63 self.lock.release() 64 while rpc.data is None: 65 data = self.recv() 66 if data is None: 67 raise Exception("unexpected eof") 68 self.dispatch(data) 69 finally: 70 self.lock.acquire() 71 self.electmuxer() 72 except Exception: 73 traceback.print_exc(sys.stderr) 74 if rpc.tag in self.wait: 75 self.wait.pop(rpc.tag) 76 if self.flush: 77 self.flush(self, rpc.data) 78 raise 79 80 return rpc.data 81 82 def rpc(self, dat, async=None): 83 rpc = self.newrpc(dat, async) 84 if async: 85 self.async_mux.push(rpc) 86 else: 87 return self.mux(rpc) 88 89 def async_dispatch(self, rpc): 90 rpc.async(self, rpc.data) 91 92 def electmuxer(self): 93 for rpc in self.wait.itervalues(): 94 if self.muxer != rpc and rpc.waiting: 95 self.muxer = rpc 96 rpc.notify() 97 return 98 self.muxer = None 99 100 def dispatch(self, dat): 101 with self.lock: 102 rpc = self.wait.get(dat.tag, None) 103 if rpc: 104 self.puttag(rpc) 105 rpc.dispatch(dat) 106 elif False: 107 print "bad rpc tag: %u (no one waiting on it)" % dat.tag 108 109 def gettag(self, r): 110 tag = 0 111 112 while not self.free: 113 self.tagcond.wait() 114 115 tag = self.free.pop() 116 117 if tag in self.wait: 118 raise Exception("nwait botch") 119 120 self.wait[tag] = r 121 122 r.tag = tag 123 r.orig.tag = r.tag 124 return r.tag 125 126 def puttag(self, rpc): 127 if rpc.tag in self.wait: 128 del self.wait[rpc.tag] 129 self.free.add(rpc.tag) 130 self.tagcond.notify() 131 132 def send(self, dat): 133 data = ''.join(dat.marshall()) 134 n = self.fd.send(data) 135 return n == len(data) 136 def recv(self): 137 def readn(fd, n): 138 data = '' 139 while len(data) < n: 140 try: 141 s = fd.recv(n - len(data)) 142 if len(s) == 0: 143 raise Exception('unexpected end of file') 144 data += s 145 except os.error, e: 146 if e.errno != os.errno.EINTR: 147 raise e 148 return data 149 150 try: 151 with self.inlock: 152 data = readn(self.fd, 4) 153 if data: 154 nmsg = fields.Int.decoders[4](data, 0) 155 data += readn(self.fd, nmsg - 4) 156 return self.process(data) 157 except Exception, e: 158 print e.__class__.__name__ 159 print repr(e) 160 traceback.print_exc(sys.stderr) 161 return None 162 163 def newrpc(self, dat, async=None): 164 rpc = Rpc(self, dat, async) 165 tag = None 166 167 with self.lock: 168 self.gettag(rpc) 169 170 if rpc.tag >= 0 and self.send(dat): 171 return rpc 172 173 with self.lock: 174 self.puttag(rpc) 175 176 class Rpc(Condition): 177 def __init__(self, mux, data, async=None): 178 super(Rpc, self).__init__(mux.lock) 179 self.mux = mux 180 self.orig = data 181 self.data = None 182 self.async = async 183 self.waiting = False 184 185 def __repr__(self): 186 return '<Rpc tag=%s orig=%s data=%s async=%s waiting=%s>' % tuple(map(repr, (self.tag, self.orig, self.data, self.async, self.waiting))) 187 188 def dispatch(self, data=None): 189 self.data = data 190 self.notify() 191 if callable(self.async): 192 self.mux.async_dispatch(self) 193 194 class Queue(Thread): 195 _id = 1 196 197 def __init__(self, op): 198 super(Queue, self).__init__(name='Queue-%d-%s' % (Queue._id, repr(op))) 199 Queue._id += 1 200 self.cond = Condition() 201 self.op = op 202 self.queue = [] 203 self.daemon = True 204 205 def __call__(self, item): 206 return self.push(item) 207 208 def push(self, item): 209 with self.cond: 210 self.queue.append(item) 211 if not self.is_alive(): 212 self.start() 213 self.cond.notify() 214 def pop(self, item): 215 with self.cond: 216 if item in self.queue: 217 self.queue.remove(item) 218 return True 219 return False 220 221 def run(self): 222 self.cond.acquire() 223 while True: 224 while self.queue: 225 item = self.queue.pop(0) 226 self.cond.release() 227 try: 228 self.op(item) 229 except Exception, e: 230 traceback.print_exc(sys.stderr) 231 self.cond.acquire() 232 self.cond.wait() 233 234 # vim:se sts=4 sw=4 et: