
#include #include #include #define GET_INDOS 0x34 #define GET_CRIT_ERR 0x5d06 #define FINISHED 0 // 表示线程处于终止态或TCB是空白状态 #define RUNNING 1 // 表示线程处于运行态 #define READY 2 // 表示线程处于就绪态 #define BLOCKED 3 // 表示线程处于阻塞态 #define NTCB 5 // 表示系统允许线程的最大数 #define NTEXT 20 // 消息的最大字节数 #define NBUF 5 // 消息缓冲区的数目 char far *indos_ptr = 0; // INDOS标志的地址 char far *crit_err_ptr = 0; // 严重错误标志的地址 int current = 0; // 当前线程的内部标识符 int timecount = 0; // 从调度至今运行的时间 int TL = 1; // 时间片大小 int sum; int buf[5]; int in, out; /* 记录型信号量 */ typedef struct { int value; // 信号量的值,表示空闲资源总数 struct TCB *wq; // 线程阻塞队列队首指针 }semaphore; /* 线程控制块 */ struct TCB { unsigned char *stack; // 线程堆栈的起始地址 unsigned ss; // 堆栈段址 unsigned sp; // 堆栈指针 char state; // 线程状态,取值可以是FINISHED、RUNNING、READY、BLOCKED char name[10]; // 线程的外部标识符 struct TCB *next; // 指向下一个线程的指针 struct buffer *mq; // 接收线程的消息队列队首指针 semaphore mutex; // 接收线程的消息队列的互斥信号量 semaphore sm; // 接收线程的消息队列的计数信号量,用于实现同步 }tcb[NTCB]; /* 线程的私有堆栈,保存现场信息 */ struct int_regs { unsigned BP, DI, SI, DS, ES, DX, CX, BX, AX, IP, CS, Flags, off, seg; }; /* 消息缓冲区 */ struct buffer { int sender; // 消息发送者的内部标识 int size; // 消息长度<=NTEXT个字节 char text[NTEXT]; // 消息正文 struct buffer *next; // 指向下一个消息缓冲区的指针 }*freebuf; // 空闲消息缓冲队列 semaphore mutexfb = {1, NULL}; // 空闲消息缓冲队列的互斥信号量 semaphore sfb = {NBUF, NULL}; // 空闲消息缓冲队列的计数信号量 semaphore mutex = {1,NULL}; semaphore empty = {5, NULL}; semaphore full = {0, NULL}; typedef int (far *codeptr)(); // 定义一个函数指针类型 void interrupt (*old_int8)(); // 定义一个函数指针old_int8 void initDos(); // 初始化DOS,获得INDOS标志的地址和严重错误标志的地址 void initTcb(); // 初始化TCB int DosBusy(); // 判断当前DOS是否忙碌 int create(char *name, codeptr code, int stackLen); // 线程创建 void destroy(int id); // 线程撤销 void over(); // 线程自动撤销 void interrupt new_int8(); // 时间片到时引起的CPU调度 void interrupt my_swtch(); // 其他原因引起的CPU调度 void block(struct TCB **qp); // 插入线程到阻塞队列 void wakeup_first(struct TCB **qp); // 唤醒阻塞队列队首进程 void P(semaphore *sem); // 对信号量的P操作 void V(semaphore *sem); // 对信号量的V操作 void initBuf(); // 初始化消息缓冲区 struct buffer *getbuf(); // 获取消息空闲缓冲区 void putbuf(struct buffer *buff); // 插入空闲消息缓冲队列 void insert(struct buffer **mq, struct buffer *buff); // 插入消息缓冲区到消息队列 struct buffer *remov(struct buffer **mq, int sender); // 获取消息缓冲区 void send(char *receiver, char *a, int size); // 发送原语 int receive(char *sender, char *b); // 接收原语 int find(); // 寻找READY状态线程的内部标识符 void tcb_state(); // 线程状态信息 int finished(); // 检查除0#线程外的所有其他线程是否都已运行 /* 初始化DOS,获得INDOS标志的地址和严重错误标志的地址 */ void initDos() { union REGS regs; struct SREGS segregs; // 获得 INDOS 标志的地址 regs.h.ah = GET_INDOS; intdosx(®s, ®s, &segregs); // intdosx():Turbo C的库函数,其功能是调用DOS的INT21H中断 indos_ptr = MK_FP(segregs.es, regs.x.bx); // MK_FP():不是一个函数,只是一个宏。其功能是做段基址加上偏移地址的运算,也就是取实际地址。 // 获得严重错误标志的地址,代码中用到的_osmajor、_osminor是Turbo C的全程变量,其中前者为DOS版本号的主要部分,后者为版本号的次要部分。 if(_osmajor < 3) crit_err_ptr = indos_ptr + 1; // 严重错误在INDOS后一字节处 else if(_osmajor == 3 && _osminor == 0) crit_err_ptr = indos_ptr - 1; // 严重错误在INDOS前一字节处 else { regs.x.ax = GET_CRIT_ERR; intdosx(®s, ®s, &segregs); crit_err_ptr = MK_FP(segregs.ds, regs.x.si); } } /* 初始化TCB */ void initTcb() { int id; for(id = 0; id < NTCB; id++) { tcb[id].stack = NULL; tcb[id].state = FINISHED; tcb[id].name[0] = '\\0'; tcb[id].next = NULL; tcb[id].mq = NULL; tcb[id].mutex.value = 1; tcb[id].mutex.wq = NULL; tcb[id].sm.value = 0; tcb[id].sm.wq = NULL; } } /* 判断当前DOS是否忙碌 */ int DosBusy() { if(indos_ptr && crit_err_ptr) return (*indos_ptr || *crit_err_ptr); // 返回值是1,表示dos忙;返回值是0,表示dos不忙 else return -1; // 还没有调用initDos() } /* 线程创建 */ int create(char *name, codeptr code, int stackLen) { int id; struct int_regs far *regs; disable(); // 关中断 for(id = 1; id < NTCB; id++) // 寻找空的TCB { if(tcb[id].state == FINISHED) break; } if(id == NTCB) // 没找到空的TCB { printf("\\n **** 线程 %s 创建失败 ****\\n", name); return -1; } // 初始化TCB tcb[id].stack = (unsigned char *)malloc(stackLen); regs = (struct int_regs far *)(tcb[id].stack + stackLen); regs--; tcb[id].ss = FP_SEG(regs); tcb[id].sp = FP_OFF(regs); tcb[id].state = READY; strcpy(tcb[id].name, name); // 初始化私有堆栈 regs->DS = _DS; regs->ES = _ES; regs->IP = FP_OFF(code); regs->CS = FP_SEG(code); regs->Flags = 0x200; regs->off = FP_OFF(over); regs->seg = FP_SEG(over); printf("\\n **** 线程%d %s 已创建 ****\\n", id, tcb[id].name); enable(); // 开中断 return id; } /* 线程撤销 */ void destroy(int id) { disable(); // 关中断 // 释放私有堆栈空间,清空TCB信息 free(tcb[id].stack); tcb[id].stack = NULL; tcb[id].state = FINISHED; tcb[id].name[0] = '\\0'; tcb[id].next = NULL; tcb[id].mq = NULL; tcb[id].mutex.value = 1; tcb[id].mutex.wq = NULL; tcb[id].sm.value = 0; tcb[id].sm.wq = NULL; printf("\\n **** 线程%d %s 已撤销 ****\\n", id, tcb[id].name); enable(); // 开中断 } /* 用于自动撤销线程 */ void over() { destroy(current); // 撤销当前线程 my_swtch(); // CPU调度 } /* 时间片到时引起的CPU调度 */ void interrupt new_int8() { (*old_int8)(); // 调用原来的时钟中断服务程序 timecount++; // 计时 if(timecount < TL || DosBusy()) // 时间片未到或DOS正忙 return; my_swtch(); // 调用my_swtch()进行重新调度 } /* 其他原因引起的CPU调度 */ void interrupt my_swtch() { int id; disable(); // 关中断 // 保存现行堆栈的段址和栈顶供下次切换时用 tcb[current].ss = _SS; tcb[current].sp = _SP; if(tcb[current].state == RUNNING) tcb[current].state = READY; id = find(); if(id < 0) id = 0; // 切换堆栈 _SS = tcb[id].ss; _SP = tcb[id].sp; tcb[id].state = RUNNING; current = id; timecount = 0; // 重新开始计时 enable(); // 开中断 } /* 插入线程到阻塞队列 */ void block(struct TCB **qp) { struct TCB *tcbp; tcb[current].state = BLOCKED; if((*qp) == NULL) // 若阻塞队列为空,队首指向当前线程 (*qp) = &tcb[current]; else // 否则插入队尾 { tcbp = (*qp); while(tcbp->next != NULL) tcbp = tcbp->next; tcbp->next = &tcb[current]; } tcb[current].next = NULL; my_swtch(); // CPU调度 } /* 唤醒阻塞队列队首进程 */ void wakeup_first(struct TCB **qp) { struct TCB *tcbp; if((*qp) == NULL) return; tcbp = (*qp); (*qp )= (*qp)->next; // 阻塞队列队首指向下一个线程 tcbp->state = READY; tcbp->next = NULL; } /* 对信号量的P操作 */ void P(semaphore *sem) { struct TCB **qp; disable(); // 关中断 sem->value--; // 空闲资源数减1 if(sem->value < 0) // 没有空闲资源 { qp = &(sem->wq); block(qp); // 插入线程到阻塞队列 } enable(); // 开中断 } /* 对信号量的V操作 */ void V(semaphore *sem) { struct TCB **qp; disable(); // 关中断 sem->value++; // 空闲资源数加1 if(sem->value <= 0) // 阻塞队列中有进程 { qp = &(sem->wq); wakeup_first(qp); // 唤醒阻塞队列队首进程 } enable(); // 开中断 } /* 初始化消息缓冲区 */ void initBuf() { struct buffer *bufp, *buff; int i; buff = (struct buffer *)malloc(sizeof(struct buffer)); buff->sender = -1; buff->size = 0; buff->text[0] = '\\0'; freebuf = bufp = buff; for(i = 1; i < NBUF; i++) { buff = (struct buffer *)malloc(sizeof(struct buffer)); buff->sender = -1; buff->size = 0; buff->text[0] = '\\0'; bufp->next = buff; bufp = buff; } bufp->next = NULL; } /* 获取空闲消息缓冲区 */ struct buffer *getbuf() { struct buffer *buff; buff = freebuf; freebuf = freebuf->next; // 空闲消息缓冲队列队首指针指向下一个空闲缓冲区 return buff; } /* 插入空闲消息缓冲队列 */ void putbuf(struct buffer *buff) { struct buffer *bufp = freebuf; if(freebuf == NULL) // 若空闲消息队列为空,队首指向当前消息缓冲区 freebuf = buff; else // 否则插入队尾 { while(bufp->next != NULL) bufp = bufp->next; bufp->next = buff; } buff->next = NULL; } /* 插入消息缓冲区到消息队列 */ void insert(struct buffer **mq, struct buffer *buff) { struct buffer *bufp; if(buff == NULL) return; if((*mq) == NULL) // 若消息队列为空,队首即为当前消息缓冲区 (*mq) = buff; else // 否则插入队尾 { bufp = (*mq); while(bufp->next != NULL) bufp = bufp->next; bufp->next = buff; } buff->next = NULL; } /* 从消息队列获取消息缓冲区 */ struct buffer *remov(struct buffer **mq, int sender) { struct buffer *bufp, *buff; bufp = (*mq); // 若消息队列队首是sender发送的消息,则取出该消息缓冲区 if(bufp->sender == sender) { buff = bufp; (*mq) = buff->next; buff->next = NULL; return buff; } // 寻找发送者为sender的消息缓冲区 while(bufp->next != NULL && bufp->next->sender != sender) bufp = bufp->next; // 若找不到,则返回NULL if(bufp->next == NULL) return NULL; buff = bufp->next; bufp->next = buff->next; buff->next = NULL; return buff; } /* 发送原语 */ void send(char *receiver, char *a, int size) { struct buffer *buff; int i, id; disable(); // 关中断 // 寻找接受者线程 for(id = 0; id < NTCB; id++) { if(!strcmp(receiver, tcb[id].name)) break; } if(id == NTCB) // 如果接收者线程不存在,则不发送,立即返回 { enable(); // 开中断 return; } // 获取空闲消息缓冲区 P(&sfb); P(&mutexfb); buff = getbuf(); V(&mutexfb); buff->sender = current; buff->size = size; for(i = 0; i < buff->size; i++, a++) buff->text[i] = *a; // 将消息缓冲区插入到接收者线程的消息队列末尾 P(&tcb[id].mutex); insert(&(tcb[id].mq), buff); V(&tcb[id].mutex); V(&tcb[id].sm); enable(); // 开中断 } /* 接收原语 */ int receive(char *sender, char *b) { struct buffer *buff; int i, id, size; disable(); // 关中断 // 寻找发送者线程 for(id = 0; id < NTCB; id++) { if(!strcmp(sender, tcb[id].name)) break; } if(id == NTCB) // 如果发送者线程不存在,则不接收,返回-1 { enable(); // 开中断 return -1; } // 从消息队列取得sender发送的消息 P(&tcb[current].sm); P(&tcb[current].mutex); buff = remov(&(tcb[current].mq), id); V(&tcb[current].mutex); // 若消息队列中没有sender发送的消息,返回0 if(buff == NULL) { V(&tcb[current].sm); enable(); return 0; } size = buff->size; for(i = 0; i < buff->size; i++, b++) *b = buff->text[i]; buff->sender = -1; buff->size = 0; buff->text[0] = '\\0'; // 插入空闲消息缓冲队列 P(&mutexfb); putbuf(buff); V(&mutexfb); V(&sfb); enable(); // 开中断 return size; } /* 寻找READY状态线程的内部标识符 */ int find() { int id; for(id = current + 1; id < NTCB; id++) { if(tcb[id].state == READY) return id; } for(id = 0; id < current; id++) { if(tcb[id].state == READY) return id; } return -1; } /* 线程状态信息 */ void tcb_state() { int id; printf("\\n **** 当前线程状态 ****\\n"); for(id = 0; id < NTCB; id++) { printf("线程%d %9s 状态为 ", id, tcb[id].name); switch(tcb[id].state) { case FINISHED: puts("FINISHED"); break; case RUNNING: puts("RUNNING"); break; case READY: puts("READY"); break; case BLOCKED: puts("BLOCKED"); break; } } } /* 检查除0#线程外的所有其他线程是否都已运行完 */ int finished() { int id; for(id = 1; id < NTCB; id++) { if(tcb[id].state != FINISHED) return 0; } return 1; } /* 不断输出a,共50个 */ void f1() { int i, j, k; for(i = 0; i < 50; i++) { putchar('a'); // 延时 for(j = 0; j < 1000; j++) for(k = 0; k < 1000; k++); } } /* 不断输出b,共50个 */ void f2() { long i, j, k; for(i = 0; i < 30; i++) { putchar('b'); // 延时 for(j = 0; j < 1000; j++) for(k = 0; k < 1000; k++); } } /* 不断对sum加1,共50次,与f4互斥 */ void f3() { int i; int tmp; for(i = 0; i < 50; i++) { P(&mutex); tmp = sum; delay(5); // 暂停5毫秒 tmp++; sum = tmp; V(&mutex); } } /* 不断对sum加1,共50次,与f3互斥 */ void f4() { int i; int tmp; for(i = 0; i < 50; i++) { P(&mutex); tmp = sum; delay(10); // 暂停10毫秒 tmp++; sum = tmp; V(&mutex); } } /* 不断计算1至50的平方,并将结果放入buf中 */ void producer() { int i; int tmp; for(i = 1; i <= 50; i++) { tmp = i * i; P(&empty); P(&mutex); buf[in] = tmp; in = (in + 1) % 5; V(&mutex); V(&full); } } /* 不断从缓冲中取出结果,并打印出来 */ void consumer() { int i; int tmp; for(i = 1; i <= 50; i++) { P(&full); P(&mutex); tmp = buf[out]; out = (out + 1) % 5; V(&mutex); V(&empty); printf("%d\\n", tmp); } } /* 不断向receiver发送消息,共10条 */ void sender() { int i; char message[10] = "message"; for(i = 0; i < 10; i++) { message[7] = i + '0'; message[8] = '\\0'; send("receiver", message, strlen(message)); printf("消息 %s 已发送\\n", message); } receive("receiver", message); if(message[0] == 'O' && message[1] == 'K') printf("所有消息已被接收,通信成功"); else printf("消息未被完全接收,通信失败"); } /* 不断接受从sender发过来的消息 */ void receiver() { int i; int size; char message[10]; for(i = 0; i < 10; i++) { while((size = receive("sender", message)) == 0); message[size] = '\\0'; printf("消息 %s 已接收\\n", message); } strcpy(message, "OK"); send("sender", message, strlen(message)); // 发送确认消息 } int main() { int select = -1; initDos(); // 初始化DOS initTcb(); // 初始化TCB initBuf(); // 初始化freebuf old_int8 = getvect(8); // 获取系统原来的时钟中断服务程序的入口地址 // 创建0#线程 strcpy(tcb[0].name, "main"); tcb[0].state = RUNNING; current = 0; while(select) { clrscr(); printf("\\n\\n\\n\\\ 欢迎使用多任务系统\\n"); printf("\\n\***********************************************************\\n"); printf("\* 1.实现线程的并发执行,可设置时间片大小 *\\n"); printf("\* 2.实现线程对同一资源的互斥访问 *\\n"); printf("\* 3.实现生产者和消费者同步问题 *\\n"); printf("\* 4.实现消息缓冲通信 *\\n"); printf("\* 0.退出 *\\n"); printf("\***********************************************************\\n\\n"); printf("\ 您的选择(请输入 0~4 之间的数):"); scanf("%d", &select); clrscr(); switch(select) { case 0: break; case 1: printf("\\n\\实现线程的并发执行\\n"); printf("\\线程f1不断输出字母a,共50次\\n"); printf("\\线程f2不断输出字母b,共30次\\n"); printf("\\请设置时间片大小:"); scanf("%d", &TL); printf("\\请按任意键继续. . . "); getch(); create("f1", (codeptr)f1, 1024); create("f2", (codeptr)f2, 1024); tcb_state(); setvect(8, new_int8); // 为时钟中断设置新的中断服务程序 my_swtch(); // CPU调度 while(!finished()); break; case 2: printf("\\n\\实现线程对同一资源的互斥访问\\n"); printf("\\线程f3和f4共享变量sum\\n"); printf("\\线程f3不断对sum加1,共50次\\n"); printf("\\线程f4不断对sum加1,共50次\\n"); printf("\\请按任意键继续. . . "); getch(); sum = 0; create("f3", (codeptr)f3, 1024); create("f4", (codeptr)f4, 1024); tcb_state(); TL = 1; setvect(8, new_int8); // 为时钟中断设置新的中断服务程序 my_swtch(); // CPU调度 while(!finished()); printf("sum = %d\\n", sum); break; case 3: printf("\\n\\实现生产者和消费者同步问题\\n"); printf("\\线程producer和consumer共享可存放5个整数的缓冲buf\\n"); printf("\\线程producer不断计算1至50的平方,并将结果放入buf中\\n"); printf("\\线程consumer不断从缓冲中取出结果,并打印出来\\n"); printf("\\请按任意键继续. . . "); getch(); in = out = 0; create("producer", (codeptr)producer, 1024); create("consumer", (codeptr)consumer, 1024); tcb_state(); TL = 1; setvect(8, new_int8); // 为时钟中断设置新的中断服务程序 my_swtch(); // CPU调度 while(!finished()); break; case 4: printf("\\n\\实现消息缓冲通信\\n"); printf("\\线程sender和receiver通过消息缓冲区进行通信\\n"); printf("\\线程sender不断向receiver发送消息,共10条\\n"); printf("\\线程receiver不断接受从sender发过来的消息\\n"); printf("\\请按任意键继续. . . "); getch(); create("sender", (codeptr)sender, 1024); create("receiver", (codeptr)receiver, 1024); tcb_state(); TL = 1; setvect(8, new_int8); // 为时钟中断设置新的中断服务程序 my_swtch(); // CPU调度 while(!finished()); break; default: printf("\\n\\对不起,您的输入有误,请重新输入!\\n"); printf("\\请按任意键继续. . . "); getch(); break; } tcb_state(); if(select > 0 && select < 5) { printf("\\n除0号线程外其他线程已执行完成,按任意键结束. . . "); getch(); } setvect(8, old_int8); // 恢复原来的时钟中断服务程序 } // 终止多任务系统 tcb[0].state=FINISHED; tcb[0].name[0]='\\0'; printf("\\n\\n\\n\\\ 谢谢使用多任务系统\\n"); return 0; }
