socket/xmlBlasterZlib.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      xmlBlasterZlib.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Contains some socket specific helper methods
00006 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00007 -----------------------------------------------------------------------------*/
00008 #include <stdio.h>
00009 #include <string.h>
00010 #include <assert.h>
00011 #include <socket/xmlBlasterZlib.h>
00012 #include <socket/xmlBlasterSocket.h>  /* writen() */
00013 
00014 #if XMLBLASTER_ZLIB==1
00015 
00016 /*
00017 TODO: 
00018  valgrind reports with zlib-1.2.1
00019    ==24008== Conditional jump or move depends on uninitialised value(s)
00020    ==24008==    at 0x8051823: longest_match (deflate.c:949)
00021    ==24008==    by 0x8052691: deflate_slow (deflate.c:1422)
00022    ==24008==    by 0x8050F19: deflate (deflate.c:630)
00023    ==24008==    by 0x804BE78: xmlBlaster_writenCompressed (xmlBlasterZlib.c:102)
00024 
00025  See a discussion of this:
00026   http://www.groupsrv.com/science/viewtopic.php?t=19234&start=0&postdays=0&postorder=asc&highlight=
00027   (http://www.gammon.com.au/forum/bbshowpost.php?bbsubject_id=4000)
00028 
00029 Example:
00030    XmlBlasterZlibWriteBuffers zlibWriteBuf;
00031 
00032    if (xmlBlaster_initZlibWriter(zlibWriteBufP) != Z_OK) return -1;
00033    
00034    while () {
00035       ...
00036       ssize_t num = xmlBlaster_writenCompressed(&zlibWriteBuf, socketFd, buffer, nbytes);
00037    }
00038 
00039    
00040    XmlBlasterZlibReadBuffers zlibReadBuf;
00041    if (xmlBlaster_initZlibReader(&zlibReadBuf) != Z_OK) return -1;
00042 
00043 */
00044 
00048 static void dumpZlib(const char *p, XmlBlasterZlibReadBuffers *zlibReadBufP, XmlBlasterZlibWriteBuffers *zlibWriteBufP) {
00049         z_stream *zlibP = (zlibReadBufP!=0) ? &zlibReadBufP->c_stream : &zlibWriteBufP->c_stream;
00050    printf("[%s:%d] %s\n", __FILE__, __LINE__, p);
00051    printf("{\n");
00052         if (zlibReadBufP!=0) {
00053            /*printf("  compBufferP     ="PRINTF_PREFIX_UINT64_T"\n", (uint64_t)zlibReadBufP->compBuffer);*/
00054            printf("  compBufferP     =%p\n", zlibReadBufP->compBuffer);
00055            printf("  currCompBufferP =%p\n", zlibReadBufP->currCompBufferP);
00056            printf("  currCompBytes   =%d\n", (int)zlibReadBufP->currCompBytes);
00057         }
00058    printf("  zlibP->next_in  =%p\n", zlibP->next_in);
00059    printf("  zlibP->avail_in =%d\n", (int)zlibP->avail_in);
00060    printf("  zlibP->next_out =%p\n", zlibP->next_out);
00061    printf("  zlibP->avail_out=%u\n", zlibP->avail_out);
00062    printf("}\n");
00063 }
00064 
00068 int xmlBlaster_initZlibWriter(XmlBlasterZlibWriteBuffers *zlibWriteBufP) {
00069    int err;
00070 
00071    if (!zlibWriteBufP) return Z_BUF_ERROR;
00072    memset(zlibWriteBufP, 0, sizeof(XmlBlasterZlibWriteBuffers));
00073 
00074    zlibWriteBufP->debug = false;
00075 
00076    zlibWriteBufP->c_stream.zalloc = (alloc_func)0;
00077    zlibWriteBufP->c_stream.zfree = (free_func)0;
00078    zlibWriteBufP->c_stream.opaque = (voidpf)0;
00079    zlibWriteBufP->c_stream.data_type = Z_BINARY;
00080 
00081    err = deflateInit(&zlibWriteBufP->c_stream, Z_DEFAULT_COMPRESSION);
00082    if (err != Z_OK) {
00083       fprintf(stderr, "[%s:%d] deflateInit error: %s\n", __FILE__, __LINE__, zError(err));
00084    }
00085    return err;
00086 }
00087 
00088 
00092 ssize_t xmlBlaster_writenCompressed(XmlBlasterZlibWriteBuffers *zlibWriteBufP, const int fd, const char * const ptr, const size_t nbytes)
00093 {
00094    size_t written = 0; /* The written bytes of the compressed stream */
00095    if (!zlibWriteBufP) {
00096       fprintf(stderr, "[%s:%d] Internal error: XmlBlasterZlibWriteBuffers is NULL\n", __FILE__, __LINE__);
00097       return -1;
00098    }
00099 
00100    {
00101       z_stream *zlibP = &zlibWriteBufP->c_stream;
00102       bool onceMore = false;
00103 
00104       /* Initialize zlib buffer pointers */
00105       zlibP->next_in  = (Bytef*)ptr;
00106       zlibP->avail_in = (uInt)nbytes;
00107       zlibP->next_out = zlibWriteBufP->compBuffer;
00108       zlibP->avail_out = XMLBLASTER_ZLIB_WRITE_COMPBUFFER_LEN;
00109 
00110       if (zlibWriteBufP->debug) dumpZlib("writen(): Before while", 0, zlibWriteBufP);
00111 
00112       /* Compress and write to socket */
00113 
00114       while (zlibP->avail_in > 0 || onceMore) {
00115          int status = deflate(zlibP, Z_SYNC_FLUSH);
00116          if (zlibWriteBufP->debug) dumpZlib("writen(): In while after compress", 0, zlibWriteBufP);
00117          if (status != Z_OK) {
00118             fprintf(stderr, "[%s:%d] deflate error during sending of %u bytes: %s\n", __FILE__, __LINE__, (unsigned int)nbytes, zError(status));
00119             return -1;
00120          }
00121          onceMore = zlibP->avail_out == 0; /* && Z_OK which is checked above already */
00122          if ((XMLBLASTER_ZLIB_WRITE_COMPBUFFER_LEN - zlibP->avail_out) > 0) {
00123          /*if (zlibP->avail_out <= 6) { */ /* with Z_SYNC_FLUSH we should not go down to zero */
00124             ssize_t ret = writen(fd, (char *)zlibWriteBufP->compBuffer, XMLBLASTER_ZLIB_WRITE_COMPBUFFER_LEN-zlibP->avail_out);
00125             if (ret == -1) return -1;
00126             written += ret;
00127             zlibP->next_out = zlibWriteBufP->compBuffer;
00128             zlibP->avail_out = XMLBLASTER_ZLIB_WRITE_COMPBUFFER_LEN;
00129          }
00130       }
00131       if (zlibWriteBufP->debug) dumpZlib("writen(): After compress", 0, zlibWriteBufP);
00132       /*
00133       if ((XMLBLASTER_ZLIB_WRITE_COMPBUFFER_LEN - zlibP->avail_out) > 0) {
00134          ret = writen(fd, (char *)zlibWriteBufP->compBuffer, XMLBLASTER_ZLIB_WRITE_COMPBUFFER_LEN-zlibP->avail_out);
00135          if (ret == -1) return -1;
00136          written += ret;
00137       }
00138       */
00139       if (zlibWriteBufP->debug) printf("deflate - compressed %u bytes to %u\n", (unsigned int)nbytes, (unsigned int)written);
00140 
00141       return nbytes; /*written*/
00142    }
00143 }
00144 
00145 
00149 int xmlBlaster_endZlibWriter(XmlBlasterZlibWriteBuffers *zlibWriteBufP) {
00150    int err;
00151    if (!zlibWriteBufP) return Z_BUF_ERROR;
00152    if (zlibWriteBufP->debug) dumpZlib("writen(): After compress", 0, zlibWriteBufP);
00153    err = deflateEnd(&zlibWriteBufP->c_stream);
00154    if (err != Z_OK) {
00155       /* TODO: Why does it return "-3 == data error"? Seems to be in BUSY_STATE */
00156       /*fprintf(stderr, "[%s:%d] deflateEnd error: %s\n", __FILE__, __LINE__, zError(err));*/
00157       return -1;
00158    }
00159    return err;
00160 }
00161 
00162 
00166 int xmlBlaster_initZlibReader(XmlBlasterZlibReadBuffers *zlibReadBufP) {
00167    int err;
00168 
00169    if (!zlibReadBufP) return Z_BUF_ERROR;
00170    memset(zlibReadBufP, 0, sizeof(XmlBlasterZlibReadBuffers));
00171 
00172    zlibReadBufP->debug = false;
00173 
00174    zlibReadBufP->c_stream.zalloc = (alloc_func)0;
00175    zlibReadBufP->c_stream.zfree = (free_func)0;
00176    zlibReadBufP->c_stream.opaque = (voidpf)0;
00177 
00178    err = inflateInit(&zlibReadBufP->c_stream);
00179    if (err != Z_OK) {
00180       fprintf(stderr, "[%s:%d] inflateInit error: %s\n", __FILE__, __LINE__, zError(err));
00181    }
00182 
00183    zlibReadBufP->currCompBufferP = zlibReadBufP->compBuffer;
00184    zlibReadBufP->currCompBytes = 0;
00185 
00186    return err;
00187 }
00188 
00189 
00193 ssize_t xmlBlaster_readnCompressed(XmlBlasterZlibReadBuffers *zlibReadBufP, int fd, char *ptr, size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2)
00194 {
00195    uInt readBytes = 0;     /* The read, uncompressed bytes */
00196 
00197    if (!zlibReadBufP) {
00198       fprintf(stderr, "[%s:%d] Internal error: XmlBlasterZlibReadBuffers is NULL\n", __FILE__, __LINE__);
00199       return -1;
00200    }
00201 
00202    {
00203       z_stream *zlibP = &zlibReadBufP->c_stream;
00204 
00205       if (zlibReadBufP->debug) printf("[%s:%d] Entering readnCompressed ...\n", __FILE__, __LINE__);
00206 
00207       /* Initialize zlib buffer pointers */
00208       zlibP->next_out = (Bytef*)ptr;
00209       zlibP->avail_out = (uInt)nbytes;
00210 
00211       if (zlibReadBufP->currCompBytes == 0)
00212          zlibReadBufP->currCompBufferP = zlibReadBufP->compBuffer;
00213       zlibP->next_in  = zlibReadBufP->currCompBufferP;
00214       zlibP->avail_in = zlibReadBufP->currCompBytes;
00215 
00216       if (zlibReadBufP->debug) dumpZlib("readn(): Before do", zlibReadBufP, 0);
00217 
00218       /* Read from socket and uncompress */
00219       do {
00220          if (fpNumRead != 0) {
00221             fpNumRead(userP2, (size_t)zlibP->next_out-(size_t)ptr, nbytes); /* Callback with current status */
00222          }
00223          if (zlibP->avail_out == 0) {
00224             if (zlibReadBufP->debug) printf("[%s:%d] readCompress() we are done with nbytes=%u currCompBytes=%u\n", __FILE__, __LINE__, (unsigned int)nbytes, zlibReadBufP->currCompBytes);
00225             if (fpNumRead != 0) {
00226                fpNumRead(userP2, nbytes, nbytes);
00227             }
00228             return nbytes;
00229          }
00230 
00231          if (zlibReadBufP->currCompBytes == 0 && readBytes != nbytes) {
00232             const int flag = 0;
00233             ssize_t nCompRead;
00234             if (zlibReadBufP->debug) printf("[%s:%d] recv() readBytes=%u, nbytes=%u, currCompBytes=%u\n", __FILE__, __LINE__, readBytes, (unsigned int)nbytes, zlibReadBufP->currCompBytes);
00235             zlibReadBufP->currCompBufferP = zlibReadBufP->compBuffer;
00236          /* currCompBufferP is of type "Bytef *" which is a "unsigned char *" */
00237             nCompRead = recv(fd, (char*)zlibReadBufP->currCompBufferP, (int)XMLBLASTER_ZLIB_READ_COMPBUFFER_LEN, flag); /* TODO: do we need at least two bytes?? */
00238             if (nCompRead == -1 || nCompRead == 0) { /* 0 is not possible as we are blocking */
00239                if (zlibReadBufP->debug) printf("[%s:%d] EOF during reading of %u bytes\n", __FILE__, __LINE__, (unsigned int)(nbytes-readBytes));
00240                return -1;
00241             }
00242             zlibReadBufP->currCompBytes += (uInt)nCompRead;
00243             zlibP->next_in  = zlibReadBufP->currCompBufferP;
00244             zlibP->avail_in = zlibReadBufP->currCompBytes;
00245             if (zlibReadBufP->debug) dumpZlib("readn(): recv() returned", zlibReadBufP, 0);
00246          }
00247 
00248          while (zlibP->avail_in > 0 && zlibP->avail_out > 0) {
00249             int status = inflate(zlibP, Z_SYNC_FLUSH);
00250             if (status != Z_OK) {
00251                fprintf(stderr, "[%s:%d] inflate error during reading of %u bytes: %s\n", __FILE__, __LINE__, (unsigned int)nbytes, zError(status));
00252                return -1;
00253             }
00254             zlibReadBufP->currCompBufferP = zlibP->next_in;
00255             zlibReadBufP->currCompBytes = zlibP->avail_in;
00256             if (zlibReadBufP->debug) dumpZlib("readn(): inflate() returned", zlibReadBufP, 0);
00257             if (zlibP->avail_out == 0) {
00258                if (zlibReadBufP->debug) printf("[%s:%d] readCompress() we are done with nbytes=%u\n", __FILE__, __LINE__, (unsigned int)nbytes);
00259                if (fpNumRead != 0) {
00260                   fpNumRead(userP2, nbytes, nbytes);
00261                }
00262                return nbytes;
00263             }
00264          }
00265       } while(true);
00266 
00267       /* check if bytes are available
00268             int hasMoreBytes;
00269             fd_set fds;
00270             FD_ZERO(&fds);
00271             FD_SET(fd, &fds);
00272             hasMoreBytes = select (fd+1, &fds, NULL, NULL, NULL);
00273       */
00274    }
00275 }
00276 
00277 
00281 int xmlBlaster_endZlibReader(XmlBlasterZlibReadBuffers *zlibReadBufP) {
00282    int err;
00283    if (!zlibReadBufP) return Z_BUF_ERROR;
00284    err = inflateEnd(&zlibReadBufP->c_stream);
00285    if (err != Z_OK) {
00286       fprintf(stderr, "[%s:%d] inflateEnd error: %s\n", __FILE__, __LINE__, zError(err));
00287       return -1;
00288    }
00289    return err;
00290 }
00291 
00292 #else
00293 
00294 /* If no zlib is available use dummies: */
00295 int xmlBlaster_initZlibWriter(XmlBlasterZlibWriteBuffers *zlibWriteBufP) { 
00296    fprintf(stderr, "No support for zlib is compiled, try with -DXMLBLASTER_ZLIB=1\n");
00297    assert(0);
00298    zlibWriteBufP = 0;/* to supress compiler warning */
00299    return 0;
00300 }
00301 ssize_t xmlBlaster_writenCompressed(XmlBlasterZlibWriteBuffers *zlibWriteBufP, const int fd, const char * const ptr, const size_t nbytes) { 
00302    if (fd && ptr && nbytes) zlibWriteBufP = 0;/* to supress compiler warning */
00303    return 0;
00304 }
00305 int xmlBlaster_endZlibWriter(XmlBlasterZlibWriteBuffers *zlibWriteBufP) { zlibWriteBufP = 0;/* to supress compiler warning */ return -1; }
00306 int xmlBlaster_initZlibReader(XmlBlasterZlibReadBuffers *zlibReadBufP) { zlibReadBufP = 0;/* to supress compiler warning */ return -1; }
00307 ssize_t xmlBlaster_readnCompressed(XmlBlasterZlibReadBuffers *zlibReadBufP, int fd, char *ptr, size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
00308    if (fd && ptr && nbytes && fpNumRead && userP2) zlibReadBufP = 0;/* to supress compiler warning */
00309    return 0;
00310 }
00311 int xmlBlaster_endZlibReader(XmlBlasterZlibReadBuffers *zlibReadBufP) { zlibReadBufP = 0;/* to supress compiler warning */ return -1; }
00312 
00313 # endif /* XMLBLASTER_ZLIB */
00314