线程间同步之信号量实现环形buf

  1. 云栖社区>
  2. 博客>
  3. 正文

线程间同步之信号量实现环形buf

余二五 2017-11-07 15:27:00 浏览545


一.概述:

  信号量是一个非负整数的计数器,它通过计数器来实现多线程对临界资源的顺序访问,从而实现线程间的同步。它与进程间通信的信号量不同,进程间通信的信号量是一个信号量集,而线程间同步的信号量是一个信号。还有一点,就是对信号量的操作是原子的。

  信号量与互斥锁的区别:

(1).互斥锁的值只能是0或1,而信号量的值为非负整数。

(2).互斥锁用与实现线程间的互斥,而信号量用于实现线程间的同步。

(3).互斥锁的加锁和解锁必须由同一个线程分别对应使用,而信号量可以由一个线程得到,另一个线程释放。



  下面是在一个环形buf中实现消费者与生产者模型。在这个实现中,要定义两个信号量,一个信号量表示可以读的数据,一个表示可以写的格子,并且只实现了消费者与生产者的同步,没有实现互斥,即生产者和消费者可以同时操作。如果是多生产者与多消费者,则要实现生产者与生产者之间的互斥,消费者与消费者之间的互斥。




二.相关函数:

(1).sem_init函数:int  sem_init(sem_t  *sem, int  pshared,  unsigned int value)

pshared参数:pshared 参数指明信号量是由进程内线程共享,还是由进程之间共享。如果 pshared 的值为 0,那么信号量将被进程内的线程共享,并且应该放置在所有线程都可见的地址上(如全局变量,或者堆上动态分配的变量)。如果 pshared 是非零值,那么信号量将在进程之间共享,并且应该定位共享内存区域

value参数:初始化信号量的值。

返回值:成功时返回 0;错误时,返回 -1,并把 errno 设置为合适的值。

(2).sem_destroy函数:int  sem_destroy(sem_t  *sem)

函数功能:销毁一个信号量。

返回值:成功时返回 0;错误时,返回 -1,并把 errno 设置为合适的值。

(3).sem_wait函数:int sem_wait(sem_t *sem)

函数功能:获得资源,相当与P操作,将信号量减1。(阻塞式的,并且它的操作是原子的)

返回值:成功时都返回 0;错误保持信号量值没有更改,-1 被返回,并设置 errno 来指明错误。

(4).sem_trywait函数:int sem_trywait(sem_t *sem)

函数功能:获得资源,相当与P操作,将信号量减1。(非阻塞式的,并且它的操作是原子的)

返回值:成功时都返回 0;错误保持信号量值没有更改,-1 被返回,并设置 errno 来指明错误。

(5).sem_post函数:int sem_post(sem_t *sem)

函数功能:释放资源,相当与V操作,将信号量加1,并唤醒挂起等待该资源的线程。

返回值:成功时都返回 0;错误保持信号量值没有更改,-1 被返回,并设置 errno 来指明错误。




三.相关代码:

单线程情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
1 #include<stdio.h>
  2 #include<stdlib.h>
  3 #include<pthread.h>
  4 #include<semaphore.h>
  
  6 #define SIZE 20
  const int BLANK_INIT = 20;//定义环形buf一共有20个格子
  const int DATA_INIT = 0;      //开始可以消费的数据为0                                                                                                                                 
  9 sem_t blank;
 10 sem_t data;
 11 int buf[SIZE] = {0};
 12 pthread_t tid1, tid2;
 13 
 14 void* product(void *arg)
 15 {
 16     int num = 1;
 17     int index = 0;
 18     
 19     while(1)
 20     {   
 21         sem_wait(&blank);      //获得格子资源
 22         buf[index++] = num;
 23         index = index % SIZE;
 24         printf("product data is : %d\n", num);
 25         num++;
 26         sem_post(&data);       //释放数据资源
 27     }
 28     return NULL;
 29 }
 30 
 31 void* consumer(void* arg)
 32 {
 33     int index = 0;
 34     int num = 0;
 35     
 36     while(1)
 37     {   
 38         sem_wait(&data);      //获得数据资源
 39         num = buf[index++];
 40         index = index % SIZE;
 41         printf("consumer data is : %d\n", num);
 42         sleep(1);
 43         sem_post(&blank);     //释放格子资源
 44     }
 45 }
 46 
 47 void run_product_consumer()
 48 {
 49     pthread_create(&tid1, NULL, product, NULL);
 50     pthread_create(&tid1, NULL, consumer, NULL);
 51 
 52     pthread_join(tid1, NULL);
 53     pthread_join(tid1, NULL);
 54 }
 55 
 56 void init_allsem()
 57 {
 58     if( sem_init(&blank, 0, BLANK_INIT) < 0)
 59     {
 60         printf("init product error....\n");
 61     }
 62     if( sem_init(&data, 0, DATA_INIT) < 0)
 63     {
 64         printf("init consumer error....\n");
 65     }
 66 }
 67 
 68 void destroy_allsem()
 69 {
 70     if( sem_destroy(&blank) < 0)
 71     {
 72         printf("destroy blank sem error...\n");
 73     }
 74     if( sem_destroy(&data) < 0)
 75     {
 76         printf("destroy data sem error...\n");
 77     }
 78 }
 79 
 80 
 81 
 82 int main()
 83 {
 84     init_allsem();
 85     run_product_consumer();
 86     destroy_alsem();
 87     return 0;
 88 }

执行结果:

wKiom1cfHATAA7kNAABX0v16Hw4381.png

多线程情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
1 #include<stdio.h>
  2 #include<stdlib.h>
  3 #include<pthread.h>
  4 #include<semaphore.h>
  5 #include<sys/types.h>
  
  7 #define SIZE 20
  const int BLANK_INIT = 20;
  const int DATA_INIT = 0;
 10 sem_t blank;
 11 sem_t data;
 12 pthread_mutex_t lock;
 13 int buf[SIZE] = {0};
 14 pthread_t tid1, tid2, tid3, tid4;
 15 
 16 void* product(void *arg)
 17 {
 18     int num = 1;
 19     int index = 0;
 20 
 21     while(1)
 22     {
 23         sem_wait(&blank);      //获得格子资源                                                                                                                  
 24         pthread_mutex_lock(&lock);//多线程加的代码
 25         buf[index++] = num;
 26         index = index % SIZE;
 27         printf("tid is : %lu   product data is : %d\n",pthread_self(), num);
 28         num++;
 29         pthread_mutex_unlock(&lock);//多线程加的代码
 30         sleep(2);              //改
 31         sem_post(&data);       //释放数据资源
 32     }
 33     return NULL;
 34 }
 35 
 36 void* consumer(void* arg)
 37 {
 38     int index = 0;
 39     int num = 0;
 40 
 41     while(1)
 42     {
 43         sem_wait(&data);      //获得数据资源
 44         pthread_mutex_lock(&lock);//多线程加的代码
 45         num = buf[index++];
 46         index = index % SIZE;
 47         printf("tid is :%lu   consumer data is : %d\n",pthread_self(), num);
 48         pthread_mutex_unlock(&lock);//多线程加的代码
 49         sleep(1);
 50         sem_post(&blank);     //释放格子资源
 51     }
 52 }
 53 
 54 void run_product_consumer()
 55 {
 56     pthread_create(&tid1, NULL, product, NULL);
 57     pthread_create(&tid2, NULL, product, NULL);
 58     pthread_create(&tid3, NULL, consumer, NULL);
 59     pthread_create(&tid4, NULL, consumer, NULL);
 60 
 61     pthread_join(tid1, NULL);
 62     pthread_join(tid1, NULL);
 63 }
 64 
 65 void init_allsem()
 66 {
 67     if( sem_init(&blank, 0, BLANK_INIT) < 0)
 68     {
 69         printf("init product error....\n");
 70     }
 71     if( sem_init(&data, 0, DATA_INIT) < 0)
 72     {
 73         printf("init consumer error....\n");
 74     }
 75 }
 76 
 77 void destroy_allsem()
 78 {
 79     if( sem_destroy(&blank) < 0)
 80     {
 81         printf("destroy blank sem error...\n");
 82     }
 83     if( sem_destroy(&data) < 0)
 84     {
 85         printf("destroy data sem error...\n");
 86     }
 87 }
 88 
 89 
 90 
 91 int main()
 92 {
 93     init_allsem();
 94     run_product_consumer();
 95     destroy_allsem();
 96     return 0;
 97 }                                                                                                                                                              
                                                                                                                                                 97,1         底端

执行结果:

wKioL1cfKZHDYruAAABuy4eJJ1A695.png

从图中可以看到有不同的生产者线程,不同的消费者线程。


总结:用于线程的信号量只实现同步功能,而互斥功能要用mutex实现;由于信号量的P,V操作的实现是原子的,所以加锁要加在P,V操作的后面;实现多消费者与多生产者时,消费者与消费者,生产者与生产者之间要实现互斥,而生产者与消费者之间只要实现同步。



附:

通过查看资料,发现一种比较简单的方法就是在代码中使用printf将当前线程的id打印出来。
而这也分成两种情况:
1. 如果是pthread,则使用,
#include <pthread.h>

pthread_t pthread_self(void);

2. 如果不是pthread,即是由内核创建的线程,则使用,
#include <sys/types.h>

pid_t gettid(void);

获取线程所在的进程的id,方法如下:
#include <sys/types.h>
#include <unistd.h>

pid_t getpid(void);
pid_t getppid(void);

所以,我们在代码中使用如下的语句打印:
printf("\ntid=%lu, pid=%lu\n", gettid(), getpid());
这样就能获取当前代码所在的线程和进程了。











本文转自 ye小灰灰  51CTO博客,原文链接:http://blog.51cto.com/10704527/1767944,如需转载请自行联系原作者