#include #include #include "tlmplib.h" #include PRIVATE void STREAMP_BUF::init (int size, int incr) { bufsize = size; bufincr = incr; buf = (char*)malloc(size); last = 0; offset = 0; } PUBLIC STREAMP_BUF::STREAMP_BUF() { init (10000,10000); } PUBLIC STREAMP_BUF::STREAMP_BUF( int _bufsize, // Internal buffer size int incr) // How to enlarge the buffer { init (_bufsize,incr); } PUBLIC STREAMP_BUF::~STREAMP_BUF() { free(buf); } /* Pack the buffer by moving the data at the front. This is done only if we are a little short at the end or if force is true */ PUBLIC void STREAMP_BUF::pack(bool force) { int rest = bufsize - last; if (offset > 0 && (force || rest < bufincr/2)){ // Not much room at the end, try copying the valid // bytes at the start of the buffer int len = last-offset; if (len > 0){ memmove (buf,buf+offset,len); } offset = 0; last = len; } } PUBLIC void STREAMP_BUF::pack() { pack (false); } PUBLIC void STREAMP_BUF::reset() { offset = last = 0; } /* Return true if there are no bytes available in the buffer */ PUBLIC bool STREAMP_BUF::isempty() const { return offset == last; } /* Return the empty space available at the end of the buffer */ PUBLIC int STREAMP_BUF::getspace() const { return bufsize - last; } /* Return a pointer where we can add bytes in the buffer. getspace() return how much bytes may be added. */ PUBLIC char *STREAMP_BUF::getinputbuf() { return buf+last; } /* Return a pointer to the first available byte. getnbbytes() return the number of bytes available. */ PUBLIC const char *STREAMP_BUF::getbytes() { return buf+offset; } /* Return the nunber of bytes available in the buffer. getbytes return a pointer to the first. */ PUBLIC int STREAMP_BUF::getnbbytes() { return last-offset; } /* Note how much bytes were added to the buffer */ PUBLIC void STREAMP_BUF::setadded (int len) { last += len; } PUBLIC void STREAMP_BUF::setused (int len) { offset += len; } /* Make sure we have enough space to hold len new bytes */ PUBLIC void STREAMP_BUF::checkalloc (int len) { if (getspace() < len){ // Ok, there is not enough space at the end of the buffer // Maybe it is time to pack pack(true); int extra = len - getspace(); if (extra > 0){ // Ok time to grow int incr = extra > bufincr ? extra + bufincr : bufincr; bufsize += incr; buf = (char*)realloc(buf,bufsize); assert (buf != NULL); } } } struct STREAMP_PRIVATE{ _F_STREAMP &c; STREAMP_BUF *buf; bool fill_needed; bool nomore; int valid; // How many bytes may be taken by getrecord STREAMP_PRIVATE(_F_STREAMP &_c, STREAMP_BUF *_buf) : c(_c){ buf = _buf; fill_needed = true; nomore = false; valid = 0; } }; int _F_STREAMP::fill (void *buf, int size, bool &end) { return -1; } int _F_STREAMP::process (const void *buf, int len, bool &end, bool nomore) { return -1; } int _F_STREAMP::validrecord (const void *buf, int len, bool &end, bool nomore) { return -1; } PUBLIC STREAMP::STREAMP(_F_STREAMP &c) { STREAMP_BUF *buf = new STREAMP_BUF; priv = new STREAMP_PRIVATE(c,buf); c.priv = priv; } PUBLIC STREAMP::~STREAMP() { delete priv->buf; delete priv; } /* Move the data at then end of the buffer to the beginning to make some room, if there is not enough. */ PRIVATE void STREAMP::pack() { priv->buf->pack(); } /* Let the object handle the show, dispatching between fill() and process() */ PUBLIC void STREAMP::loop() { bool end = false; int nothing = 0; while (1){ nothing++; // fprintf (stderr,"fill %d %d nothing %d\n",priv->offset,priv->size,nothing); if ((priv->fill_needed || priv->buf->isempty()) && !priv->nomore){ pack(); int rest = priv->buf->getspace(); if (rest > 10000){ int len = priv->c.fill (priv->buf->getinputbuf(),rest,end); if (len > 0){ priv->buf->setadded (len); nothing = 0; }else if (len == 0){ priv->nomore = true; } priv->fill_needed = false; if (end) break; } } if (!priv->buf->isempty()){ const char *bytes = priv->buf->getbytes(); int nbbytes = priv->buf->getnbbytes(); int valid = priv->c.validrecord (bytes,nbbytes,end,priv->nomore); if (valid == -1 || valid > 0){ int used = priv->c.process (bytes,nbbytes,end,priv->nomore); if (used >= 0){ priv->fill_needed = true; priv->buf->setused (used); if (used > 0) nothing = 0; }else{ // functag process is not defined if (valid > 0){ // A valid record was seen, we end so getrecord // may get it priv->valid = valid; break; }else if (valid == -1){ // functags validrecord and process are not defined // so we end. break; } } }else{ priv->fill_needed = true; } if (end) break; } if (nothing == 2){ // There is probably a problem here. We are almost // full and the processing unit did not picked any bytes // The buffer is not large enough. Not likely. break; } } } /* Add some byte to the input buffer */ PUBLIC int STREAMP::fill(void *buf, int len) { int ret = -1; pack(); priv->buf->checkalloc (len); int rest = priv->buf->getspace(); if (rest >= len){ memcpy (priv->buf->getinputbuf(),buf,len); priv->buf->setadded(len); ret = 0; // Try to process some loop(); } return ret; } void _F_STREAMP::reset() { priv->buf->reset(); } /* Tell STREAMP this is the end of input. */ PUBLIC void STREAMP::eof() { priv->nomore = true; loop(); } PUBLIC int STREAMP::getrecord(void *buf, int size) { int ret = -1; for (int i=0; i<2; i++){ if (priv->valid > 0){ ret = priv->valid; memcpy (buf,priv->buf->getbytes(),ret); priv->buf->setused (ret); priv->valid = 0; break; } if (i == 0) loop(); } return ret; } struct _F_streamp_private{ _F_STREAMP *C; }; void _F_streamp::reset() { priv->C->reset(); } void streamp (_F_streamp &c) { _F_streamp *c; _F_streamp_private priv; glocal.c = &c; glocal.priv.C = NULL; c.priv = &glocal.priv; (); glocal.priv.C = this; return glocal.c->fill(buf,size,end); glocal.priv.C = this; return glocal.c->process(buf,len,end,nomore); o.loop(); } struct _F_streamp_do_private{ STREAMP_BUF *ctl; }; void _F_streamp_do::reset() { priv->ctl->reset(); } /* Add some bytes to the control buffer and attempt to process some. Return -1 if there is no way to add all the bytes. Something is wrong with the process functag. */ int streamp_do (_F_streamp_do &c, STREAMP_BUF &ctl, const void *data, int len) { /* data may be very large. Larger in fact than the internal buffer in ctl. So we try to fill the internal buffer as much as possible, then request the functag to grab some and we iterate until all data has been moved to the internal buffer and the functag refuse to process more records. */ bool end = false; ctl.pack(); _F_streamp_do_private priv; priv.ctl = &ctl; c.priv = &priv; while(1){ int rest = ctl.getspace(); // We can grow the buffer so it contains all the new data // but this may create a very large buffer uselessly // especially if the data contains several chunks // (as expected by the process functag) // So if rest == 0, we simply request to grow the buffer // using its defined increment size. if (rest == 0){ ctl.checkalloc(1); // Grow the buffer a bit rest = ctl.getspace(); } int add = len; if (rest < len) add = rest; memcpy (ctl.getinputbuf(),data,add); ctl.setadded (add); data = (char*)data+add; len-=add; bool tooksome = false; while (1){ int nbbytes = ctl.getnbbytes(); if (nbbytes == 0) break; int used = c.process(ctl.getbytes(),nbbytes,end); if (used == 0){ break; }else{ ctl.setused (used); tooksome = true; } } if (len == 0) break; if (add == 0 && !tooksome) break; ctl.pack(); } return len == 0 ? 0 : -1; }