client.c (3134B)
1 #include <u.h> 2 #include <libc.h> 3 #include <bio.h> 4 #include <thread.h> 5 #include <ixp.h> 6 7 extern char *(*_syserrstr)(void); 8 char *path; 9 10 enum { 11 DATA = 1, 12 THREAD = 2, 13 TRACE = 4, 14 READ = 8, 15 }; 16 int chatty = READ; 17 18 typedef struct arg arg; 19 typedef struct arg2 arg2; 20 struct arg { 21 Rendez r; 22 IxpCFid *f; 23 Channel *ch; 24 int j, k; 25 }; 26 27 struct arg2 { 28 Rendez r; 29 IxpClient *c; 30 Channel *ch; 31 int j; 32 }; 33 34 Channel *chan; 35 int nproc; 36 37 void 38 spawn(void(*f)(void*), void *v, int s) { 39 nproc++; 40 proccreate(f, v, s); 41 } 42 43 void 44 _print(Biobuf *b, char *p, char *end, int j, int k) { 45 for(; p < end; p++) { 46 Bputc(b, *p); 47 if(*p == '\n') { 48 Bflush(b); 49 Bprint(b, ":: %d %d: ", j, k); 50 } 51 } 52 } 53 54 void 55 readfile(IxpCFid *f, int j, int k) { 56 Biobuf *b; 57 char *buf; 58 int n; 59 60 if(chatty&TRACE) 61 fprint(2, "readfile(%p, %d, %d) iounit: %d\n", f, j, k, f->iounit); 62 63 b = Bfdopen(dup(1, -1), OWRITE); 64 if(chatty&DATA) 65 Bprint(b, ":: %d %d: ", j, k); 66 67 buf = ixp_emalloc(f->iounit); 68 while((n = ixp_read(f, buf, f->iounit)) > 0) { 69 if(chatty&READ) 70 fprint(2, "+readfile(%p, %d, %d) n=%d\n", f, j, k, n); 71 if(chatty&DATA) 72 _print(b, buf, buf+n, j, k); 73 sleep(0); 74 } 75 76 if(chatty&TRACE) 77 fprint(2, "-readfile(%p, %d, %d) iounit: %d\n", f, j, k, f->iounit); 78 if(chatty&DATA) 79 Bputc(b, '\n'); 80 Bterm(b); 81 } 82 83 static void 84 _read(void *p) { 85 arg *a; 86 int k; 87 88 a = p; 89 k = a->k; 90 if(chatty&THREAD) 91 print("Start _read: %d\n", a->j, k); 92 93 qlock(a->r.l); 94 sendul(a->ch, 0); 95 rsleep(&a->r); 96 if(chatty&THREAD) 97 print("Wake _read: %d\n", a->j, k); 98 qunlock(a->r.l); 99 100 readfile(a->f, a->j, k); 101 sendul(chan, 0); 102 } 103 104 static void 105 _open(void *p) { 106 arg2 *a2; 107 arg *a; 108 109 a = malloc(sizeof *a); 110 111 a2 = p; 112 113 a->j = a2->j; 114 memset(&a->r, 0, sizeof(a->r)); 115 a->r.l = mallocz(sizeof(QLock), 1); 116 a->ch = chancreate(sizeof(ulong), 0); 117 118 if(chatty&THREAD) 119 print("Start _open: %d\n", a2->j); 120 121 qlock(a2->r.l); 122 sendul(a2->ch, 0); 123 rsleep(&a2->r); 124 if(chatty&THREAD) 125 print("Wake _open: %d\n", a2->j); 126 qunlock(a2->r.l); 127 128 a->f = ixp_open(a2->c, path, OREAD); 129 if(a->f == nil) 130 sysfatal("can't open %q: %r\n", path); 131 sleep(0); 132 133 for(a->k = 0; a->k < 5; a->k++) { 134 spawn(_read, a, mainstacksize); 135 recvul(a->ch); 136 } 137 138 qlock(a->r.l); 139 rwakeupall(&a->r); 140 qunlock(a->r.l); 141 142 sendul(chan, 0); 143 } 144 145 const char *_malloc_options = "A"; 146 147 void 148 threadmain(int argc, char *argv[]) { 149 arg2 *a; 150 char *address; 151 152 USED(argc); 153 USED(argv); 154 address = "tcp!localhost!6663"; 155 path = "/n/local/var/log/messages"; 156 157 a = malloc(sizeof *a); 158 chan = chancreate(sizeof(ulong), 0); 159 160 quotefmtinstall(); 161 162 _syserrstr = ixp_errbuf; 163 if(ixp_pthread_init()) 164 sysfatal("can't init pthread: %r\n"); 165 166 a->c = ixp_mount(address); 167 if(a->c == nil) 168 sysfatal("can't mount: %r\n"); 169 170 memset(&a->r, 0, sizeof(a->r)); 171 a->r.l = mallocz(sizeof(QLock), 1); 172 a->ch = chancreate(sizeof(ulong), 0); 173 for(a->j = 0; a->j < 5; a->j++) { 174 spawn(_open, a, mainstacksize); 175 recvul(a->ch); 176 } 177 178 qlock(a->r.l); 179 if(chatty&THREAD) 180 fprint(2, "qlock()\n"); 181 rwakeupall(&a->r); 182 if(chatty&THREAD) 183 fprint(2, "wokeup\n"); 184 qunlock(a->r.l); 185 if(chatty&THREAD) 186 fprint(2, "unlocked\n"); 187 188 while(nproc--) 189 recvul(chan); 190 } 191