libixp

git clone git://oldgit.suckless.org/libixp/
Log | Files | Refs | LICENSE

rpc.c (4960B)


      1 /* From Plan 9's libmux.
      2  * Copyright (c) 2003 Russ Cox, Massachusetts Institute of Technology
      3  * Distributed under the same terms as libixp.
      4  */
      5 #include <assert.h>
      6 #include <stdlib.h>
      7 #include <stdio.h>
      8 #include <string.h>
      9 #include "ixp_local.h"
     10 
     11 static int gettag(IxpClient*, IxpRpc*);
     12 static void puttag(IxpClient*, IxpRpc*);
     13 static void enqueue(IxpClient*, IxpRpc*);
     14 static void dequeue(IxpClient*, IxpRpc*);
     15 
     16 void
     17 muxinit(IxpClient *mux)
     18 {
     19 	mux->tagrend.mutex = &mux->lk;
     20 	mux->sleep.next = &mux->sleep;
     21 	mux->sleep.prev = &mux->sleep;
     22 	thread->initmutex(&mux->lk);
     23 	thread->initmutex(&mux->rlock);
     24 	thread->initmutex(&mux->wlock);
     25 	thread->initrendez(&mux->tagrend);
     26 }
     27 
     28 void
     29 muxfree(IxpClient *mux)
     30 {
     31 	thread->mdestroy(&mux->lk);
     32 	thread->mdestroy(&mux->rlock);
     33 	thread->mdestroy(&mux->wlock);
     34 	thread->rdestroy(&mux->tagrend);
     35 	free(mux->wait);
     36 }
     37 
     38 static void
     39 initrpc(IxpClient *mux, IxpRpc *r)
     40 {
     41 	r->mux = mux;
     42 	r->waiting = 1;
     43 	r->r.mutex = &mux->lk;
     44 	r->p = nil;
     45 	thread->initrendez(&r->r);
     46 }
     47 
     48 static void
     49 freemuxrpc(IxpRpc *r)
     50 {
     51 	thread->rdestroy(&r->r);
     52 }
     53 
     54 static int
     55 sendrpc(IxpRpc *r, IxpFcall *f)
     56 {
     57 	int ret;
     58 	IxpClient *mux;
     59 	
     60 	ret = 0;
     61 	mux = r->mux;
     62 	/* assign the tag, add selves to response queue */
     63 	thread->lock(&mux->lk);
     64 	r->tag = gettag(mux, r);
     65 	f->hdr.tag = r->tag;
     66 	enqueue(mux, r);
     67 	thread->unlock(&mux->lk);
     68 
     69 	thread->lock(&mux->wlock);
     70 	if(!ixp_fcall2msg(&mux->wmsg, f) || !ixp_sendmsg(mux->fd, &mux->wmsg)) {
     71 		/* werrstr("settag/send tag %d: %r", tag); fprint(2, "%r\n"); */
     72 		thread->lock(&mux->lk);
     73 		dequeue(mux, r);
     74 		puttag(mux, r);
     75 		thread->unlock(&mux->lk);
     76 		ret = -1;
     77 	}
     78 	thread->unlock(&mux->wlock);
     79 	return ret;
     80 }
     81 
     82 static IxpFcall*
     83 muxrecv(IxpClient *mux)
     84 {
     85 	IxpFcall *f;
     86 
     87 	f = nil;
     88 	thread->lock(&mux->rlock);
     89 	if(ixp_recvmsg(mux->fd, &mux->rmsg) == 0)
     90 		goto fail;
     91 	f = emallocz(sizeof *f);
     92 	if(ixp_msg2fcall(&mux->rmsg, f) == 0) {
     93 		free(f);
     94 		f = nil;
     95 	}
     96 fail:
     97 	thread->unlock(&mux->rlock);
     98 	return f;
     99 }
    100 
    101 static void
    102 dispatchandqlock(IxpClient *mux, IxpFcall *f)
    103 {
    104 	int tag;
    105 	IxpRpc *r2;
    106 
    107 	tag = f->hdr.tag - mux->mintag;
    108 	thread->lock(&mux->lk);
    109 	/* hand packet to correct sleeper */
    110 	if(tag < 0 || tag >= mux->mwait) {
    111 		fprintf(stderr, "libixp: recieved unfeasible tag: %d (min: %d, max: %d)\n", f->hdr.tag, mux->mintag, mux->mintag+mux->mwait);
    112 		goto fail;
    113 	}
    114 	r2 = mux->wait[tag];
    115 	if(r2 == nil || r2->prev == nil) {
    116 		fprintf(stderr, "libixp: recieved message with bad tag\n");
    117 		goto fail;
    118 	}
    119 	r2->p = f;
    120 	dequeue(mux, r2);
    121 	thread->wake(&r2->r);
    122 	return;
    123 fail:
    124 	ixp_freefcall(f);
    125 	free(f);
    126 }
    127 
    128 static void
    129 electmuxer(IxpClient *mux)
    130 {
    131 	IxpRpc *rpc;
    132 
    133 	/* if there is anyone else sleeping, wake them to mux */
    134 	for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
    135 		if(!rpc->async){
    136 			mux->muxer = rpc;
    137 			thread->wake(&rpc->r);
    138 			return;
    139 		}
    140 	}
    141 	mux->muxer = nil;
    142 }
    143 
    144 IxpFcall*
    145 muxrpc(IxpClient *mux, IxpFcall *tx)
    146 {
    147 	IxpRpc r;
    148 	IxpFcall *p;
    149 
    150 	initrpc(mux, &r);
    151 	if(sendrpc(&r, tx) < 0)
    152 		return nil;
    153 
    154 	thread->lock(&mux->lk);
    155 	/* wait for our packet */
    156 	while(mux->muxer && mux->muxer != &r && !r.p)
    157 		thread->sleep(&r.r);
    158 
    159 	/* if not done, there's no muxer; start muxing */
    160 	if(!r.p){
    161 		assert(mux->muxer == nil || mux->muxer == &r);
    162 		mux->muxer = &r;
    163 		while(!r.p){
    164 			thread->unlock(&mux->lk);
    165 			p = muxrecv(mux);
    166 			if(p == nil){
    167 				/* eof -- just give up and pass the buck */
    168 				thread->lock(&mux->lk);
    169 				dequeue(mux, &r);
    170 				break;
    171 			}
    172 			dispatchandqlock(mux, p);
    173 		}
    174 		electmuxer(mux);
    175 	}
    176 	p = r.p;
    177 	puttag(mux, &r);
    178 	thread->unlock(&mux->lk);
    179 	if(p == nil)
    180 		werrstr("unexpected eof");
    181 	return p;
    182 }
    183 
    184 static void
    185 enqueue(IxpClient *mux, IxpRpc *r)
    186 {
    187 	r->next = mux->sleep.next;
    188 	r->prev = &mux->sleep;
    189 	r->next->prev = r;
    190 	r->prev->next = r;
    191 }
    192 
    193 static void
    194 dequeue(IxpClient *mux, IxpRpc *r)
    195 {
    196 	r->next->prev = r->prev;
    197 	r->prev->next = r->next;
    198 	r->prev = nil;
    199 	r->next = nil;
    200 }
    201 
    202 static int 
    203 gettag(IxpClient *mux, IxpRpc *r)
    204 {
    205 	int i, mw;
    206 	IxpRpc **w;
    207 
    208 	for(;;){
    209 		/* wait for a free tag */
    210 		while(mux->nwait == mux->mwait){
    211 			if(mux->mwait < mux->maxtag-mux->mintag){
    212 				mw = mux->mwait;
    213 				if(mw == 0)
    214 					mw = 1;
    215 				else
    216 					mw <<= 1;
    217 				w = realloc(mux->wait, mw * sizeof *w);
    218 				if(w == nil)
    219 					return -1;
    220 				memset(w+mux->mwait, 0, (mw-mux->mwait) * sizeof *w);
    221 				mux->wait = w;
    222 				mux->freetag = mux->mwait;
    223 				mux->mwait = mw;
    224 				break;
    225 			}
    226 			thread->sleep(&mux->tagrend);
    227 		}
    228 
    229 		i=mux->freetag;
    230 		if(mux->wait[i] == 0)
    231 			goto Found;
    232 		for(; i<mux->mwait; i++)
    233 			if(mux->wait[i] == 0)
    234 				goto Found;
    235 		for(i=0; i<mux->freetag; i++)
    236 			if(mux->wait[i] == 0)
    237 				goto Found;
    238 		/* should not fall out of while without free tag */
    239 		abort();
    240 	}
    241 
    242 Found:
    243 	mux->nwait++;
    244 	mux->wait[i] = r;
    245 	r->tag = i+mux->mintag;
    246 	return r->tag;
    247 }
    248 
    249 static void
    250 puttag(IxpClient *mux, IxpRpc *r)
    251 {
    252 	int i;
    253 
    254 	i = r->tag - mux->mintag;
    255 	assert(mux->wait[i] == r);
    256 	mux->wait[i] = nil;
    257 	mux->nwait--;
    258 	mux->freetag = i;
    259 	thread->wake(&mux->tagrend);
    260 	freemuxrpc(r);
    261 }
    262