#include <stdlib.h> #include <stdio.h> #include <pthread.h> #include "ringbuffer.h" static int b_flag = 0; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; #define TX_LOCK(lock) pthread_mutex_lock(&lock) #define TX_UNLOCK(lock) pthread_mutex_unlock(&lock) void* ReadBufferThread(void* param) { RingBuffer * pstRing = (RingBuffer *)param; FILE* fpOut = NULL; size_t ret = 0; uint8 szBuf[128*1024] = {0}; int nLen = 0; fpOut = fopen("file.bak", "w"); if (fpOut == NULL) { return NULL; } while(b_flag) { TX_LOCK(mutex); nLen = RingBuffer_read(pstRing, szBuf, sizeof(szBuf)); TX_UNLOCK(mutex); if (nLen) { ret = fwrite(szBuf, sizeof(char), nLen, fpOut); if (ret != nLen) { printf("fwrite failure!, ret = %d, nLen = %d ", ret, nLen); } } usleep(100); } fclose(fpOut); return NULL; } int main(int argc, char**argv) { FILE* fpIn = NULL; size_t ret = 0; uint8 szBuf[64*1024] = {0}; int nWriteLen = 0; int nOffset = 0; int nReReadFlag = 0; pthread_t threadID = 0; RingBuffer * pstRing = NULL; if (argc < 2) { printf("usage: %s file_to_read ", argv[0]); return -1; } pstRing = RingBuffer_create(2*1024*1024); if (pstRing == NULL) { printf("create ring buffer failure! "); return -1; } b_flag = 1; pthread_create(&threadID, NULL, ReadBufferThread, pstRing); fpIn = fopen(argv[1], "r"); if (fpIn == NULL) { b_flag = 0; pthread_cancel(threadID); pthread_join(threadID,NULL); return -1; } while(1) { if (nReReadFlag == 0) { ret = fread (szBuf, sizeof(char), sizeof(szBuf), fpIn) ; if (ret != sizeof(szBuf)) { printf("reach to the end of the file ret = %d, eof = %d! ",ret,feof(fpIn)); TX_LOCK(mutex); nWriteLen = RingBuffer_write(pstRing, szBuf, ret); TX_UNLOCK(mutex); if (nWriteLen != ret) { ret = ret - nWriteLen; nOffset = nWriteLen; nReReadFlag = 1; printf("here still need to write %d byte to buffer! ", ret); } else { ret = 0; nWriteLen = 0; nOffset = 0; nReReadFlag = 0; } break; } } TX_LOCK(mutex); nWriteLen = RingBuffer_write(pstRing, &szBuf[nOffset], ret); TX_UNLOCK(mutex); if (nWriteLen != ret) { printf("%d, still need to write %d, write len = %d, offset = %d ", __LINE__, ret, nWriteLen, nOffset); ret -= nWriteLen; nOffset += nWriteLen; nReReadFlag = 1; } else { printf("ret = %d, nWriteLen = %d ",ret, nWriteLen); nWriteLen = 0; nOffset = 0; ret = 0; nReReadFlag = 0; } usleep(200); } fclose(fpIn); printf("already finished read the file! "); while(ret) { printf("here again write %d byte... ", ret); TX_LOCK(mutex); nWriteLen = RingBuffer_write(pstRing, &szBuf[nOffset], ret); TX_UNLOCK(mutex); if (nWriteLen != ret) { ret -= nWriteLen; nOffset += nWriteLen; } else { ret = 0; nWriteLen = 0; nOffset = 0; } usleep(300); } while(1) { TX_LOCK(mutex); ret = RingBuffer_empty(pstRing); TX_UNLOCK(mutex); if (ret) { printf("the buffer is empty... "); break; } usleep(100); } RingBuffer_destroy(pstRing); b_flag = 0; pthread_cancel(threadID); pthread_join(threadID,NULL); return 0; }