/* $Id: buf.c,v 1.22 2004/04/03 04:17:37 jmuelmen Exp $ */ #include #include #include #include #include "buf.h" #include "act.h" #include "scheduler.h" /* buf_buf buffers an action; the return value is the index of the * action in the buffer */ inline int buf_buf (buf_t *b, parse_action_t *a) { if (b->n_actions == N_ACT) { fprintf(stderr, "Bad bad bad! Out of parse buffer space.\n"); exit(1); } b->actions[b->n_actions++] = a; b->changed = 1; pthread_cond_broadcast(&b->has_changed); return b->n_actions - 1; } /* buf_clear discards all the actions that have been buffered */ int buf_clear (buf_t *b) { return buf_clear_from_idx(b, 0); } /* buf_clear discards all the actions that have been buffered */ int buf_clear_from_idx (buf_t *b, int i) { int temp = i; for (; i < b->n_actions; free(b->actions[i++])); return b->n_actions = temp; } /* get a parse action out of the buffer by index */ parse_action_t *buf_get_idx (buf_t *b, int i) { if (i >= b->n_actions) return NULL; return b->actions[i]; } const char *parse_action_name (void (*func) (value_t *, value_t **, int)) { if (func == assign) return "assign"; if (func == fetch_ident) return "fetch_ident"; if (func == NULL) return "no-op"; return "unknown function"; } void buf_dump (buf_t *b) { int i = 0; printf("Address\t\tType\tName\n"); for (; i < 80; ++i) printf("-"); printf("\n"); for (i = 0; i < b->n_actions; ++i) { printf("%p\t", b->actions[i]); switch (b->actions[i]->type) { case func_s: printf("func"); break; case if_s: printf("if (eop: %p, eos: %p)", b->actions[i]->statement.statement.eop, b->actions[i]->statement.statement.eos); break; case at_s: printf("at (eop: %p, eos: %p)", b->actions[i]->statement.statement.eop, b->actions[i]->statement.statement.eos); break; case sched_s: printf("sched (eop: %p, eos: %p)", b->actions[i]->statement.statement.eop, b->actions[i]->statement.statement.eos); break; case call_s: printf("call\tret %p", &b->actions[i]->ret); break; } if (b->actions[i]->type == func_s) { int j; printf("\t%s", parse_action_name(b->actions[i]->statement.func)); printf("\t(%d args: ", b->actions[i]->n_args); for (j = 0; j < b->actions[i]->n_args; ++j) { printf("%p ", b->actions[i]->args[j]); } printf(")"); } printf("\n"); } printf("%d actions buffered.\n", b->n_actions); /* for (i = 0; i < n_actions; ++i) */ /* printf("%s\t%s\t%s\n", idents[i]->name, */ /* idents[i]->type == var ? "var" : "func", */ /* idents[i]->cont.val.is_declared ? "yes" : "no"); */ } int buf_process_segment (buf_t *buf, parse_action_t ***start, parse_action_t *end, int sequential) { buf_t *seq_buf = NULL; pthread_t seq_thr; parse_action_t ***i = start; parse_action_t *eos; while (1) { switch ((**i)->type) { case func_s: parse_action_process(**i); break; case begin_atomic: /* atomic set of instructions follows */ set_atomic(1); break; case end_atomic: /* atomic set of instructions has ended */ set_atomic(0); break; case begin_sequential: /* sequential is special. It creates a new buffer; the buffer contains all the parse actions of which the sequential statement is comprised. The buffer gets its own thread, which begins execution after the buffer has been filled (this eases our locking pain). If you do anything fancy in the sequential statement (like using if, at and sched statements or even another sequential) I will hate you, and so will the parser. */ /* make the seq buf */ seq_buf = (buf_t *)malloc(sizeof(buf_t)); buf_init(seq_buf); set_sequential(seq_buf); while (1) { buf_buf(seq_buf, **i); if ((**i)->type == end_sequential) break; ++*i; } /* tell the parser to stick everything that follows into the buffer *seq_buf */ if (pthread_create(&seq_thr, NULL, sequence, seq_buf)) perror("Creating sequential thread"); set_sequential(NULL); break; case end_sequential: /* sequential set of instructions has ended. Spawn a thread to execute the buffer. The thread will take care of freeing the buffer when it is done */ fprintf(stderr, "Should never be here.\n"); break; case if_s: ifbuf_lock(); (**i)->statement.statement.cancel = 0; eos = (**i)->statement.statement.eos; while (1) { ifbuf_buf(**i); if (**i == eos) break; ++*i; } ifbuf_unlock(); break; case at_s: atbuf_lock(); (**i)->statement.statement.cancel = 0; eos = (**i)->statement.statement.eos; while (1) { atbuf_buf(**i); if (**i == eos) break; ++*i; } atbuf_unlock(); break; case sched_s: schedbuf_lock(); (**i)->statement.statement.cancel = 0; eos = (**i)->statement.statement.eos; while (1) { schedbuf_buf(**i); if (**i == eos) break; ++*i; } schedbuf_unlock(); break; case call_s: /* if the function to be called is NULL, then this is just a wrapper around an argument; we should ignore it */ if (!(**i)->statement.func_call.func) break; { /* get the arguments */ int argc = 0; value_t argv[MAX_ARGS]; parse_action_t *arg = (**i)->statement.func_call.arg_list; while (arg) { /* the value of the argument sits in args[0] */ /* we care about the value *right now*, so we copy the actual value, not the pointer */ /* wait for the argument to be evaluated, then copy it */ if (pthread_mutex_lock(&arg->args[0]->cond_mut)) perror("locking argument return mutex"); while (!arg->args[0]->returned) { if (pthread_cond_wait(&arg->args[0]->has_returned, &arg->args[0]->cond_mut)) perror("waiting for return conditional"); } pthread_mutex_unlock(&arg->args[0]->cond_mut); pthread_mutex_lock(&arg->args[0]->mut); argv[argc++] = *arg->args[0]; pthread_mutex_unlock(&arg->args[0]->mut); arg = arg->statement.func_call.arg_list; } if (sequential) { /* if this is part of a sequential statement, just call the function (blocking the buffer) */ value_t ret = (**i)->statement.func_call.func(argv, argc); pthread_mutex_lock(&(**i)->ret.mut); value_copy(&(**i)->ret, &ret); (**i)->ret.returned = 1; pthread_mutex_unlock(&(**i)->ret.mut); /* then signal that the function call has returned */ pthread_cond_signal(&(**i)->ret.has_returned); } else { /* if the statement isn't sequential, we can tell the scheduler to go send off a thread to execute the function. */ enqueue((**i)->statement.func_call.func, argv, argc, &(**i)->ret); } } break; } /* increment carefully */ if (**i == end) break; ++*i; } return 0; } void buf_init (buf_t *b) { b->n_actions = 0; pthread_mutex_init(&b->mut, NULL); pthread_cond_init(&b->has_changed, NULL); b->changed = 0; } static void buf_timersub (struct timeval *a, struct timeval *b) { a->tv_sec -= (b)->tv_sec; a->tv_usec -= (b)->tv_usec; while (a->tv_usec < 0) { --a->tv_sec; a->tv_usec += 1000000; } while (a->tv_usec >= 1000000) { ++a->tv_sec; a->tv_usec -= 1000000; } } /* sleep */ int buf_sleep (long sec, long us, struct timeval *tv_then) { int stat; struct timeval tv_now; struct timespec ts_sleep, ts_rem1, *ts_now, *ts_later; tv_then->tv_sec += sec; tv_then->tv_usec += us; gettimeofday(&tv_now, NULL); buf_timersub(tv_then, &tv_now); ts_sleep.tv_sec = tv_then->tv_sec; ts_sleep.tv_nsec = 1000 * tv_then->tv_usec; /* printf("Sleeping for %ld sec %ld ns\n", ts_sleep.tv_sec, */ /* ts_sleep.tv_nsec); */ ts_now = &ts_sleep; ts_later = &ts_rem1; do { struct timespec *ts_tmp; stat = nanosleep(ts_now, ts_later); if (stat != -1) { /* printf("Succeeded sleeping\n"); */ break; } /* if (errno != EINTR) { */ /* perror("buf_sleep"); */ /* break; */ /* } */ perror("buf_sleep"); /* swap structs and try again */ ts_tmp = ts_now; ts_now = ts_later; ts_later = ts_tmp; } while (1); return stat; }