线程安全的环形缓冲区实现

简介: 来源:http://blog.csdn.net/lezhiyong    应用背景:线程1将每次数量不一的音频采样点(PCM音频数据)写入环形缓冲区,线程2每次取固定数量采样点送音频编码器,线程1线程2在平均时间内的读写数据量相等。

来源:http://blog.csdn.net/lezhiyong
    应用背景:线程1将每次数量不一的音频采样点(PCM音频数据)写入环形缓冲区,线程2每次取固定数量采样点送音频编码器,线程1线程2在平均时间内的读写数据量相等。(倒入桶中的水量有时大有时小,但每次取一瓢喝:)
   该环形缓冲区借鉴CoolPlayer音频播放器中的环形缓冲区代码实现,在读写操作函数中加了锁,允许多线程同时操作。CPs_CircleBuffer基于内存段的读写,比用模板实现的环形缓冲队列适用的数据类型更广些, CPs_CircleBuffer修改成C++中基于对象的实现,加上详细注释,m_csCircleBuffer锁变量为自用的lock类型(将CRITICAL_SECTION封装起来),调用lock()加锁,调用unlock()解锁。使用效果良好,分享出来。

CPs_CircleBuffer环形缓冲还不具备当待写数据量超出空余缓冲时自动分配内存的功能,这个将在后续进行优化。

CPs_CircleBuffer使用步骤:

 

[cpp]  view plain copy
  1. 1、创建对象  
  2. CPs_CircleBuffer* m_pCircleBuffer;  
  3. m_pCircleBuffer = new CPs_CircleBuffer(bufsize);  
  4. 2、写  
  5. if (m_pCircleBuffer->GetFreeSize() < CIC_READCHUNKSIZE)  
  6.  {  
  7.      Sleep(20);  
  8.      continue;  
  9.  }  
  10. m_pCircleBuffer->Write(internetbuffer.lpvBuffer,internetbuffer.dwBufferLength);  
  11. 3、读  
  12. m_pCircleBuffer->Read(pDestBuffer,iBytesToRead, piBytesRead);  
  13.    
  14. 4、其他调用  
  15. if(m_pCircleBuffer->IsComplete())  
  16.     break;          
  17. iUsedSpace =m_pCircleBuffer->GetUsedSize();  
  18. m_pCircleBuffer->SetComplete();  

 

CPs_CircleBuffer修改为类的定义:

[cpp]  view plain copy
  1. class  CPs_CircleBuffer  
  2. {  
  3. public:  
  4.        CPs_CircleBuffer(const unsigned int iBufferSize);  
  5.        ~CPs_CircleBuffer();  
  6. public:  
  7.         // Public functions  
  8.         void  Uninitialise();  
  9.         void  Write(const void* pSourceBuffer, const unsigned int iNumBytes);  
  10.         bool  Read(void* pDestBuffer, const size_t iBytesToRead, size_t* pbBytesRead);  
  11.         void  Flush();  
  12.         unsigned int GetUsedSize();  
  13.         unsigned int GetFreeSize();  
  14.         void  SetComplete();  
  15.         bool  IsComplete();  
  16.   
  17. private:         
  18.         unsigned char*  m_pBuffer;  
  19.         unsigned int    m_iBufferSize;  
  20.         unsigned int    m_iReadCursor;  
  21.         unsigned int    m_iWriteCursor;  
  22.         HANDLE          m_evtDataAvailable;  
  23.         Vlock           m_csCircleBuffer;  
  24.         bool            m_bComplete;        
  25. };  

CPs_CircleBuffer修改为类的实现:

[cpp]  view plain copy
  1. #define CIC_WAITTIMEOUT  3000  
  2.   
  3. CPs_CircleBuffer::CPs_CircleBuffer(const unsigned int iBufferSize)  
  4. {  
  5.     m_iBufferSize = iBufferSize;  
  6.     m_pBuffer = (unsigned char*)malloc(iBufferSize);  
  7.     m_iReadCursor = 0;  
  8.     m_iWriteCursor = 0;  
  9.     m_bComplete = false;  
  10.     m_evtDataAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);  
  11. }  
  12.   
  13. CPs_CircleBuffer::~CPs_CircleBuffer()  
  14. {  
  15.     Uninitialise();  
  16. }  
  17.   
  18. // Public functions  
  19. void CPs_CircleBuffer::Uninitialise()//没有必要public这个接口函数,long120817  
  20. {  
  21.     CloseHandle(m_evtDataAvailable);  
  22.     free(m_pBuffer);  
  23. }  
  24.   
  25. //Write前一定要调用m_pCircleBuffer->GetFreeSize(),如果FreeSize不够需要等待,long120817  
  26.   
  27. void  CPs_CircleBuffer::Write(const void* _pSourceBuffer, const unsigned int _iNumBytes)  
  28. {  
  29.     unsigned int iBytesToWrite = _iNumBytes;  
  30.     unsigned char* pSourceReadCursor = (unsigned char*)_pSourceBuffer;  
  31.   
  32.     //CP_ASSERT(iBytesToWrite <= GetFreeSize());//修改为没有足够空间就返回,write前一定要加GetFreeSize判断,否则进入到这里相当于丢掉数据,         // long120817  
  33.     if (iBytesToWrite > GetFreeSize())  
  34.     {  
  35.         return;  
  36.     }  
  37.     _ASSERT(m_bComplete == false);  
  38.   
  39.     m_csCircleBuffer.Lock();  
  40.   
  41.     if (m_iWriteCursor >= m_iReadCursor)  
  42.     {  
  43.         //              0                                            m_iBufferSize  
  44.         //              |-----------------|===========|--------------|  
  45.         //                                pR->        pW->   
  46.         // 计算尾部可写空间iChunkSize,long120817  
  47.         unsigned int iChunkSize = m_iBufferSize - m_iWriteCursor;  
  48.   
  49.         if (iChunkSize > iBytesToWrite)  
  50.         {  
  51.             iChunkSize = iBytesToWrite;  
  52.         }  
  53.   
  54.         // Copy the data  
  55.         memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iChunkSize);  
  56.   
  57.         pSourceReadCursor += iChunkSize;  
  58.   
  59.         iBytesToWrite -= iChunkSize;  
  60.   
  61.         // 更新m_iWriteCursor  
  62.         m_iWriteCursor += iChunkSize;  
  63.   
  64.         if (m_iWriteCursor >= m_iBufferSize)//如果m_iWriteCursor已经到达末尾  
  65.             m_iWriteCursor -= m_iBufferSize;//返回到起点0位置,long120817  
  66.   
  67.     }  
  68.   
  69.     //剩余数据从Buffer起始位置开始写  
  70.     if (iBytesToWrite)  
  71.     {  
  72.         memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iBytesToWrite);  
  73.         m_iWriteCursor += iBytesToWrite;  
  74.         _ASSERT(m_iWriteCursor < m_iBufferSize);//这个断言没什么意思,应该_ASSERT(m_iWriteCursor <= m_iReadCursor);long20120817  
  75.     }  
  76.   
  77.     SetEvent(m_evtDataAvailable);//设置数据写好信号量  
  78.   
  79.     m_csCircleBuffer.UnLock();  
  80. }  
  81.   
  82. bool  CPs_CircleBuffer::Read(void* pDestBuffer, const size_t _iBytesToRead, size_t* pbBytesRead)  
  83. {  
  84.     size_t iBytesToRead = _iBytesToRead;  
  85.     size_t iBytesRead = 0;  
  86.     DWORD dwWaitResult;  
  87.     bool bComplete = false;  
  88.   
  89.     while (iBytesToRead > 0 && bComplete == false)  
  90.     {  
  91.         dwWaitResult = WaitForSingleObject(m_evtDataAvailable, CIC_WAITTIMEOUT);//等待数据写好,long120817  
  92.   
  93.         if (dwWaitResult == WAIT_TIMEOUT)  
  94.         {  
  95.             //TRACE_INFO2("Circle buffer - did not fill in time!");  
  96.             *pbBytesRead = iBytesRead;  
  97.             return FALSE;//等待超时则返回  
  98.         }  
  99.   
  100.         m_csCircleBuffer.Lock();  
  101.   
  102.         if (m_iReadCursor > m_iWriteCursor)  
  103.         {  
  104.             //              0                                                    m_iBufferSize  
  105.             //              |=================|-----|===========================|  
  106.             //                                pW->  pR->   
  107.             unsigned int iChunkSize = m_iBufferSize - m_iReadCursor;  
  108.   
  109.             if (iChunkSize > iBytesToRead)  
  110.                 iChunkSize = (unsigned int)iBytesToRead;  
  111.   
  112.             //读取操作  
  113.             memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);  
  114.   
  115.             iBytesRead += iChunkSize;  
  116.             iBytesToRead -= iChunkSize;  
  117.   
  118.             m_iReadCursor += iChunkSize;  
  119.   
  120.             if (m_iReadCursor >= m_iBufferSize)//如果m_iReadCursor已经到达末尾  
  121.                 m_iReadCursor -= m_iBufferSize;//返回到起点0位置,long120817  
  122.         }  
  123.   
  124.         if (iBytesToRead && m_iReadCursor < m_iWriteCursor)  
  125.         {  
  126.             unsigned int iChunkSize = m_iWriteCursor - m_iReadCursor;  
  127.   
  128.             if (iChunkSize > iBytesToRead)  
  129.                 iChunkSize = (unsigned int)iBytesToRead;  
  130.   
  131.             //读取操作  
  132.             memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);  
  133.   
  134.             iBytesRead += iChunkSize;  
  135.             iBytesToRead -= iChunkSize;  
  136.             m_iReadCursor += iChunkSize;  
  137.         }  
  138.   
  139.         //如果有更多的数据要写  
  140.         if (m_iReadCursor == m_iWriteCursor)  
  141.         {  
  142.             if (m_bComplete)//跳出下一个while循环,该值通过SetComplete()设置,此逻辑什么意思?long120817  
  143.                 bComplete = true;  
  144.         }  
  145.         else//还有数据可以读,SetEvent,在下一个while循环开始可以不用再等待,long120817  
  146.             SetEvent(m_evtDataAvailable);  
  147.   
  148.         m_csCircleBuffer.UnLock();  
  149.     }  
  150.   
  151.     *pbBytesRead = iBytesRead;  
  152.   
  153.     return bComplete ? false : true;  
  154.   
  155. }  
  156. //  0                                                m_iBufferSize  
  157. //  |------------------------------------------------|  
  158. //  pR  
  159. //  pW  
  160. //读写指针归零  
  161. void  CPs_CircleBuffer::Flush()  
  162. {  
  163.     m_csCircleBuffer.Lock();  
  164.     m_iReadCursor = 0;  
  165.     m_iWriteCursor = 0;  
  166.     m_csCircleBuffer.UnLock();  
  167.   
  168. }  
  169. //获取已经写的内存  
  170. unsigned int CPs_CircleBuffer::GetUsedSize()  
  171. {  
  172.      return m_iBufferSize - GetFreeSize();  
  173.   
  174. }  
  175.   
  176.   
  177. unsigned int CPs_CircleBuffer::GetFreeSize()  
  178. {  
  179.     unsigned int iNumBytesFree;  
  180.   
  181.     m_csCircleBuffer.Lock();  
  182.   
  183.     if (m_iWriteCursor < m_iReadCursor)  
  184.     {  
  185.         //              0                                                    m_iBufferSize  
  186.         //              |=================|-----|===========================|  
  187.         //                                pW->  pR->   
  188.         iNumBytesFree = (m_iReadCursor - 1) - m_iWriteCursor;  
  189.     }  
  190.     else if (m_iWriteCursor == m_iReadCursor)  
  191.     {  
  192.         iNumBytesFree = m_iBufferSize;  
  193.     }  
  194.     else  
  195.     {  
  196.         //              0                                                    m_iBufferSize  
  197.         //              |-----------------|=====|---------------------------|  
  198.         //                                pR->   pW->   
  199.         iNumBytesFree = (m_iReadCursor - 1) + (m_iBufferSize - m_iWriteCursor);  
  200.     }  
  201.   
  202.     m_csCircleBuffer.UnLock();  
  203.   
  204.     return iNumBytesFree;  
  205.   
  206. }  
  207. //该函数什么时候调用?long120817  
  208. void  CPs_CircleBuffer::SetComplete()  
  209. {  
  210.     m_csCircleBuffer.Lock();  
  211.     m_bComplete = true;  
  212.     SetEvent(m_evtDataAvailable);  
  213.     m_csCircleBuffer.UnLock();  
  214. }  

 

附自动初始化和摧毁的锁对象Vlock的实现:

[cpp]  view plain copy
    1. #ifdef WIN32  
    2. #include <windows.h>  
    3.   
    4. #define  V_MUTEX            CRITICAL_SECTION //利用临界区实现的锁变量  
    5. #define  V_MUTEX_INIT(m)        InitializeCriticalSection(m)  
    6. #define  V_MUTEX_LOCK(m)        EnterCriticalSection(m)  
    7. #define  V_MUTEX_UNLOCK(m)      LeaveCriticalSection(m)  
    8. #define  V_MUTEX_DESTORY(m)     DeleteCriticalSection(m)  
    9.   
    10. #else  
    11.   
    12. #define  V_MUTEX                pthread_mutex_t  
    13. #define  V_MUTEX_INIT(m)        pthread_mutex_init(m,NULL)  
    14. #define  V_MUTEX_LOCK(m)        pthread_mutex_Lock(m)  
    15. #define  V_MUTEX_UNLOCK(m)      pthread_mutex_unLock(m)  
    16. #define  V_MUTEX_DESTORY(m)     pthread_mutex_destroy(m)  
    17.   
    18. #endif  
    19.   
    20.   
    21. class  Vlock  
    22. {  
    23. public:  
    24.     Vlock(void)  
    25.     {  
    26.         V_MUTEX_INIT(&m_Lock);  
    27.     }  
    28.     ~Vlock(void)  
    29.     {  
    30.         V_MUTEX_DESTORY(&m_Lock);  
    31.     }  
    32. public:  
    33.     void Lock(){V_MUTEX_LOCK(&m_Lock);}  
    34.     void UnLock(){V_MUTEX_UNLOCK(&m_Lock);}  
    35. private:  
    36.     V_MUTEX m_Lock;  
    37. };  
目录
相关文章
|
7月前
|
存储 Java C++
内存模型和Java内存区域
内存模型和Java内存区域
|
29天前
|
存储 程序员 编译器
【C/C++ 堆栈以及虚拟内存分段 】C/C++内存分布/管理:代码区、数据区、堆区、栈区和常量区的探索
【C/C++ 堆栈以及虚拟内存分段 】C/C++内存分布/管理:代码区、数据区、堆区、栈区和常量区的探索
27 0
|
3月前
|
定位技术
环形缓冲区RingBuff
环形缓冲区RingBuff
|
10月前
|
存储 Java
Java数组的内存结构
Java数组的内存结构
|
12月前
|
存储
环形缓冲区
环形缓冲区 是一段 先进先出 的循环缓冲区,有一定的大小,我们可以把它抽象理解为一块环形的内存。 我们使用环形缓冲区主要有两个原因; (1)当我们要存储大量数据时,我们的计算机只能处理先写入的数据,处理完毕释放数据后,后面的数据需要前移一位,大量的数据会频繁分配释放内存,从而导致很大的开销。使用环形缓冲区 可以减少内存分配继而减少系统的开销。 (2)如果我们频繁快速的持续向计算机输入数据,计算机可能执行某个进程不能及时的执行输入的数据,导致数据丢失。这时,我们可以将要输入的数据放入环形缓冲区内,计算机就不会造成数据丢失。
63 0
|
12月前
|
存储 安全 C语言
C语言实现环形缓冲区
C语言实现环形缓冲区
149 0
|
存储 缓存 算法
怎么理解Java内存区域
怎么理解Java内存区域
101 0
怎么理解Java内存区域
|
存储 程序员 编译器
关于栈区、堆区、全局区、文字常量区、程序代码区
一个由C/C++编译的程序占用的内存分为以下几个部分: 栈区、栈区、堆区、全局区、文字常量区、程序代码区
149 0
|
算法 Java 程序员
内存结构和垃圾回收算法
做JAVA也有接近2年的时间了,公司的leader说,做JAVA,三年是个坎,如果过了三年你还没有去研究JVM的话,那么你这个程序员只能是板砖的工具了。恰逢辞职,来个JVM的解析可好?
内存结构和垃圾回收算法