player.c (2955B)
1 #include <u.h> 2 #include <libc.h> 3 #include <bio.h> 4 #include <thread.h> 5 #include "last.h" 6 #undef pipe 7 8 static char *player[] = { "rc", "-c", "exec " PLAYER, 0 }; 9 10 static QLock lk; 11 static Biobuf *stream; 12 static Biobuf playerin; 13 static int playing; 14 15 static void 16 playerproc(void *v) { 17 Channel *cwait; 18 Waitmsg *m; 19 int fd[3], fdin; 20 int cpid, i; 21 22 fdin = ((int*)v)[0]; 23 cwait = threadwaitchan(); 24 for(;;) { 25 fd[0] = dup(fdin, -1); 26 fd[1] = open("/dev/null", OWRITE); 27 fd[2] = open("/dev/null", OWRITE); 28 notedisable("alarm"); 29 cpid = threadspawn(fd, player[0], player); 30 noteenable("alarm"); 31 do{ 32 m = recvp(cwait); 33 i = 0; 34 if(m == nil) 35 continue; 36 i = m->pid; 37 free(m); 38 }while(i != cpid); 39 msg("Player died.\n"); 40 } 41 } 42 43 void 44 initplayer(void) { 45 static int fd[2]; 46 47 notifyoff("alarm"); 48 notedisable("sys: window size change"); 49 50 if(pipe(fd) < 0) 51 exitsall("pipe"); 52 53 proccreate(playerproc, fd, mainstacksize); 54 Binit(&playerin, fd[1], OWRITE); 55 } 56 57 static int 58 tgetc(Biobuf *b) { 59 int c; 60 61 alarm(1000); 62 c = Bgetc(b); 63 alarm(0); 64 return c; 65 } 66 67 #define GETC(bp) \ 68 ((bp)->icount?(bp)->bbuf[(bp)->bsize+(bp)->icount++]:tgetc((bp))) 69 70 static int 71 scan(int c, char *str) { 72 char *s; 73 74 s=str; 75 do { 76 if(*s++ != c) 77 break; 78 if(*s == '\0') 79 return 1; 80 }while((c = GETC(stream)) != Beof); 81 while(--s > str) 82 Bungetc(stream); 83 return 0; 84 } 85 86 static void 87 proxyproc(void *v) { 88 char errbuf[16]; 89 int c, i, j, bytes; 90 91 qlock(&lk); 92 j = 0; 93 while(playing) { 94 label("Starting stream."); 95 if(debug['p']) 96 msg("New stream\n"); 97 skipping = 0; 98 i = 0; 99 do { 100 stream = httpget(streamhost, streampath); 101 if(stream) 102 break; 103 if(i++ > 5) { 104 msg("Can't get stream: %r\n"); 105 playing = 0; 106 } 107 rerrstr(errbuf, sizeof errbuf); 108 }while(playing && !strcasestr(errbuf, "interrupted")); 109 110 c = 0; 111 bytes = 0; 112 while(playing && (c = GETC(stream)) != Beof) { 113 if(scan(c, "SYNC")) { 114 if(debug['v']) 115 msg("SYNC\n"); 116 skipping = 0; 117 getmeta(); 118 }else 119 if(scan(c, "HTTP/1.0 ")) { 120 msg("Stream died: %.*s\n", Blinelen(stream)-1, Brdline(stream, '\n')); 121 playing = 0; 122 }else { 123 bytes++; 124 if(!skipping) 125 BPUTC(&playerin, c); 126 else { 127 playerin.ocount = -playerin.bsize; 128 /* Waited too long, probably won't actually skip. 129 * Just start a new stream. 130 */ 131 /* 8192 is arbitrary. */ 132 if(skipping++ > 8192) 133 break; 134 } 135 } 136 } 137 if(debug['p'] && !playing) 138 msg("playing => 0\n"); 139 if(c == Beof) 140 msg("Stream died\n"); 141 Bterm(stream); 142 if(bytes > 2048) 143 j = 0; 144 else if(j++ > 15) { 145 msg("Stream won't stay open\n"); 146 playing = 0; 147 } 148 } 149 qunlock(&lk); 150 } 151 152 void 153 newstream(void) { 154 155 if(playing) 156 endstream(); 157 158 playing = 1; 159 proccreate(proxyproc, 0, mainstacksize); 160 } 161 162 void 163 endstream(void) { 164 165 label("Stopped."); 166 if(!playing) 167 return; 168 169 playing = 0; 170 postnote(PNGROUP, getpid(), "alarm"); 171 qlock(&lk); 172 qunlock(&lk); 173 174 if(stream) 175 Bterm(stream); 176 stream = nil; 177 } 178