#include "SpoolRewindInputStream.hpp"
#include "Logger.hpp"

#include <algorithm>
#include <cassert>
#include <cstdio>
#include <cstring>
#include <stdexcept>

// TODO: thread-safe and platform-conformant name of spool file
#ifndef _WIN32
#define SPOOL_FILE_NAME "/tmp/spool.tmp"
#else
#define SPOOL_FILE_NAME "C:\\TEMP\\SPOOL.TMP"
#endif

using namespace std;

namespace {

// Put-back size actually used by the stream buffer: never less than one
// character, mirroring how m_putBack is initialised in the constructor.
inline size_t effectivePutBack( size_t putBack )
{
	return max( putBack, size_t( 1 ) );
}

} // anonymous namespace

// Creates a stream buffer that records ("spools") everything it hands out
// so that the data can be replayed after rewind( ).
//
// bufSize       requested size of the get area
// putBack       requested number of put-back characters (clamped to >= 1)
// spoolBufSize  number of bytes spooled in memory before spilling to the
//               spool file
spool_streambuf::spool_streambuf( size_t bufSize, size_t putBack, size_t spoolBufSize )
	: m_putBack( effectivePutBack( putBack ) ),
	m_spoolBuf( spoolBufSize ),
	m_spoolBufPos( 0 ),
	m_spoolBufSize( spoolBufSize ),
	m_state( TO_SPOOL_MEMORY ),
	// Size the read buffer from the clamped put-back value: using the raw
	// 'putBack' could yield an empty buffer (bufSize == 0, putBack == 0),
	// which makes front( ) undefined.
	m_buf( max( bufSize, effectivePutBack( putBack ) ) + effectivePutBack( putBack ) ),
	m_base( 0 ),
	m_start( 0 )
{
	// start with an exhausted get area, the first read triggers underflow( )
	char *end = &m_buf.front( ) + m_buf.size( );
	setg( end, end, end );
}

// Closes and removes the spool file if the buffer ever spilled to disk.
spool_streambuf::~spool_streambuf( )
{
	switch( m_state ) {
		case TO_SPOOL_MEMORY:
		case FROM_SPOOL_MEMORY:
			// memory only, nothing to clean up
			break;

		case TO_SPOOL_FILE:
		case FROM_SPOOL_FILE:
			m_spoolFile.close( );
			(void)remove( SPOOL_FILE_NAME );
			break;
	}
}

// Accepts 'n' bytes of source data: as much as fits is copied to the start
// of the read buffer and becomes the new get area, the remainder is handed
// to spoolData( ). Returns the total number of bytes accepted (== n).
//
// NOTE(review): the part copied straight into the read buffer is not
// spooled by this function - confirm that the caller spools it separately.
streambuf::int_type spool_streambuf::spoolSourceData( char *data, size_t n )
{
	m_base = &m_buf.front( );
	m_start = m_base;

	// more space than data is no problem, otherwise fill the whole buffer
	size_t data_len = m_buf.size( ) - ( m_start - m_base );
	if( n < data_len ) {
		data_len = n;
	}

	memcpy( m_start, data, data_len );
	setg( m_base, m_start, m_start + data_len );

	// whatever did not fit into the read buffer goes to the spool
	data += data_len;
	n -= data_len;
	spoolData( data, n );

	return data_len + n;
}

// Appends 'n' bytes to the spool: to memory while they fit into the memory
// spool buffer, to the spool file afterwards.
// Throws logic_error if called after rewind( ) or in an unknown state.
void spool_streambuf::spoolData( char *data, size_t n )
{
	switch( m_state ) {
		case TO_SPOOL_MEMORY:
			// as long we can "spool" to memory, do so..
			if( m_spoolBufPos + n <= m_spoolBufSize ) {
				// The buffer is pre-sized to m_spoolBufSize, so overwrite in
				// place; inserting would grow the vector and shift its tail
				// on every call.
				copy( data, data + n, m_spoolBuf.begin( ) + m_spoolBufPos );
				m_spoolBufPos += n;
			} else {
				// ..otherwise start spooling to disk, write
				// current memory spool buffer first..
				LOG( logWARNING ) << "Spooling spool buffer exceeded (>" << m_spoolBufSize << ")";
				m_spoolFile.open( SPOOL_FILE_NAME, ios::binary | ios::out | ios::trunc );
				assert( m_spoolFile.good( ) );

				// Only the first m_spoolBufPos bytes hold spooled data; the
				// rest of the buffer is unused padding and must not end up
				// in the file.
				if( m_spoolBufPos > 0 ) {
					m_spoolFile.write( &m_spoolBuf.front( ), m_spoolBufPos );
					assert( m_spoolFile.good( ) );
				}

				m_state = TO_SPOOL_FILE;

				// ..followed by the data which did not fit
				m_spoolFile.write( data, n );
				assert( m_spoolFile.good( ) );
			}
			break;

		case TO_SPOOL_FILE:
			// we are appending to the spool file
			assert( m_spoolFile.good( ) );
			m_spoolFile.write( data, n );
			assert( m_spoolFile.good( ) );
			break;

		case FROM_SPOOL_MEMORY:
		case FROM_SPOOL_FILE:
			throw logic_error( "Still getting data from source after rewind!" );

		default:
			throw logic_error( "Illegal state!" );
	}
}

// Refills the get area. Before rewind( ) the data comes from
// readFromSource( ) and is spooled on the way; after rewind( ) it is
// replayed from the memory spool buffer or the spool file.
// Returns the next character without consuming it, or traits_type::eof( ).
streambuf::int_type spool_streambuf::underflow( )
{
	// check if buffer is exhausted, if not, return current character
	if( gptr( ) < egptr( ) ) {
		return traits_type::to_int_type( *gptr( ) );
	}

	m_base = &m_buf.front( );
	m_start = m_base;

	// Keep the tail of the previous get area as put-back characters. Never
	// keep more than the previous get area actually held, otherwise the
	// memmove would read from before the start of the buffer.
	if( eback( ) == m_base ) {
		const size_t filled = static_cast<size_t>( egptr( ) - eback( ) );
		const size_t keep = min<size_t>( m_putBack, filled );
		memmove( m_base, egptr( ) - keep, keep );
		m_start += keep;
	}

	// read from source or spool (depends on calling rewind)
	streambuf::int_type n;
	switch( m_state ) {
		case TO_SPOOL_MEMORY:
		case TO_SPOOL_FILE:
			// presumably fills the buffer at m_start and returns the number
			// of bytes read; readFromSource( ) is declared outside this file
			n = readFromSource( );
			if( n == 0 ) {
				return traits_type::eof( );
			} else if( n < 0 ) {
				// TODO handle error
				return traits_type::eof( );
			}
			spoolData( m_start, n );
			break;

		case FROM_SPOOL_MEMORY:
			n = min<size_t>( m_buf.size( ) - ( m_start - m_base ),
				m_spoolBufSize - m_spoolBufPos );
			if( n == 0 ) {
				return traits_type::eof( );
			}
			copy( m_spoolBuf.begin( ) + m_spoolBufPos,
				m_spoolBuf.begin( ) + m_spoolBufPos + n,
				m_buf.begin( ) + ( m_start - m_base ) );
			m_spoolBufPos += n;
			break;

		case FROM_SPOOL_FILE:
			n = min<size_t>( m_buf.size( ) - ( m_start - m_base ),
				m_spoolBufSize - m_spoolBufPos );
			if( n == 0 ) {
				return traits_type::eof( );
			}
			m_spoolFile.read( m_start, n );
			// trust the number of bytes the file really delivered
			n = static_cast<streambuf::int_type>( m_spoolFile.gcount( ) );
			if( n <= 0 ) {
				return traits_type::eof( );
			}
			m_spoolBufPos += n;
			break;

		default:
			throw logic_error( "Illegal state!" );
	}

	// set pointers
	setg( m_base, m_start, m_start + n );

	return traits_type::to_int_type( *gptr( ) );
}

// Switches the buffer to replay mode (or restarts a running replay): all
// data spooled so far is delivered again from its beginning.
void spool_streambuf::rewind( )
{
	switch( m_state ) {
		case TO_SPOOL_MEMORY:
			// Remember how much was spooled: from now on m_spoolBufSize is
			// the end of the replayable data, not the buffer capacity.
			m_spoolBufSize = m_spoolBufPos;
			m_spoolBufPos = 0;
			m_state = FROM_SPOOL_MEMORY;
			break;

		case TO_SPOOL_FILE: {
			// reopen the spool file for reading and determine its length
			m_spoolFile.close( );
			m_spoolFile.clear( );
			m_spoolFile.open( SPOOL_FILE_NAME, ios::binary | ios::in );
			m_spoolFile.seekg( 0, ios::end );
			const streamoff length = m_spoolFile.tellg( );
			// tellg( ) yields -1 on failure, treat that as an empty spool
			m_spoolBufSize = ( length < 0 ) ? 0 : static_cast<size_t>( length );
			m_spoolFile.seekg( 0, ios::beg );
			m_spoolBufPos = 0;
			m_state = FROM_SPOOL_FILE;
			break;
		}

		case FROM_SPOOL_MEMORY:
			m_spoolBufPos = 0;
			break;

		case FROM_SPOOL_FILE:
			m_spoolBufPos = 0;
			// a previous read may have left eof/fail set, which would make
			// the seek a no-op
			m_spoolFile.clear( );
			m_spoolFile.seekg( 0, ios::beg );
			break;
	}

	// invalidate the get area so the next read goes through underflow( )
	char *end = &m_buf.front( ) + m_buf.size( );
	setg( end, end, end );
	pubseekpos( 0, ios_base::in );
}

// Constructs the stream for 'url'; the stream buffer pointer starts out
// null and is expected to be set before rewind( ) is called.
SpoolRewindInputStream::SpoolRewindInputStream( const URL &url )
	: RewindInputStream( url ), m_buf( 0 )
{
}

SpoolRewindInputStream::~SpoolRewindInputStream( )
{
}

// Reads the remaining input to the end, so the stream buffer has seen (and
// spooled) all of it, then resets the stream state and rewinds the buffer.
void SpoolRewindInputStream::rewind( )
{
	assert( m_buf != 0 );

	// consume rest of web request, force spooling in streambuf
	enum { CHUNKSIZE = 1024 };
	char buf[CHUNKSIZE];

	while( good( ) && !eof( ) ) {
		read( buf, CHUNKSIZE );
	}

	// drop the eof/fail bits set by draining the stream
	ios::clear( );

	m_buf->rewind( );
}