rpc.c (4960B)
1 /* From Plan 9's libmux. 2 * Copyright (c) 2003 Russ Cox, Massachusetts Institute of Technology 3 * Distributed under the same terms as libixp. 4 */ 5 #include <assert.h> 6 #include <stdlib.h> 7 #include <stdio.h> 8 #include <string.h> 9 #include "ixp_local.h" 10 11 static int gettag(IxpClient*, IxpRpc*); 12 static void puttag(IxpClient*, IxpRpc*); 13 static void enqueue(IxpClient*, IxpRpc*); 14 static void dequeue(IxpClient*, IxpRpc*); 15 16 void 17 muxinit(IxpClient *mux) 18 { 19 mux->tagrend.mutex = &mux->lk; 20 mux->sleep.next = &mux->sleep; 21 mux->sleep.prev = &mux->sleep; 22 thread->initmutex(&mux->lk); 23 thread->initmutex(&mux->rlock); 24 thread->initmutex(&mux->wlock); 25 thread->initrendez(&mux->tagrend); 26 } 27 28 void 29 muxfree(IxpClient *mux) 30 { 31 thread->mdestroy(&mux->lk); 32 thread->mdestroy(&mux->rlock); 33 thread->mdestroy(&mux->wlock); 34 thread->rdestroy(&mux->tagrend); 35 free(mux->wait); 36 } 37 38 static void 39 initrpc(IxpClient *mux, IxpRpc *r) 40 { 41 r->mux = mux; 42 r->waiting = 1; 43 r->r.mutex = &mux->lk; 44 r->p = nil; 45 thread->initrendez(&r->r); 46 } 47 48 static void 49 freemuxrpc(IxpRpc *r) 50 { 51 thread->rdestroy(&r->r); 52 } 53 54 static int 55 sendrpc(IxpRpc *r, IxpFcall *f) 56 { 57 int ret; 58 IxpClient *mux; 59 60 ret = 0; 61 mux = r->mux; 62 /* assign the tag, add selves to response queue */ 63 thread->lock(&mux->lk); 64 r->tag = gettag(mux, r); 65 f->hdr.tag = r->tag; 66 enqueue(mux, r); 67 thread->unlock(&mux->lk); 68 69 thread->lock(&mux->wlock); 70 if(!ixp_fcall2msg(&mux->wmsg, f) || !ixp_sendmsg(mux->fd, &mux->wmsg)) { 71 /* werrstr("settag/send tag %d: %r", tag); fprint(2, "%r\n"); */ 72 thread->lock(&mux->lk); 73 dequeue(mux, r); 74 puttag(mux, r); 75 thread->unlock(&mux->lk); 76 ret = -1; 77 } 78 thread->unlock(&mux->wlock); 79 return ret; 80 } 81 82 static IxpFcall* 83 muxrecv(IxpClient *mux) 84 { 85 IxpFcall *f; 86 87 f = nil; 88 thread->lock(&mux->rlock); 89 if(ixp_recvmsg(mux->fd, &mux->rmsg) == 0) 90 goto fail; 91 f = emallocz(sizeof *f); 92 if(ixp_msg2fcall(&mux->rmsg, f) == 0) { 93 free(f); 94 f = nil; 95 } 96 fail: 97 thread->unlock(&mux->rlock); 98 return f; 99 } 100 101 static void 102 dispatchandqlock(IxpClient *mux, IxpFcall *f) 103 { 104 int tag; 105 IxpRpc *r2; 106 107 tag = f->hdr.tag - mux->mintag; 108 thread->lock(&mux->lk); 109 /* hand packet to correct sleeper */ 110 if(tag < 0 || tag >= mux->mwait) { 111 fprintf(stderr, "libixp: recieved unfeasible tag: %d (min: %d, max: %d)\n", f->hdr.tag, mux->mintag, mux->mintag+mux->mwait); 112 goto fail; 113 } 114 r2 = mux->wait[tag]; 115 if(r2 == nil || r2->prev == nil) { 116 fprintf(stderr, "libixp: recieved message with bad tag\n"); 117 goto fail; 118 } 119 r2->p = f; 120 dequeue(mux, r2); 121 thread->wake(&r2->r); 122 return; 123 fail: 124 ixp_freefcall(f); 125 free(f); 126 } 127 128 static void 129 electmuxer(IxpClient *mux) 130 { 131 IxpRpc *rpc; 132 133 /* if there is anyone else sleeping, wake them to mux */ 134 for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){ 135 if(!rpc->async){ 136 mux->muxer = rpc; 137 thread->wake(&rpc->r); 138 return; 139 } 140 } 141 mux->muxer = nil; 142 } 143 144 IxpFcall* 145 muxrpc(IxpClient *mux, IxpFcall *tx) 146 { 147 IxpRpc r; 148 IxpFcall *p; 149 150 initrpc(mux, &r); 151 if(sendrpc(&r, tx) < 0) 152 return nil; 153 154 thread->lock(&mux->lk); 155 /* wait for our packet */ 156 while(mux->muxer && mux->muxer != &r && !r.p) 157 thread->sleep(&r.r); 158 159 /* if not done, there's no muxer; start muxing */ 160 if(!r.p){ 161 assert(mux->muxer == nil || mux->muxer == &r); 162 mux->muxer = &r; 163 while(!r.p){ 164 thread->unlock(&mux->lk); 165 p = muxrecv(mux); 166 if(p == nil){ 167 /* eof -- just give up and pass the buck */ 168 thread->lock(&mux->lk); 169 dequeue(mux, &r); 170 break; 171 } 172 dispatchandqlock(mux, p); 173 } 174 electmuxer(mux); 175 } 176 p = r.p; 177 puttag(mux, &r); 178 thread->unlock(&mux->lk); 179 if(p == nil) 180 werrstr("unexpected eof"); 181 return p; 182 } 183 184 static void 185 enqueue(IxpClient *mux, IxpRpc *r) 186 { 187 r->next = mux->sleep.next; 188 r->prev = &mux->sleep; 189 r->next->prev = r; 190 r->prev->next = r; 191 } 192 193 static void 194 dequeue(IxpClient *mux, IxpRpc *r) 195 { 196 r->next->prev = r->prev; 197 r->prev->next = r->next; 198 r->prev = nil; 199 r->next = nil; 200 } 201 202 static int 203 gettag(IxpClient *mux, IxpRpc *r) 204 { 205 int i, mw; 206 IxpRpc **w; 207 208 for(;;){ 209 /* wait for a free tag */ 210 while(mux->nwait == mux->mwait){ 211 if(mux->mwait < mux->maxtag-mux->mintag){ 212 mw = mux->mwait; 213 if(mw == 0) 214 mw = 1; 215 else 216 mw <<= 1; 217 w = realloc(mux->wait, mw * sizeof *w); 218 if(w == nil) 219 return -1; 220 memset(w+mux->mwait, 0, (mw-mux->mwait) * sizeof *w); 221 mux->wait = w; 222 mux->freetag = mux->mwait; 223 mux->mwait = mw; 224 break; 225 } 226 thread->sleep(&mux->tagrend); 227 } 228 229 i=mux->freetag; 230 if(mux->wait[i] == 0) 231 goto Found; 232 for(; i<mux->mwait; i++) 233 if(mux->wait[i] == 0) 234 goto Found; 235 for(i=0; i<mux->freetag; i++) 236 if(mux->wait[i] == 0) 237 goto Found; 238 /* should not fall out of while without free tag */ 239 abort(); 240 } 241 242 Found: 243 mux->nwait++; 244 mux->wait[i] = r; 245 r->tag = i+mux->mintag; 246 return r->tag; 247 } 248 249 static void 250 puttag(IxpClient *mux, IxpRpc *r) 251 { 252 int i; 253 254 i = r->tag - mux->mintag; 255 assert(mux->wait[i] == r); 256 mux->wait[i] = nil; 257 mux->nwait--; 258 mux->freetag = i; 259 thread->wake(&mux->tagrend); 260 freemuxrpc(r); 261 } 262