asyncclient.py (5763B)
1 from pyxp import client, fcall 2 from pyxp.client import * 3 from functools import wraps 4 5 def send(iter, val, default=None): 6 try: 7 return iter.send(val) 8 except StopIteration: 9 return default 10 11 def awithfile(fn): 12 @wraps(fn) 13 def wrapper(self, path, *args, **kwargs): 14 gen = fn(self, *args, **kwargs) 15 callback, fail, mode = next(gen) 16 def cont(file): 17 send(gen, file) 18 self.aopen(path, cont, fail=fail or callback, mode=mode) 19 return wrapper 20 21 def requestchain(fn): 22 @wraps(fn) 23 def wrapper(self, *args, **kwargs): 24 gen = fn(self, *args, **kwargs) 25 callback, fail = next(gen) 26 27 def cont(val): 28 data = gen.send(val) 29 if isinstance(data, fcall.Fcall): 30 self._dorpc(data, cont, fail or callback) 31 else: 32 Client.respond(callback, data) 33 cont(None) 34 return wrapper 35 36 class Client(client.Client): 37 ROOT_FID = 0 38 39 def _awalk(fn): 40 @wraps(fn) 41 @requestchain 42 def wrapper(self, *args, **kwargs): 43 gen = fn(self, *args, **kwargs) 44 path, callback, fail = next(gen) 45 46 path = self._splitpath(path) 47 fid = self._getfid() 48 ofid = ROOT_FID 49 50 def fail_(resp, exc, tb): 51 if ofid != ROOT_FID: 52 self._aclunk(fid) 53 self.respond(fail or callback, resp, exc, tb) 54 yield callback, fail_ 55 56 while path: 57 wname = path[:fcall.MAX_WELEM] 58 path = path[fcall.MAX_WELEM:] 59 60 resp = yield fcall.Twalk(fid=ofid, newfid=fid, wname=wname) 61 ofid = fid 62 63 resp = fid 64 while resp is not None: 65 resp = yield send(gen, resp) 66 67 return wrapper 68 69 _file = property(lambda self: File) 70 71 @_awalk 72 def _aopen(self, path, mode, fcall, callback, fail=None, origpath=None): 73 path = self._splitpath(path) 74 75 fcall.fid = yield path, callback, fail 76 resp = yield fcall 77 yield self._file(self, origpath or '/'.join(path), resp, fcall.fid, mode, 78 cleanup=lambda: self._aclunk(fcall.fid)) 79 80 def aopen(self, path, callback=True, fail=None, mode=OREAD): 81 assert callable(callback) 82 self._aopen(path, mode, fcall.Topen(mode=mode), callback, fail) 83 84 def acreate(self, path, callback=True, fail=None, mode=OREAD, perm=0): 85 path = self._splitpath(path) 86 name = path.pop() 87 88 self._aopen(path, mode, 89 fcall.Tcreate(mode=mode, name=name, perm=perm), 90 callback if callable(callback) else lambda resp: resp and resp.close(), 91 fail, origpath='/'.join(path + [name])) 92 93 @_awalk 94 def aremove(self, path, callback=True, fail=None): 95 yield fcall.Tremove(fid=(yield path, callback, fail)) 96 97 @_awalk 98 def astat(self, path, callback, fail=None): 99 resp = yield fcall.Tstat(fid=(yield path, callback, fail)) 100 yield resp.stat 101 102 @awithfile 103 def aread(self, callback, fail=None, count=None, offset=None, buf=''): 104 file = yield callback, fail, OREAD 105 file.aread(callback, fail, count, offset, buf) 106 107 @awithfile 108 def awrite(self, data, callback=True, fail=None, offset=None): 109 file = yield callback, fail, OWRITE 110 file.awrite(data, callback, fail, offset) 111 112 @awithfile 113 def areadlines(self, callback): 114 file = yield callback, fail, OREAD 115 file.areadlines(callback) 116 117 class File(client.File): 118 119 @requestchain 120 def stat(self, callback, fail=None): 121 yield callback, fail 122 resp = yield fcall.Tstat() 123 yield resp.stat 124 125 @requestchain 126 def aread(self, callback, fail=None, count=None, offset=None, buf=''): 127 yield callback, fail 128 129 setoffset = offset is None 130 if count is None: 131 count = self.iounit 132 if offset is None: 133 offset = self.offset 134 135 res = [] 136 while count > 0: 137 n = min(count, self.iounit) 138 count -= n 139 resp = yield fcall.Tread(offset=offset, count=n) 140 res.append(resp.data) 141 offset += len(resp.data) 142 if len(resp.data) == 0: 143 break 144 145 if setoffset: 146 self.offset = offset 147 yield ''.join(res) 148 149 def areadlines(self, callback): 150 class ctxt: 151 last = None 152 def cont(data, exc, tb): 153 res = True 154 if data: 155 lines = data.split('\n') 156 if ctxt.last: 157 lines[0] = ctxt.last + lines[0] 158 for i in range(0, len(lines) - 1): 159 res = callback(lines[i]) 160 if res is False: 161 return 162 ctxt.last = lines[-1] 163 self.aread(cont) 164 else: 165 if ctxt.last: 166 callback(ctxt.last) 167 callback(None) 168 self.aread(cont) 169 170 @requestchain 171 def awrite(self, data, callback=True, fail=None, offset=None): 172 yield callback, fail 173 setoffset = offset is None 174 if offset is None: 175 offset = self.offset 176 177 off = 0 178 while off < len(data): 179 n = min(len(data), self.iounit) 180 resp = yield fcall.Twrite(offset=offset, data=data[off:off+n]) 181 off += resp.count 182 offset += resp.count 183 184 if setoffset: 185 self.offset = offset 186 yield off 187 188 @requestchain 189 def aremove(self, callback=True, fail=None): 190 yield callback, fail 191 yield fcall.Tremove() 192 self.close() 193 yield True 194 195 # vim:se sts=4 sw=4 et: