wmii

git clone git://oldgit.suckless.org/wmii/
Log | Files | Refs | README | LICENSE

mux.py (6857B)


      1 # Derived from libmux, available in Plan 9 under /sys/src/libmux
      2 # under the following terms:
      3 #
      4 # Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology
      5 # 
      6 # Permission is hereby granted, free of charge, to any person obtaining
      7 # a copy of this software and associated documentation files (the
      8 # "Software"), to deal in the Software without restriction, including
      9 # without limitation the rights to use, copy, modify, merge, publish,
     10 # distribute, sublicense, and/or sell copies of the Software, and to
     11 # permit persons to whom the Software is furnished to do so, subject to
     12 # the following conditions:
     13 # 
     14 # The above copyright notice and this permission notice shall be
     15 # included in all copies or substantial portions of the Software.
     16 
     17 import os
     18 import sys
     19 import traceback
     20 
     21 from pyxp import fields
     22 from pyxp.dial import dial
     23 from threading import *
# Rebind ``Condition`` to the concrete class of a condition-variable
# instance, so that the name can be used as a base class (Rpc, further
# down in this module, derives from it) as well as called directly.
Condition = Condition().__class__

# One-element tuple: Mux is the only name exported by a star-import.
__all__ = 'Mux',
     27 
     28 class Mux(object):
     29     def __init__(self, con, process, flush=None, mintag=0, maxtag=1<<16 - 1):
     30         self.lock = RLock()
     31         self.tagcond = Condition(self.lock)
     32         self.outlock = RLock()
     33         self.inlock = RLock()
     34         self.process = process
     35         self.flush = flush
     36         self.wait = {}
     37         self.free = set(range(mintag, maxtag))
     38         self.mintag = mintag
     39         self.maxtag = maxtag
     40         self.muxer = None
     41 
     42         self.async_mux = Queue(self.mux)
     43         self.async_dispatch = Queue(self.async_dispatch)
     44 
     45         if isinstance(con, basestring):
     46             con = dial(con)
     47         self.fd = con
     48 
     49         if self.fd is None:
     50             raise Exception("No connection")
     51 
     52     def mux(self, rpc):
     53         with self.lock:
     54             try:
     55                 rpc.waiting = True
     56                 while self.muxer and self.muxer != rpc and rpc.data is None:
     57                     rpc.wait()
     58 
     59                 if rpc.data is None:
     60                     assert self.muxer in (rpc, None)
     61                     self.muxer = rpc
     62                     try:
     63                         self.lock.release()
     64                         while rpc.data is None:
     65                             data = self.recv()
     66                             if data is None:
     67                                 raise Exception("unexpected eof")
     68                             self.dispatch(data)
     69                     finally:
     70                         self.lock.acquire()
     71                         self.electmuxer()
     72             except Exception:
     73                 traceback.print_exc(sys.stderr)
     74                 if rpc.tag in self.wait:
     75                     self.wait.pop(rpc.tag)
     76                 if self.flush:
     77                     self.flush(self, rpc.data)
     78                 raise
     79 
     80         return rpc.data
     81 
     82     def rpc(self, dat, async=None):
     83         rpc = self.newrpc(dat, async)
     84         if async:
     85             self.async_mux.push(rpc)
     86         else:
     87             return self.mux(rpc)
     88 
     89     def async_dispatch(self, rpc):
     90         rpc.async(self, rpc.data)
     91 
     92     def electmuxer(self):
     93         for rpc in self.wait.itervalues():
     94             if self.muxer != rpc and rpc.waiting:
     95                 self.muxer = rpc
     96                 rpc.notify()
     97                 return
     98         self.muxer = None
     99 
    100     def dispatch(self, dat):
    101         with self.lock:
    102             rpc = self.wait.get(dat.tag, None)
    103             if rpc:
    104                 self.puttag(rpc)
    105                 rpc.dispatch(dat)
    106             elif False:
    107                 print "bad rpc tag: %u (no one waiting on it)" % dat.tag
    108 
    109     def gettag(self, r):
    110         tag = 0
    111 
    112         while not self.free:
    113             self.tagcond.wait()
    114 
    115         tag = self.free.pop()
    116 
    117         if tag in self.wait:
    118             raise Exception("nwait botch")
    119 
    120         self.wait[tag] = r
    121 
    122         r.tag = tag
    123         r.orig.tag = r.tag
    124         return r.tag
    125 
    126     def puttag(self, rpc):
    127         if rpc.tag in self.wait:
    128             del self.wait[rpc.tag]
    129         self.free.add(rpc.tag)
    130         self.tagcond.notify()
    131 
    132     def send(self, dat):
    133         data = ''.join(dat.marshall())
    134         n = self.fd.send(data)
    135         return n == len(data)
    136     def recv(self):
    137         def readn(fd, n):
    138             data = ''
    139             while len(data) < n:
    140                 try:
    141                     s = fd.recv(n - len(data))
    142                     if len(s) == 0:
    143                         raise Exception('unexpected end of file')
    144                     data += s
    145                 except os.error, e:
    146                     if e.errno != os.errno.EINTR:
    147                         raise e
    148             return data
    149 
    150         try:
    151             with self.inlock:
    152                 data = readn(self.fd, 4)
    153                 if data:
    154                     nmsg = fields.Int.decoders[4](data, 0)
    155                     data += readn(self.fd, nmsg - 4)
    156                     return self.process(data)
    157         except Exception, e:
    158             print e.__class__.__name__
    159             print repr(e)
    160             traceback.print_exc(sys.stderr)
    161             return None
    162 
    163     def newrpc(self, dat, async=None):
    164         rpc = Rpc(self, dat, async)
    165         tag = None
    166 
    167         with self.lock:
    168             self.gettag(rpc)
    169 
    170         if rpc.tag >= 0 and self.send(dat):
    171             return rpc
    172 
    173         with self.lock:
    174             self.puttag(rpc)
    175 
    176 class Rpc(Condition):
    177     def __init__(self, mux, data, async=None):
    178         super(Rpc, self).__init__(mux.lock)
    179         self.mux = mux
    180         self.orig = data
    181         self.data = None
    182         self.async = async
    183         self.waiting = False
    184 
    185     def __repr__(self):
    186         return '<Rpc tag=%s orig=%s data=%s async=%s waiting=%s>' % tuple(map(repr, (self.tag, self.orig, self.data, self.async, self.waiting)))
    187 
    188     def dispatch(self, data=None):
    189         self.data = data
    190         self.notify()
    191         if callable(self.async):
    192             self.mux.async_dispatch(self)
    193 
    194 class Queue(Thread):
    195     _id = 1
    196 
    197     def __init__(self, op):
    198         super(Queue, self).__init__(name='Queue-%d-%s' % (Queue._id, repr(op)))
    199         Queue._id += 1
    200         self.cond = Condition()
    201         self.op = op
    202         self.queue = []
    203         self.daemon = True
    204 
    205     def __call__(self, item):
    206         return self.push(item)
    207 
    208     def push(self, item):
    209         with self.cond:
    210             self.queue.append(item)
    211             if not self.is_alive():
    212                 self.start()
    213             self.cond.notify()
    214     def pop(self, item):
    215         with self.cond:
    216             if item in self.queue:
    217                 self.queue.remove(item)
    218                 return True
    219             return False
    220 
    221     def run(self):
    222         self.cond.acquire()
    223         while True:
    224             while self.queue:
    225                 item = self.queue.pop(0)
    226                 self.cond.release()
    227                 try:
    228                     self.op(item)
    229                 except Exception, e:
    230                     traceback.print_exc(sys.stderr)
    231                 self.cond.acquire()
    232             self.cond.wait()
    233 
    234 # vim:se sts=4 sw=4 et: