Linux libaio 异步I/O

简介:

水平有限,错误请指出


最近准备仔细看看innodb 异步I/O的实现,而在LINUX平台下Innodb中一般我们使用的都是libaio叫做LINUX NATIVE AIO,这有别于POSIX实现的AIO,因为以前对异步I/O并不熟悉,因为在很多LINUX 系统编程书籍上都没有介绍,而网上也是资料不多。当然其好处还是非常明显的,能够在使用O_DIRECT 打开文件的情况下,保证性能而不是消耗CPU资源在等待I/O落盘上,在MYSQL中如果使用了O_DIRECT打开了数据文件那么异步I/O将会发挥作用。当然MYSQL有一个自己模拟的异步I/O但是在现在支持LIBAIO的LINUX中一般都是 NATIVE AIO了。所以我仔细看了一下LINUX NATIVE AIO。
我主要参考如下文章:
http://www.360doc.com/content/14/0109/14/9008018_343854180.shtml
应该是谷歌的工程师写的,他最后列子我也详细的研究了一遍并且进行了修改/编译加上了中文注释。
吐槽一把Linux系统编程久了不用好多都模糊了,明年一定要好好复习一下。


一、基本数据结构简介

  • io_context_t:它是一个成为上下文的结构,在内部它包含一个完成队列,在线程之间是可以共享的。
  • iocb:单次读写操作需求,下面是主要的一些定义
void* data;
short aio_lio_opcode;
int aio_fildes;
union
{
 strcut
 {
   void* buf;
   unsigned long nbytes;
   long long offset;
 } c;
} u;

data:是一个用户定义传入数据
aio_lio_opcode:是一个标示可以取

IO_CMD_PREAD:读
IO_CMD_PWRITE:写
或者其他支持的标志

aio_fileds:是iocb读取或者写入的文件描述符fd
u.c.buf:是一个读取或者写入的内存数据指针
u.c.nbytes:内存数据字节长度
u.c.offset:读取文件的偏移量

其次union u中实际包含其他的可能的I/O类型如下,有兴趣的需要在看看

       union {
               struct io_iocb_common           c;
               struct io_iocb_vector           v;
               struct io_iocb_poll             poll;
               struct io_iocb_sockaddr saddr;
       } u;

iocb应该使用io_prep_pread和io_prep_pwrite进行初始化如下:

void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);

我们发现 int fd,void *buf, size_t count, long long offset 刚好对应了

aio_fileds:
u.c.buf:
u.c.nbytes:
u.c.offset:

而aio_lio_opcode可以从调用方式(io_prep_pread/io_prep_pwrite)看出来。

  • io_event

void* data:和iocb中的用户数据同一指针
struct iocb *obj:也就是iocb
long loing res:读或者写的字节数

二、相关函数

  • 1、建立一个io_context_t上下文
    int io_setup(unsigned nr_events, io_context_t *ctxp);

  • nr_events:本io_context_t支持的最大event最大队列,注意和后面的io_getevents的nr兼容.

  • ctxp:一根指针io_context_t用于初始化,但是这个io_context_t必须提前建立好并且赋值为0.
    LINUX MAIN PAGE:

DESCRIPTION
      io_setup()  creates  an  asynchronous  I/O  context capable of receiving at least nr_events.  ctxp must not point to an AIO context that already
      exists, and must be initialized to 0 prior to the call.  On successful creation of the AIO context, *ctxp is filled in with the  resulting  han-
      dle.

RETURN VALUE
      On success, io_setup() returns 0.  For the failure return, see NOTES.
ERRORS
      EAGAIN The specified nr_events exceeds the user's limit of available events.
      EFAULT An invalid pointer is passed for ctxp.
      EINVAL ctxp is not initialized, or the specified nr_events exceeds internal limits.  nr_events should be greater than 0.
      ENOMEM Insufficient kernel resources are available.
      ENOSYS io_setup() is not implemented on this architecture.
  • 2、销毁一个io_context_t上下文
    这个没啥好说的看看原型和LINUX MAIN PAGE即可
    int io_destroy(aio_context_t ctx);
    LINUX MAIN PAGE:
DESCRIPTION
       io_destroy()  removes  the  asynchronous  I/O context from the list of I/O contexts and then destroys it.  io_destroy() can also cancel any out-
       standing asynchronous I/O actions on ctx and block on completion.

RETURN VALUE
       On success, io_destroy() returns 0.  For the failure return, see NOTES.

ERRORS
       EFAULT The context pointed to is invalid.
       EINVAL The AIO context specified by ctx is invalid.
       ENOSYS io_destroy() is not implemented on this architecture.
  • 3、提交异步I/O操作
    int io_submit(io_context_t ctx_id, long nr, struct iocb **iocbpp);

  • ctx_id:异步I/O上下文

  • nr:后面iocb数组的长度

  • iocbpp:也就是iocb的数组
    我们可以发现一个io_submit可以提交多个iocb异步I/O需求,但是它们之间是没有顺序的,如果提交多个iocb需求可以显著的提高性能,正常情况下其不会被堵塞,如果被堵塞可能由于没有使用O_DIRECT打开文件导致

LINUX MAIN PAGE:

DESCRIPTION
       io_submit()  queues  nr  I/O request blocks for processing in the AIO context ctx_id.  iocbpp should be an array of nr AIO control blocks, which
       will be submitted to context ctx_id.
RETURN VALUE
       On success, io_submit() returns the number of iocbs submitted (which may be 0 if nr is zero).  For the failure return, see NOTES.
ERRORS
       EAGAIN Insufficient resources are available to queue any iocbs.
       EBADF  The file descriptor specified in the first iocb is invalid.
       EFAULT One of the data structures points to invalid data.
       EINVAL The aio_context specified by ctx_id is invalid.  nr is less than 0.  The iocb at *iocbpp[0] is not properly initialized, or the operation
              specified is invalid for the file descriptor in the iocb.
       ENOSYS io_submit() is not implemented on this architecture.
  • 4、获取I/O完成状态
    int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

  • ctx_id:上下文

  • min_nr:io_getevents函数调用在达到min_nr将会返回结果io_events

  • nr:最大的io_event返回个数

  • events:它是一个返回io_event的一个数组

  • timeout:调用io_getevents堵塞的最大时间,如果达到这个值io_getevents函数调用 ,将会提前结束,返回实际的events数组和个数,可能会少于nr。
    LINUX MAIN PAGE:

DESCRIPTION
       io_getevents()  attempts  to  read  at least min_nr events and up to nr events from the completion queue of the AIO context specified by ctx_id.
       timeout specifies the amount of time to wait for events, where a NULL timeout waits until at least min_nr events  have  been  seen.   Note  that
       timeout is relative and will be updated if not NULL and the operation blocks.

RETURN VALUE
       On success, io_getevents() returns the number of events read: 0 if no events are available, or less than min_nr if the timeout has elapsed.  For
       the failure return, see NOTES.

ERRORS
       EFAULT Either events or timeout is an invalid pointer.
       EINVAL ctx_id is invalid.  min_nr is out of range or nr is out of range.
       EINTR  Interrupted by a signal handler; see signal(7).
       ENOSYS io_getevents() is not implemented on this architecture.

三、列子

如果要明白呢一个系统函数的使用最重要的还是看看它的使用套路,下面的列子最好能够好好看看

/*************************************************************************
 > File Name: test.cpp
 > Author: gaopeng QQ:22389860 all right reserved
 > Mail: gaopp_200217@163.com
 > Created Time: Fri 17 Nov 2017 12:52:56 AM CST
************************************************************************/

#include<iostream>
#include<libaio.h>
#include<stdlib.h>
#include<stdio.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<fcntl.h>

#define PAGE_SIZE 1<<12
#define CHECK_ERROR(r,w,fs,f) (check_error(r,w,fs,f,__FILE__, __LINE__)) 
#define MAX_NR 10000
#define MIN_NR 1



/*
* ret:fun ret
* what:what value
* sf_flag:succuess or failure
*        1 succuess
*        0 failure
* 
*/


int check_error(int ret,int what,int sf_flag,const char* fun, const char* szFile, const int iLine)
{
   if(sf_flag == 1)
   {
       if(ret != what)
       {
           printf("error file:%s line:%d",szFile,iLine);
           perror(fun);
           exit(-1);
       }
   }
   else if(sf_flag == 0)
   {
       if(ret == what)
       {
           printf("error file:%s line:%d",szFile,iLine);
           perror(fun);
           exit(-1);
       }
   }
   return 0;
}

class AIOre
{
   public:
               int* buffer;
       virtual void Complete(int ret) = 0;
       AIOre()
       {
                       int ret = posix_memalign((void**)(&buffer),PAGE_SIZE,PAGE_SIZE);/* 分配一个4096(bytes)大小的4096对其内存空间 */
           CHECK_ERROR(ret,1,0,"posix_memalign");
       }
       virtual ~AIOre()
       {
                       printf("Virtual AIOre destory function to free this buffer!");
           free(buffer);
       }
};

class Adder
{
       public:
               virtual void Add(int amount) = 0;
               virtual ~Adder(){};
};

class AIORead:public AIOre
{
   private:
       Adder* adder;//父类指针
   public:
       AIORead(Adder* adder):AIOre()//父类指针指向子类对象
               {
                      this->adder = adder;
               }
       virtual void Complete(int res)
       {
           //return check
           int value = buffer[0];
                       printf("Read of %d Completed %d res ",value,res);
           //多态
           adder->Add(value);
       }

};

class AIOWrite:public AIOre
{
   private:
       int value;
   public:
       AIOWrite(int value):AIOre()
               {
       buffer[0] = value;
               this->value = value;
               }
       virtual void Complete(int res)
       {
           //error check
                       printf("Write of %d Completed %d \n",value,res);
       }
};

class AIOAdder:public Adder
{
   public:
       int fd;
       io_context_t ioctx;
       int counter; /* 偏移量 */
               int reap_counter; /* event个数 */
               int sum; /* */
               int length; /* 文件大小/PAGE_SIZE */

       AIOAdder(int length)
       {
           ioctx = 0;//必须初始化为0
           counter = 0;
                       reap_counter = 0;
           sum = 0;
           this->length = length;
       }

               void init() /* 初始化打开文件并且预分配文件大小 */
       {
           printf("Open file\n");
                       fd = open("test",O_RDWR|O_DIRECT|O_CREAT,0644); //必须包含O_DIRECT
           CHECK_ERROR(fd,0,-1,"open");
           printf("Allocating enough space for the sum\n");
           {
               int ret = fallocate(fd,0,0,PAGE_SIZE*length);/* 预先分配length*4096大小的文件 */
               CHECK_ERROR(fd,1,0,"fallocate");
           }
           printf("Setting the io Context\n");
           {
                               int ret = io_setup(100,&ioctx); /* 初始化ioctx*/
               CHECK_ERROR(ret,1,0,"io_setup");
           }
       }

       virtual void Add(int amount)
       {
           sum += amount;
           printf("Adding %d for toal of %d \n",amount,sum);
       }
       
       void submitwr()
       {
           printf("submitting a wirte to  %d \n",counter);
           struct iocb iocb;//建立一个异步I/O需求
           struct iocb* iocbs = &iocb;
                       AIORe *req = new AIOWrite(counter); /* 这里使用counter去初始化buffer buffer 4K大小 但是counter只有4 BYTES */
           /* void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset); */
           /* 初始化这个异步I/O需求 counter为偏移量 */
           io_prep_pwrite(&iocb,fd,req->buffer,PAGE_SIZE,counter*PAGE_SIZE);
                       iocb.data=req; /* 用户指针实际上就是本次提交Write操作的类对象指针用于释放buffer */
           int res = io_submit(ioctx,1,&iocbs);/* 提交这个I/O不会堵塞 */
           CHECK_ERROR(ret,1,0,"io_submit");
       }

       void writefile()
       {
           reap_counter = 0;
                       for(counter = 0;counter < length;counter++) /* 偏移量不断增加不断写入 */
           {
                               submitwr(); /* 异步提交操作 实际在多线程下本线程提交后则可以干其他事情了不会堵塞等待而耗费CPU */
                               reap(); /* 获得i/o状态 */
           }
                       reapremain();
       }

       void submitrd()
       {
           printf("submitting a read from  %d \n",counter);
           struct iocb iocb;//建立一个异步I/O需求
           struct iocb* iocbs = &iocb;
           AIORe *req = new AIORead(this);
           /* void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset); */
           io_prep_pread(&iocb,fd,req->buffer,PAGE_SIZE,counter*PAGE_SIZE);
           iocb.data = req;
           int res = io_submit(ioctx,1,&iocbs);
           CHECK_ERROR(ret,1,0,"io_submit");
           printf("test:%p %p\n",&iocb,iocbs);
       }

       void readfile()
       {
           reap_counter = 0;
           for(counter=0;counter<length;counter++)
           {
               submitrd();
               reap();//here paramter used
           }
           reap remaining();
       }

       int doreap(int min_nr)
       {
           printf("Reap between %ld and %ld io_events\n",min_nr,max_nr);//what mean
                       struct io_event* events = new io_event[MAX_NR]; /* pending I/O max support */
           struct timespec timeout;
           timeout.tv_sec = 0;
           timeout.tv_nsec = 100000000;
           int num_events;
           printf("Calling io_getevents");
                       num_events = io_getevents(ioctx,min_nr,MAX_NR,events,&timeout); /* 获得异步I/O event个数 */
                       CHECK_ERROR(num_events,-1,0,"io_getevents");
           printf("Calling completion function on results");
                       for(int i = 0;i<num_events;i++) /* 开始获取每一个event并且做相应处理 */
           {
                               struct io_event event = events[i];
                               AIORe* req = (AIORe*)(event.data); /* 多态AIORe可以是度或者写及 AIOWrite/AIORead */
               req->Complete(event.res);
                               delete req; /* 到这里一次I/O就完成了,删除内存对象包含buffer */
           }
           delete events;
           printf("Reaped %ld io_getevents",num_events);
                       reap_counter = num_events+reap_counter; /* 将event个数汇总 */
                       return num_events; /* 返回本次获取的event个数 */
       }
       
       void reap()
       {
                       if(counter >= MIN_NR) /* 如果大于了min_nr才开始reap */
           {
                               doreap(MIN_NR);
           }
       }

               void reapremain() /* 做最后的reap */
       {
           while(reap_counter<length)
           {
               doreap(1);
           }
       }

       ~AIOAdder()
       {
           printf("Closing AIO context and file");
           io_destroy(ioctx);
           colse(fd);
       }

               int Sum()
       {
           printf("Writing consecutive integers to file");
           writefile();
           printf("Rriting consecutive integers to file");
           readfile();
           return sum;
       }

};

int main()
{
   AIOAdder adder(10000);
   adder.init();/* 文件预先分配大小 */
   adder.writefile();
   adder.readfile();

}
270
image.png
相关文章
|
4月前
|
缓存 网络协议 Unix
Linux(UNIX)五种网络I/O模型与IO多路复用
Linux(UNIX)五种网络I/O模型与IO多路复用
109 0
|
6月前
|
存储 缓存 Linux
百度搜索:蓝易云【如何在Linux系统服务器中测试存储/磁盘I/O性能?】
这些工具可以帮助你测试磁盘的读取和写入性能,并提供各种性能指标和统计数据。请注意,在运行这些测试时,确保没有重要的数据存储在被测试的磁盘上,并谨慎操作以避免对系统和数据造成不必要的影响。
88 0
|
6月前
|
监控 网络协议 Java
I/O多路复用【Linux/网络】(C++实现select、poll和epoll服务器)(上)
I/O多路复用【Linux/网络】(C++实现select、poll和epoll服务器)
105 0
|
1月前
|
缓存 Ubuntu 网络协议
Linux系统编程之文件I/O函数的使用:介绍文件I/O函数的基本概念、用法和实现方式
Linux系统编程之文件I/O函数的使用:介绍文件I/O函数的基本概念、用法和实现方式
20 1
|
28天前
|
监控 网络协议 Linux
Linux I/O多路复用深入解析:从select到epoll的演进之路
Linux I/O多路复用深入解析:从select到epoll的演进之路
68 0
|
28天前
|
监控 算法 Unix
【Linux 异步操作】深入理解 Linux 异步通知机制:原理、应用与实例解析
【Linux 异步操作】深入理解 Linux 异步通知机制:原理、应用与实例解析
59 0
|
29天前
|
Unix Linux C++
【C/C++ 造轮子】Linux异步计时器:深入探讨和应用 (Linux Asynchronous Timers: An In-depth Exploration and Application)
【C/C++ 造轮子】Linux异步计时器:深入探讨和应用 (Linux Asynchronous Timers: An In-depth Exploration and Application)
58 1
|
5月前
|
存储 缓存 数据管理
深入理解Linux内核I/O机制:探索文件系统与设备驱动(上)
深入理解Linux内核I/O机制:探索文件系统与设备驱动
|
3月前
|
Java Unix Linux
Linux 系统-网络I/O模型
网络 I/O操作过程中会涉及到两个系统对象,一个是用户空间I/O操作的进程或者线程,另一个是内核 空间的内核系统,比如发生 I/O read操作时,它会经历两个阶段
29 0