thread_ruby.c (12115B)
1 /* Copyright ©2007-2010 Kris Maglione <maglione.k at Gmail> 2 * See LICENSE file for license details. 3 */ 4 #include <errno.h> 5 #include <stdlib.h> 6 #include <unistd.h> 7 #include <ruby.h> 8 #include "ixp_local.h" 9 10 static IxpThread ixp_rthread; 11 static char RWLock[]; 12 13 /** 14 * Function: ixp_rubyinit 15 * 16 * This function initializes libixp for use in multithreaded 17 * programs embedded in the ruby interpreter. When using a pthread 18 * edition of ruby, ixp_pthread_init should be used instead. When 19 * using libixp in such programs, this function must be called 20 * before any other libixp functions. 21 * 22 * This function is part of libixp_rubythread, which is part of the 23 * libixp distribution, but is not built by default unless enabled 24 * in config.mk. 25 */ 26 int 27 ixp_rubyinit(void) { 28 rb_require("thread.rb"); 29 rb_eval_string(RWLock); 30 ixp_thread = &ixp_rthread; 31 return 0; 32 } 33 34 static char* 35 errbuf(void) { 36 static ID key; 37 volatile VALUE val; 38 39 if(key == 0L) 40 key = rb_intern("_ixp_errbuf"); 41 42 val = rb_thread_local_aref(rb_thread_current(), key); 43 if(NIL_P(val)) { 44 val = rb_str_new(nil, IXP_ERRMAX); 45 rb_thread_local_aset(rb_thread_current(), key, val); 46 } 47 48 Check_Type(val, T_STRING); 49 return RSTRING(val)->ptr; 50 } 51 52 static void 53 save(char *eval, void **place) { 54 *place = (void*)rb_eval_string(eval); 55 rb_gc_register_address((VALUE*)place); 56 } 57 58 static void 59 unsave(void **place) { 60 rb_gc_unregister_address((VALUE*)place); 61 } 62 63 #define call(obj, meth, ...) rb_funcall((VALUE)obj, rb_intern(meth), __VA_ARGS__) 64 65 /* Mutex */ 66 static int 67 initmutex(IxpMutex *m) { 68 save("Mutex.new", &m->aux); 69 return 0; 70 } 71 72 static void 73 mdestroy(IxpMutex *m) { 74 unsave(&m->aux); 75 } 76 77 static void 78 mlock(IxpMutex *m) { 79 call(m->aux, "lock", 0); 80 } 81 82 static int 83 mcanlock(IxpMutex *m) { 84 return call(m->aux, "trylock", 0); 85 } 86 87 static void 88 munlock(IxpMutex *m) { 89 call(m->aux, "unlock", 0); 90 } 91 92 /* RWLock */ 93 static int 94 initrwlock(IxpRWLock *rw) { 95 save("RWLock.new", &rw->aux); 96 return 0; 97 } 98 99 static void 100 rwdestroy(IxpRWLock *rw) { 101 unsave(&rw->aux); 102 } 103 104 static void 105 rlock(IxpRWLock *rw) { 106 call(rw->aux, "rdlock", 0); 107 } 108 109 static int 110 canrlock(IxpRWLock *rw) { 111 return call(rw->aux, "tryrdlock", 0) == Qtrue; 112 } 113 114 static void 115 wlock(IxpRWLock *rw) { 116 call(rw->aux, "wrlock", 0); 117 } 118 119 static int 120 canwlock(IxpRWLock *rw) { 121 return call(rw->aux, "trywrlock", 0) == Qtrue; 122 } 123 124 static void 125 rwunlock(IxpRWLock *rw) { 126 call(rw->aux, "unlock", 0); 127 } 128 129 /* Rendez */ 130 static int 131 initrendez(IxpRendez *r) { 132 save("ConditionVariable.new", &r->aux); 133 return 0; 134 } 135 136 static void 137 rdestroy(IxpRendez *r) { 138 unsave(&r->aux); 139 } 140 141 static void 142 rsleep(IxpRendez *r) { 143 call(r->aux, "wait", 1, (VALUE)r->mutex->aux); 144 } 145 146 static int 147 rwake(IxpRendez *r) { 148 call(r->aux, "signal", 0); 149 return 0; 150 } 151 152 static int 153 rwakeall(IxpRendez *r) { 154 call(r->aux, "broadcast", 0); 155 return 0; 156 } 157 158 /* Yielding IO */ 159 static ssize_t 160 _read(int fd, void *buf, size_t size) { 161 int n; 162 163 rb_thread_wait_fd(fd); 164 n = read(fd, buf, size); 165 166 if(n < 0 && errno == EINTR) 167 rb_thread_schedule(); 168 return n; 169 } 170 171 static ssize_t 172 _write(int fd, const void *buf, size_t size) { 173 int n; 174 175 rb_thread_fd_writable(fd); 176 n = write(fd, buf, size); 177 178 if(n < 0 && errno == EINTR) 179 rb_thread_schedule(); 180 return n; 181 } 182 183 static IxpThread ixp_rthread = { 184 /* Mutex */ 185 .initmutex = initmutex, 186 .lock = mlock, 187 .canlock = mcanlock, 188 .unlock = munlock, 189 .mdestroy = mdestroy, 190 /* RWLock */ 191 .initrwlock = initrwlock, 192 .rlock = rlock, 193 .canrlock = canrlock, 194 .wlock = wlock, 195 .canwlock = canwlock, 196 .runlock = rwunlock, 197 .wunlock = rwunlock, 198 .rwdestroy = rwdestroy, 199 /* Rendez */ 200 .initrendez = initrendez, 201 .sleep = rsleep, 202 .wake = rwake, 203 .wakeall = rwakeall, 204 .rdestroy = rdestroy, 205 /* Other */ 206 .errbuf = errbuf, 207 .read = _read, 208 .write = _write, 209 .select = rb_thread_select, 210 }; 211 212 static char RWLock[] = 213 "class RWLock \n" 214 " def initialize \n" 215 " @rdqueue = [] \n" 216 " @wrqueue = [] \n" 217 " @wrheld = nil \n" 218 " @rdheld = [] \n" 219 " end \n" 220 " \n" 221 " def rdlock \n" 222 " cr = Thread.critical \n" 223 " while (Thread.critical = true; @wrheld != nil && @rwheld != Thread.current)\n" 224 " @rdqueue.push Thread.current \n" 225 " Thread.stop \n" 226 " end \n" 227 " @wrheld = nil \n" 228 " @rdheld.push Thread.current \n" 229 " \n" 230 " @rdqueue.each {|t| t.wakeup} \n" 231 " Thread.critical = cr \n" 232 " self \n" 233 " end \n" 234 " \n" 235 " def wrlock \n" 236 " cr = Thread.critical \n" 237 " while (Thread.critical = true; \n" 238 " !@rdheld.empty? || (@wrheld != Thread.current && @wrheld != nil)) \n" 239 " @wrqueue.push Thread.current \n" 240 " Thread.stop \n" 241 " end \n" 242 " @wrheld = Thread.current \n" 243 " Thread.critical = cr \n" 244 " self \n" 245 " end \n" 246 " \n" 247 " \n" 248 " def tryrdlock \n" 249 " cr = Thread.critical \n" 250 " if @wrheld == nil \n" 251 " rdlock \n" 252 " true \n" 253 " else \n" 254 " false \n" 255 " end \n" 256 " ensure \n" 257 " Thread.critical = cr \n" 258 " end \n" 259 " \n" 260 " def trywrlock \n" 261 " cr = Thread.critical \n" 262 " if @wrheld == nil && @rdheld.empty? \n" 263 " wrlock \n" 264 " true \n" 265 " else \n" 266 " false \n" 267 " end \n" 268 " ensure \n" 269 " Thread.critical = cr \n" 270 " end \n" 271 " \n" 272 " def unlock \n" 273 " cr = Thread.critical \n" 274 " Thread.critical = true \n" 275 " \n" 276 " if @rdheld.include?(Thread.current) \n" 277 " @rdheld.remove!(Thread.current) \n" 278 " raise if @wrheld \n" 279 " elsif @rwheld != Thread.current \n" 280 " raise \n" 281 " end \n" 282 " \n" 283 " @wrheld = nil \n" 284 " if !@rwqueue.empty? && @rdheld.empty? \n" 285 " @wrheld = @wrqueue.shift \n" 286 " elsif !@rdqueue.empty \n" 287 " @wrheld = @rdqueue.shift \n" 288 " end \n" 289 " @wrheld.wakeup if @wrheld \n" 290 " ensure \n" 291 " Thread.critical = cr \n" 292 " end \n" 293 "end \n"; 294