client.py (9838B)
1 # Copyright (C) 2009 Kris Maglione 2 3 import operator 4 import os 5 import re 6 import sys 7 from threading import * 8 import traceback 9 10 import pyxp 11 from pyxp import fcall, fields 12 from pyxp.mux import Mux 13 from pyxp.types import * 14 15 if os.environ.get('NAMESPACE', None): 16 namespace = os.environ['NAMESPACE'] 17 else: 18 try: 19 namespace = '/tmp/ns.%s.%s' % ( 20 os.environ['USER'], 21 re.sub(r'\.0$', '', os.environ['DISPLAY'])) 22 except Exception: 23 pass 24 NAMESPACE = namespace 25 26 OREAD = 0x00 27 OWRITE = 0x01 28 ORDWR = 0x02 29 OEXEC = 0x03 30 OEXCL = 0x04 31 OTRUNC = 0x10 32 OREXEC = 0x20 33 ORCLOSE = 0x40 34 OAPPEND = 0x80 35 36 ROOT_FID = 0 37 38 class ProtocolException(Exception): 39 pass 40 class RPCError(Exception): 41 pass 42 43 class Client(object): 44 ROOT_FID = 0 45 46 @staticmethod 47 def respond(callback, data, exc=None, tb=None): 48 if hasattr(callback, 'func_code'): 49 callback(*(data, exc, tb)[0:callback.func_code.co_argcount]) 50 elif callable(callback): 51 callback(data) 52 53 def __enter__(self): 54 return self 55 def __exit__(self, *args): 56 self._cleanup() 57 58 def __init__(self, conn=None, namespace=None, root=None): 59 if not conn and namespace: 60 conn = 'unix!%s/%s' % (NAMESPACE, namespace) 61 try: 62 self.lastfid = ROOT_FID 63 self.fids = set() 64 self.lock = RLock() 65 66 def process(data): 67 return fcall.Fcall.unmarshall(data)[1] 68 self.mux = Mux(conn, process, maxtag=256) 69 70 resp = self._dorpc(fcall.Tversion(version=pyxp.VERSION, msize=65535)) 71 if resp.version != pyxp.VERSION: 72 raise ProtocolException, "Can't speak 9P version '%s'" % resp.version 73 self.msize = resp.msize 74 75 self._dorpc(fcall.Tattach(fid=ROOT_FID, afid=fcall.NO_FID, 76 uname=os.environ['USER'], aname='')) 77 78 if root: 79 path = self._splitpath(root) 80 resp = self._dorpc(fcall.Twalk(fid=ROOT_FID, 81 newfid=ROOT_FID, 82 wname=path)) 83 except Exception: 84 traceback.print_exc(sys.stdout) 85 if getattr(self, 'mux', None): 86 self.mux.fd.close() 87 raise 88 89 def _cleanup(self): 90 try: 91 for f in self.files: 92 f.close() 93 finally: 94 self.mux.fd.close() 95 self.mux = None 96 97 def _dorpc(self, req, callback=None, error=None): 98 def doresp(resp): 99 if isinstance(resp, fcall.Rerror): 100 raise RPCError, "%s[%d] RPC returned error: %s" % ( 101 req.__class__.__name__, resp.tag, resp.ename) 102 if req.type != resp.type ^ 1: 103 raise ProtocolException, "Missmatched RPC message types: %s => %s" % ( 104 req.__class__.__name__, resp.__class__.__name__) 105 return resp 106 107 def next(mux, resp): 108 try: 109 res = doresp(resp) 110 except Exception, e: 111 self.respond(error or callback, None, e, None) 112 else: 113 self.respond(callback, res) 114 115 if not callback: 116 return doresp(self.mux.rpc(req)) 117 self.mux.rpc(req, next) 118 119 def _splitpath(self, path): 120 if isinstance(path, list): 121 return path 122 return [v for v in path.split('/') if v != ''] 123 124 def _getfid(self): 125 with self.lock: 126 if self.fids: 127 return self.fids.pop() 128 self.lastfid += 1 129 return self.lastfid 130 def _putfid(self, fid): 131 with self.lock: 132 self.fids.add(fid) 133 134 def _aclunk(self, fid, callback=None): 135 def next(resp, exc, tb): 136 if resp: 137 self._putfid(fid) 138 self.respond(callback, resp, exc, tb) 139 self._dorpc(fcall.Tclunk(fid=fid), next) 140 141 def _clunk(self, fid): 142 try: 143 self._dorpc(fcall.Tclunk(fid=fid)) 144 finally: 145 self._putfid(fid) 146 147 def _walk(self, path): 148 fid = self._getfid() 149 ofid = ROOT_FID 150 while True: 151 self._dorpc(fcall.Twalk(fid=ofid, newfid=fid, 152 wname=path[0:fcall.MAX_WELEM])) 153 path = path[fcall.MAX_WELEM:] 154 ofid = fid 155 if len(path) == 0: 156 break 157 158 @apply 159 class Res: 160 def __enter__(res): 161 return fid 162 def __exit__(res, exc_type, exc_value, traceback): 163 if exc_type: 164 self._clunk(fid) 165 return Res 166 167 _file = property(lambda self: File) 168 def _open(self, path, mode, fcall, origpath=None): 169 resp = None 170 171 with self._walk(path) as nfid: 172 fid = nfid 173 fcall.fid = fid 174 resp = self._dorpc(fcall) 175 176 def cleanup(): 177 self._aclunk(fid) 178 file = self._file(self, origpath or '/'.join(path), resp, fid, mode, cleanup) 179 return file 180 181 def open(self, path, mode=OREAD): 182 path = self._splitpath(path) 183 184 return self._open(path, mode, fcall.Topen(mode=mode)) 185 186 def create(self, path, mode=OREAD, perm=0): 187 path = self._splitpath(path) 188 name = path.pop() 189 190 return self._open(path, mode, fcall.Tcreate(mode=mode, name=name, perm=perm), 191 origpath='/'.join(path + [name])) 192 193 def remove(self, path): 194 path = self._splitpath(path) 195 196 with self._walk(path) as fid: 197 self._dorpc(fcall.Tremove(fid=fid)) 198 199 def stat(self, path): 200 path = self._splitpath(path) 201 202 try: 203 with self._walk(path) as fid: 204 resp = self._dorpc(fcall.Tstat(fid= fid)) 205 st = resp.stat 206 self._clunk(fid) 207 return st 208 except RPCError: 209 return None 210 211 def read(self, path, *args, **kwargs): 212 with self.open(path) as f: 213 return f.read(*args, **kwargs) 214 def readlines(self, path, *args, **kwargs): 215 with self.open(path) as f: 216 for l in f.readlines(*args, **kwargs): 217 yield l 218 def readdir(self, path, *args, **kwargs): 219 with self.open(path) as f: 220 for s in f.readdir(*args, **kwargs): 221 yield s 222 def write(self, path, *args, **kwargs): 223 with self.open(path, OWRITE) as f: 224 return f.write(*args, **kwargs) 225 226 class File(object): 227 228 def __enter__(self): 229 return self 230 def __exit__(self, *args): 231 self.close() 232 233 def __init__(self, client, path, fcall, fid, mode, cleanup): 234 self.lock = RLock() 235 self.client = client 236 self.path = path 237 self.fid = fid 238 self._cleanup = cleanup 239 self.mode = mode 240 self.iounit = fcall.iounit 241 self.qid = fcall.qid 242 self.closed = False 243 244 self.offset = 0 245 def __del__(self): 246 if not self.closed: 247 self._cleanup() 248 249 def _dorpc(self, fcall, async=None, error=None): 250 if hasattr(fcall, 'fid'): 251 fcall.fid = self.fid 252 return self.client._dorpc(fcall, async, error) 253 254 def stat(self): 255 resp = self._dorpc(fcall.Tstat()) 256 return resp.stat 257 258 def read(self, count=None, offset=None, buf=''): 259 if count is None: 260 count = self.iounit 261 res = [] 262 with self.lock: 263 offs = self.offset 264 if offset is not None: 265 offs = offset 266 while count > 0: 267 n = min(count, self.iounit) 268 count -= n 269 270 resp = self._dorpc(fcall.Tread(offset=offs, count=n)) 271 data = resp.data 272 273 offs += len(data) 274 res.append(data) 275 276 if len(data) < n: 277 break 278 if offset is None: 279 self.offset = offs 280 return ''.join(res) 281 282 def readlines(self): 283 last = None 284 while True: 285 data = self.read() 286 if not data: 287 break 288 lines = data.split('\n') 289 if last: 290 lines[0] = last + lines[0] 291 last = None 292 for i in range(0, len(lines) - 1): 293 yield lines[i] 294 last = lines[-1] 295 if last: 296 yield last 297 298 def write(self, data, offset=None): 299 if offset is None: 300 offset = self.offset 301 off = 0 302 with self.lock: 303 offs = self.offset 304 if offset is not None: 305 offs = offset 306 while off < len(data): 307 n = min(len(data), self.iounit) 308 309 resp = self._dorpc(fcall.Twrite(offset=offs, 310 data=data[off:off+n])) 311 off += resp.count 312 offs += resp.count 313 if resp.count < n: 314 break 315 if offset is None: 316 self.offset = offs 317 return off 318 def readdir(self): 319 if not self.qid.type & Qid.QTDIR: 320 raise Exception, "Can only call readdir on a directory" 321 off = 0 322 while True: 323 data = self.read(self.iounit, off) 324 if not data: 325 break 326 off += len(data) 327 for s in Stat.unmarshall_list(data): 328 yield s 329 330 def close(self): 331 assert not self.closed 332 self.closed = True 333 try: 334 self._cleanup() 335 except: 336 pass 337 self.tg = None 338 self.fid = None 339 self.client = None 340 self.qid = None 341 342 def remove(self): 343 try: 344 self._dorpc(fcall.Tremove()) 345 finally: 346 self.close() 347 348 # vim:se sts=4 sw=4 et: