Linux下并发网络设计之I/O复用

简介: I/O 流:   首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。   不管是文件,还是套接字,还是管道,我们都可以把他们看作流。   之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据。

I/O 流:

  首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。

  不管是文件,还是套接字,还是管道,我们都可以把他们看作流。
  之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据。
  现在假定一个情形,我们需要从流中读数据, 但是流中还没有数据,(典型的例子为,客户端要从socket读如数据,但是服务器还没有把数据传回来),这时候该怎么办?
 
  阻塞:
    阻塞是个什么概念呢?比如某个时候你在等快递,但是你不知道快递什么时候过来,而且你没有别的事可以干(或者说接下来的事要等快递来了才能做);那么你可以去睡觉了,因为你知道快递把货送来时一定会给你打个电话(假定一定能叫醒你)。
  非阻塞忙轮询:
    接着上面等快递的例子,如果用忙轮询的方法,那么你需要知道快递员的手机号,然后每分钟给他挂个电话:“你到了没?”
       很明显一般人不会用第二种做法,不仅显很无脑,浪费话费不说,还占用了快递员大量的时间。
       大部分程序也不会用第二种做法,因为第一种方法经济而简单,经济是指消耗很少的CPU时间,如果线程睡眠了,就掉出了系统的调度队列,暂时不会去瓜分CPU宝贵的时间片了。
     为了了解阻塞是如何进行的,我们来讨论缓冲区,以及内核缓冲区,最终把I/O事件解释清楚。缓冲区的引入是为了减少频繁I/O操作而引起频繁的系统调用 (你知道它很慢的),当你操作一个流时,更多的是以缓冲区为单位进行操作,这是相对于用户空间而言。对于内核来说,也需要缓冲区。
 
假设有一个管道,进程A为管道的写入方,B为管道的读出方。
 
假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。
    但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也 缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区 满”。
 
假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
    也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。
 
这 四个情形涵盖了四个I/O事件, 缓冲区满,缓冲区空,缓冲区非空,缓冲区非满(注都是说的内核缓冲区,且这四个术语都是我生造的,仅为解释其原理而造)。 这四个I/O事件是进行阻塞同步的根本。(如果不能理解“同步”是什么概念,请学习操作系统的锁,信号量,条件变量等任务同步方面的相关知识)。
 
    然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。
    于是再来考虑非阻塞忙轮询的I/O方式,我们发现我们可以同时处理多个流了(把一个流从阻塞模式切换到非阻塞模式再此不予讨论):
while true {
for i in stream[]; {
if i has data
read until unavailable
}
}
    我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费 CPU。这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及 epoll)处理甚至直接忽略。
 
    为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比 较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮 询一遍所有的流(于是我们可以把“忙”字去掉了)。代码长这样:
while true {
select(streams[])
for i in streams[] {
if i has data
read until unavailable
}
}
    于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并 不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。
    但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,没一次无差别轮询时间就越长。
再次说了这么多,终于能好好解释epoll了
    epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。(复杂度降低到了O(1))
    在讨论epoll的实现细节之前,先把epoll的相关操作列出:
 
epoll_create 创建一个epoll对象,一般epollfd = epoll_create()
 
epoll_ctl (epoll_add/epoll_del的合体),往epoll对象中增加/删除某一个流的某一个事件
比如
epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注册缓冲区非空事件,即有数据流入
epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注册缓冲区非满事件,即流可以被写入
epoll_wait(epollfd,...)等待直到注册的事件发生
(注:当对一个非阻塞流的读写发生缓冲区满或缓冲区空,write/read会返回-1,并设置errno=EAGAIN。而epoll只关心缓冲区非满和缓冲区非空事件)。
 
一个epoll模式的代码大概的样子是:
while true {
active_stream[] = epoll_wait(epollfd)
for i in active_stream[] {
read or write till
}
}
 
  /*epoll+线程池 服务器端例程*/
1
#include <unistd.h> 2 #include <sys/types.h> /* basic system data types */ 3 #include <sys/socket.h> /* basic socket definitions */ 4 #include <netinet/in.h> /* sockaddr_in{} and other Internet defns */ 5 #include <arpa/inet.h> /* inet(3) functions */ 6 #include <sys/epoll.h> /* epoll function */ 7 #include <fcntl.h> /* nonblocking */ 8 #include <sys/resource.h> /*setrlimit */ 9 10 #include <pthread.h> 11 #include <assert.h> 12 13 #include <stdlib.h> 14 #include <errno.h> 15 #include <stdio.h> 16 #include <string.h> 17 18 #define MAXEPOLLSIZE 10000 19 #define MAXLINE 10240 20 21 typedef struct msgdata{ 22 char name[10]; 23 int connfd; 24 char buffer[BUFSIZ]; 25 char target[10]; 26 struct msgdata *next; 27 }Msg; 28 29 typedef struct worker{ 30 //回调函数,任务运行时会调用此函数,也可以声明为其他形式 31 //该函数参数是任意类型的, 参数也是任意类型的; 32 void * (*process)(void *arg); 33 void *arg; 34 struct worker *next; 35 }CThread_worker; 36 37 //线程池结构 38 typedef struct{ 39 pthread_mutex_t queue_lock; //互斥量 40 pthread_cond_t queue_ready; //条件变量 41 42 //链表结构, 线程池中所有等待任务 43 CThread_worker *queue_head; 44 45 //是否销毁线程池 46 int shutdown; 47 pthread_t *threadid; 48 49 //线程池中允许的活动线程数目 50 int max_thread_num; 51 //当前等待队列的任务数目 52 int cur_queue_size; 53 }CThread_pool; 54 55 int pool_add_worker(void * (*process)(void *arg), void *arg); 56 void * thread_routine(void *arg); 57 void * myprocess(void *arg); 58 59 //全局静态的结构体指针 60 static CThread_pool *pool = NULL; 61 62 void pool_init(int max_thread_num) 63 { 64 int i; 65 pool = (CThread_pool*)malloc(sizeof(CThread_pool)); 66 //初始化互斥量 67 pthread_mutex_init(&(pool->queue_lock), NULL); 68 //初始化条件变量 69 pthread_cond_init(&(pool->queue_ready), NULL); 70 71 pool->queue_head = NULL; 72 73 //最大线程数目 74 pool->max_thread_num = max_thread_num; 75 //当前线程数目 76 pool->cur_queue_size = 0; 77 78 pool->shutdown = 0; 79 pool->threadid = (pthread_t*)malloc(max_thread_num * sizeof(pthread_t)); 80 81 for(i=0; i<max_thread_num; i++) 82 { 83 pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); 84 } 85 } 86 87 int pool_add_worker(void * (*process)(void *arg), void *arg) 88 { 89 //构建一个新任务 90 CThread_worker *newworker = (CThread_worker *)malloc(sizeof(CThread_worker)); 91 newworker->process = process; 92 newworker->arg = arg; 93 newworker->next = NULL; 94 95 //加锁互斥量 96 pthread_mutex_lock(&(pool->queue_lock)); 97 //将任务加入到等待队列中 98 CThread_worker *member = pool->queue_head; 99 if(member != NULL) 100 { 101 while(member->next != NULL) 102 member = member->next; 103 member->next = newworker; 104 } 105 else //此时 线程池中是空。 106 { 107 pool->queue_head = newworker; 108 } 109 110 //此时,线程池中有任务 111 assert(pool->queue_head != NULL); 112 pool->cur_queue_size++; 113 pthread_mutex_unlock(&(pool->queue_lock)); 114 115 //等待队列中有任务了,唤醒一个等待线程; 116 //如果此时所有线程都在忙碌,这句话没有任何作用, 117 //因为没有线程等待,怎么唤醒呢; 118 //pthread_cond_signal用于通知阻塞的线程某个特定的条件为真了 119 pthread_cond_signal(&(pool->queue_ready)); 120 return 0; 121 } 122 123 int pool_destroy() 124 { 125 if(pool->shutdown) 126 return -1; //防止调用两次 127 pool->shutdown = 1; 128 129 //唤醒所有等待线程,线程池要销毁了 130 pthread_cond_broadcast(&(pool->queue_ready)); 131 132 //阻塞等待线程退出, 否则就称僵尸了; 133 int i; 134 for(i=0; i<pool->max_thread_num; i++) 135 { 136 pthread_join(pool->threadid[i], NULL); 137 } 138 139 free(pool->threadid); 140 //销毁等待队列 141 CThread_worker *head = NULL; 142 while(pool->queue_head != NULL) 143 { 144 head = pool->queue_head; 145 pool->queue_head = pool->queue_head->next; 146 free(head); 147 } 148 //条件变量 和 互斥量也要销毁 149 pthread_mutex_destroy(&(pool->queue_lock)); 150 pthread_cond_destroy(&(pool->queue_ready)); 151 152 free(pool); 153 //销毁后指针 置空是有必要的,但不是必须的 154 pool = NULL; 155 return 0; 156 } 157 158 //start thread 0x b7771b70 is waiting 159 //thread 0x b7771b70 is waiting 160 void * thread_routine(void *arg) 161 { 162 printf("start thread 0x %x is waiting \n", pthread_self()); 163 while(1) 164 { 165 pthread_mutex_lock(&(pool->queue_lock)); 166 //如果线程池中运行线程数目为0,并且不销毁线程池,则处于阻塞阻塞; 167 //注意:pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁 168 while(pool->cur_queue_size == 0 && !pool->shutdown) 169 { 170 printf("thread 0x %x is waiting \n", pthread_self()); 171 //pthread_cond_wait用于等待某个特定的条件为真 172 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); 173 } 174 //线程池 要销毁了; 175 if(pool->shutdown) 176 { 177 //遇到break,continue,return等跳转语句,一定要先解锁 178 pthread_mutex_unlock(&(pool->queue_lock)); 179 printf("thread 0x %x will exit\n", pthread_self()); 180 pthread_exit(NULL); 181 } 182 printf("thread 0x %x is starting to work \n",pthread_self()); 183 184 //使用断言 185 assert(pool->cur_queue_size != 0); 186 assert(pool->queue_head != NULL); 187 188 //等待队列长度减去1, 并取出链表中的头元素 189 pool->cur_queue_size--; 190 CThread_worker *worker = pool->queue_head; 191 pool->queue_head = worker->next; 192 pthread_mutex_unlock(&(pool->queue_lock)); 193 194 //调用回调函数,执行任务 195 (*(worker->process))(worker->arg); 196 //销毁线程函数; 197 free(worker); 198 worker = NULL; 199 printf("hello world\n"); 200 } 201 //正常情况下,这里不能到达; 202 pthread_exit(NULL); 203 } 204 ///////////////////////////////////////////////////// 205 int handle(int connfd); 206 207 int setnonblocking(int sockfd) 208 { 209 if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1) { 210 return -1; 211 } 212 return 0; 213 } 214 215 int main(int argc, char **argv) 216 { 217 int servPort = 6888; 218 int listenq = 1024; 219 220 int listenfd, connfd, kdpfd, nfds, n, nread, curfds,acceptCount = 0; 221 struct sockaddr_in servaddr, cliaddr; 222 socklen_t socklen = sizeof(struct sockaddr_in); 223 struct epoll_event ev; 224 struct epoll_event events[MAXEPOLLSIZE]; 225 struct rlimit rt; 226 char buf[MAXLINE]; 227 228 //初始化线程池的相关操作 229 pool_init(3); //线程池中最多3个活动的线程 230 231 /* 设置每个进程允许打开的最大文件数 */ 232 rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE; 233 if (setrlimit(RLIMIT_NOFILE, &rt) == -1) 234 { 235 perror("setrlimit error"); 236 return -1; 237 } 238 239 bzero(&servaddr, sizeof(servaddr)); 240 servaddr.sin_family = AF_INET; 241 servaddr.sin_addr.s_addr = htonl(INADDR_ANY); 242 servaddr.sin_port = htons(servPort); 243 244 listenfd = socket(AF_INET, SOCK_STREAM, 0); 245 if (listenfd == -1) { 246 perror("can't create socket file"); 247 return -1; 248 } 249 250 int opt = 1; 251 setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); 252 253 if (setnonblocking(listenfd) < 0) { 254 perror("setnonblock error"); 255 } 256 257 if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) == -1) 258 { 259 perror("bind error"); 260 return -1; 261 } 262 if (listen(listenfd, listenq) == -1) 263 { 264 perror("listen error"); 265 return -1; 266 } 267 /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */ 268 kdpfd = epoll_create(MAXEPOLLSIZE); 269 ev.events = EPOLLIN | EPOLLET; 270 ev.data.fd = listenfd; 271 if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listenfd, &ev) < 0) 272 { 273 fprintf(stderr, "epoll set insertion error: fd=%d\n", listenfd); 274 return -1; 275 } 276 curfds = 1; 277 278 printf("epollserver startup,port %d, max connection is %d, backlog is %d\n", servPort, MAXEPOLLSIZE, listenq); 279 280 for (;;) { 281 /* 等待有事件发生 */ 282 nfds = epoll_wait(kdpfd, events, curfds, -1); 283 if (nfds == -1) 284 { 285 perror("epoll_wait"); 286 continue; 287 } 288 /* 处理所有事件 */ 289 for (n = 0; n < nfds; ++n) 290 { 291 if (events[n].data.fd == listenfd) 292 { 293 connfd = accept(listenfd, (struct sockaddr *)&cliaddr,&socklen); 294 if (connfd < 0) 295 { 296 perror("accept error"); 297 continue; 298 } 299 300 sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port); 301 printf("%d:%s", ++acceptCount, buf); 302 303 if (curfds >= MAXEPOLLSIZE) { 304 fprintf(stderr, "too many connection, more than %d\n", MAXEPOLLSIZE); 305 close(connfd); 306 continue; 307 } 308 if (setnonblocking(connfd) < 0) { 309 perror("setnonblocking error"); 310 } 311 ev.events = EPOLLIN | EPOLLET; 312 ev.data.fd = connfd; 313 if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev) < 0) 314 { 315 fprintf(stderr, "add socket '%d' to epoll failed: %s\n", connfd, strerror(errno)); 316 return -1; 317 } 318 curfds++; 319 continue; 320 } 321 // 处理客户端请求 这里进入线程池内,处理用户的操作 322 else 323 { 324 pool_add_worker(myprocess, &(events[n].data.fd)); 325 } 326 } 327 } 328 pool_destroy(); 329 close(listenfd); 330 return 0; 331 } 332 /* 333 int handle(int connfd) { 334 int nread; 335 char buf[MAXLINE]; 336 nread = read(connfd, buf, MAXLINE);//读取客户端socket流 337 if (nread == 0) { 338 printf("client close the connection\n"); 339 close(connfd); 340 return -1; 341 } 342 if (nread < 0) { 343 perror("read error"); 344 close(connfd); 345 return -1; 346 } 347 write(connfd, buf, nread);//响应客户端 348 return 0; 349 } 350 */ 351 //int handle(int connfd) 352 void * myprocess(void *arg) 353 { 354 //解包, 加锁把struct(用户名,connfd)放入链表,并在根据用户名在链表中 355 //对应的connfd, 356 int connfd = *((int *)arg); 357 int nread; 358 char buf[BUFSIZ]; 359 nread = read(connfd, buf, MAXLINE);//读取客户端socket流 360 if (nread == 0) 361 { 362 printf("client close the connection\n"); 363 close(connfd); 364 return NULL; 365 } 366 else if (nread < 0) 367 { 368 perror("read error"); 369 close(connfd); 370 return NULL; 371 } 372 else 373 { 374 375 } 376 377 378 write(connfd, buf, nread);//响应客户端 379 return NULL; 380 }

客户端例程:

#include  <unistd.h>
#include  <sys/types.h>       /* basic system data types */
#include  <sys/socket.h>      /* basic socket definitions */
#include  <netinet/in.h>      /* sockaddr_in{} and other Internet defns */
#include  <arpa/inet.h>       /* inet(3) functions */
#include <netdb.h> /*gethostbyname function */

#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

#define MAXLINE 1024

void handle(int connfd);

int main(int argc, char **argv)
{
    char * servInetAddr = "127.0.0.1";
    int servPort = 6888;
    char buf[MAXLINE];
    int connfd;
    struct sockaddr_in servaddr;

    if (argc == 2) {
        servInetAddr = argv[1];
    }
    if (argc == 3) {
        servInetAddr = argv[1];
        servPort = atoi(argv[2]);
    }
    if (argc > 3) {
        printf("usage: echoclient <IPaddress> <Port>\n");
        return -1;
    }

    connfd = socket(AF_INET, SOCK_STREAM, 0);

      //bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(servPort);
    //inet_pton(AF_INET, servInetAddr, &servaddr.sin_addr);
    servaddr.sin_addr.s_addr = inet_addr(servInetAddr);
    bzero(&(servaddr.sin_zero), 0);

    if (connect(connfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        perror("connect error");
        return -1;
    }
    printf("welcome to echoclient\n");
    handle(connfd);     /* do it all */
    close(connfd);
    printf("exit\n");
    exit(0);
}

void handle(int sockfd)
{
    char sendline[MAXLINE], recvline[MAXLINE];
    int n;
    for (;;) {
        if (fgets(sendline, MAXLINE, stdin) == NULL)
        {
            break;//read eof
        }
        /*
        //也可以不用标准库的缓冲流,直接使用系统函数无缓存操作
        if (read(STDIN_FILENO, sendline, MAXLINE) == 0) {
            break;//read eof
        }
        */

        n = write(sockfd, sendline, strlen(sendline));
        n = read(sockfd, recvline, MAXLINE);
        if (n == 0) {
            printf("echoclient: server terminated prematurely\n");
            break;
        }
        write(STDOUT_FILENO, recvline, n);
        //如果用标准库的缓存流输出有时会出现问题
        //fputs(recvline, stdout);
    }
}

 

相关文章
|
17天前
|
安全 Linux 虚拟化
网络名称空间在Linux虚拟化技术中的位置
网络名称空间(Network Namespaces)是Linux内核特性之一,提供了隔离网络环境的能力,使得每个网络名称空间都拥有独立的网络设备、IP地址、路由表、端口号范围以及iptables规则等。这一特性在Linux虚拟化技术中占据了核心位置🌟,它不仅为构建轻量级虚拟化解决方案(如容器📦)提供了基础支持,也在传统的虚拟机技术中发挥作用,实现资源隔离和网络虚拟化。
网络名称空间在Linux虚拟化技术中的位置
|
15天前
|
存储 算法 Linux
【实战项目】网络编程:在Linux环境下基于opencv和socket的人脸识别系统--C++实现
【实战项目】网络编程:在Linux环境下基于opencv和socket的人脸识别系统--C++实现
39 6
|
5天前
|
机器学习/深度学习 缓存 监控
linux查看CPU、内存、网络、磁盘IO命令
`Linux`系统中,使用`top`命令查看CPU状态,要查看CPU详细信息,可利用`cat /proc/cpuinfo`相关命令。`free`命令用于查看内存使用情况。网络相关命令包括`ifconfig`(查看网卡状态)、`ifdown/ifup`(禁用/启用网卡)、`netstat`(列出网络连接,如`-tuln`组合)以及`nslookup`、`ping`、`telnet`、`traceroute`等。磁盘IO方面,`iostat`(如`-k -p ALL`)显示磁盘IO统计,`iotop`(如`-o -d 1`)则用于查看磁盘IO瓶颈。
|
3天前
|
网络协议 Linux Shell
【linux网络(一)】初识网络, 理解四层网络模型
【linux网络(一)】初识网络, 理解四层网络模型
|
3天前
|
安全 Ubuntu Linux
Linux 网络操作命令Telnet
Linux 网络操作命令Telnet
16 0
Linux 网络操作命令Telnet
|
3天前
|
Ubuntu Linux
Linux(22) Linux设置网络优先级顺序
Linux(22) Linux设置网络优先级顺序
6 0
|
4天前
|
Ubuntu 网络协议 Linux
Linux(20) Ubuntu 20.04 网络接口自动切换路由配置
Linux(20) Ubuntu 20.04 网络接口自动切换路由配置
27 0
|
1月前
|
机器学习/深度学习 数据采集 人工智能
m基于深度学习网络的手势识别系统matlab仿真,包含GUI界面
m基于深度学习网络的手势识别系统matlab仿真,包含GUI界面
41 0
|
1月前
|
机器学习/深度学习 算法 计算机视觉
基于yolov2深度学习网络的火焰烟雾检测系统matlab仿真
基于yolov2深度学习网络的火焰烟雾检测系统matlab仿真
|
1月前
|
机器学习/深度学习 算法 计算机视觉
m基于深度学习网络的性别识别系统matlab仿真,带GUI界面
m基于深度学习网络的性别识别系统matlab仿真,带GUI界面
29 2