int cnd_init(cnd_t*cond);初始化 cond 引用的条件变量。
void cnd_destroy(cnd_t*cond);释放指定条件变量使用的所有资源。
int cnd_signal(cnd_t*cond);在等待指定条件变量的任意数量的线程中,唤醒其中一个线程。
int cnd_broadcast(cnd_t*cond);唤醒所有等待指定条件变量的线程。
int cnd_wait(cnd_t*cond,mtx_t*mtx);阻塞正在调用的线程,并释放指定的互斥。在调用 cnd_wait()之前,线程必须持有互斥。如果另一线程通过发送一个信号解除当前线程的阻塞(也就是说,通过指定同样的条件变量作为参数调用 cond_signal()或 cnd_broadcast()),那么调用 cnd_wait()的线程在 cnd_wait()返回之前会再次获得互斥。
int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);与 cnd_wait()类似,cnd_timedwait()阻塞调用它们的线程,但仅维持由参数 ts 指定的时间。可以通过调用函数 timespec_get()获得一个 struct timespec 对象,它表示当前时间。
/* buffer.h
* 用于线程安全缓冲区的所有声明
*/
#include <stdbool.h>
#include <threads.h>
typedef struct Buffer
{
int *data; // 指向数据数组的指针
size_t size, count; // 元素数量的最大值和当前值
size_t tip, tail; // tip = 下一个空点的索引
mtx_t mtx; // 一个互斥
cnd_t cndPut, cndGet; // 两个条件变量
} Buffer;
bool bufInit( Buffer *bufPtr, size_t size );
void bufDestroy(Buffer *bufPtr);
bool bufPut(Buffer *bufPtr, int data);
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec);
/* -------------------------------------------------------------
* buffer.c
* 定义用于处理Buffer的函数
*/
#include "buffer.h"
#include <stdlib.h> // 为了使用malloc()和free()
bool bufInit( Buffer *bufPtr, size_t size)
{
if ((bufPtr->data = malloc( size * sizeof(int))) == NULL)
return false;
bufPtr->size = size;
bufPtr->count = 0;
bufPtr->tip = bufPtr->tail = 0;
return mtx_init( &bufPtr->mtx, mtx_plain) == thrd_success
&& cnd_init( &bufPtr->cndPut) == thrd_success
&& cnd_init( &bufPtr->cndGet) == thrd_success;
}
void bufDestroy(Buffer *bufPtr)
{
cnd_destroy( &bufPtr->cndGet );
cnd_destroy( &bufPtr->cndPut );
mtx_destroy( &bufPtr->mtx );
free( bufPtr->data );
}
// 在缓冲区中插入一个新元素
bool bufPut(Buffer *bufPtr, int data)
{
mtx_lock( &bufPtr->mtx );
while (bufPtr->count == bufPtr->size)
if (cnd_wait( &bufPtr->cndPut, &bufPtr->mtx ) != thrd_success)
return false;
bufPtr->data[bufPtr->tip] = data;
bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size;
++bufPtr->count;
mtx_unlock( &bufPtr->mtx );
cnd_signal( &bufPtr->cndGet );
return true;
}
// 从缓冲区中移除一个元素
// 如果缓冲区是空的,则等待不超过sec秒
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec)
{
struct timespec ts;
timespec_get( &ts, TIME_UTC ); // 当前时间
ts.tv_sec += sec; // + sec秒延时
mtx_lock( &bufPtr->mtx );
while ( bufPtr->count == 0 )
if (cnd_timedwait(&bufPtr->cndGet,
&bufPtr->mtx, &ts) != thrd_success)
return false;
*dataPtr = bufPtr->data[bufPtr->tail];
bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size;
--bufPtr->count;
mtx_unlock( &bufPtr->mtx );
cnd_signal( &bufPtr->cndPut );
return true;
}
// producer_consumer.c
#include "buffer.h"
#include <stdio.h>
#include <stdlib.h>
#define NP 2 // 生产者的数量
#define NC 3 // 消费者的数量
int producer(void *); // 线程函数
int consumer(void *);
struct Arg { int id; Buffer *bufPtr; }; // 线程函数的参数
_Noreturn void errorExit(const char* msg)
{
fprintf(stderr, "%s\n", msg); exit(0xff);
}
int main(void)
{
printf("Producer-Consumer Demo\n\n");
Buffer buf; // 为5个产品创建一个缓冲区
bufInit( &buf, 5 );
thrd_t prod[NP], cons[NC]; // 线程
struct Arg prodArg[NP], consArg[NC]; // 线程的参数
int i = 0, res = 0;
for ( i = 0; i < NP; ++i ) // 启动生产者
{
prodArg[i].id = i+1, prodArg[i].bufPtr = &buf;
if (thrd_create( &prod[i], producer, &prodArg[i] ) != thrd_success)
errorExit("Thread error.");
}
for ( i = 0; i < NC; ++i ) // 启动消费者
{
consArg[i].id = i+1, consArg[i].bufPtr = &buf;
if ( thrd_create( &cons[i], consumer, &consArg[i] ) != thrd_success)
errorExit("Thread error.");
}
for ( i = 0; i < NP; ++i ) // 等待线程结束
thrd_join(prod[i], &res),
printf("\nProducer %d ended with result %d.\n", prodArg[i].id, res);
for ( i = 0; i < NC; ++i )
thrd_join(cons[i], &res),
printf("Consumer %d ended with result %d.\n", consArg[i].id, res);
bufDestroy( &buf );
return 0;
}
int producer(void *arg) // 生产者线程函数
{
struct Arg *argPtr = (struct Arg *)arg;
int id = argPtr->id;
Buffer *bufPtr = argPtr->bufPtr;
int count = 0;
for (int i = 0; i < 10; ++i)
{
int data = 10*id + i;
if (bufPut( bufPtr, data ))
printf("Producer %d produced %d\n", id, data), ++count;
else
{ fprintf( stderr,
"Producer %d: error storing %d\n", id, data);
return -id;
}
}
return count;
}
int consumer(void *arg) // 消费者线程函数
{
struct Arg *argPtr = (struct Arg *)arg;
int id = argPtr->id;
Buffer *bufPtr = argPtr->bufPtr;
int count = 0;
int data = 0;
while (bufGet( bufPtr, &data, 2 ))
{
++count;
printf("Consumer %d consumed %d\n", id, data);
}
return count;
}
版权说明:Copyright © 广州松河信息科技有限公司 2005-2025 版权所有 粤ICP备16019765号
广州松河信息科技有限公司 版权所有