summaryrefslogtreecommitdiff
path: root/src/libcrawler/SpoolRewindInputStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcrawler/SpoolRewindInputStream.cpp')
-rw-r--r--src/libcrawler/SpoolRewindInputStream.cpp181
1 files changed, 181 insertions, 0 deletions
diff --git a/src/libcrawler/SpoolRewindInputStream.cpp b/src/libcrawler/SpoolRewindInputStream.cpp
new file mode 100644
index 0000000..9135741
--- /dev/null
+++ b/src/libcrawler/SpoolRewindInputStream.cpp
@@ -0,0 +1,181 @@
+#include "SpoolRewindInputStream.hpp"
+#include "Logger.hpp"
+
+#include <algorithm>
+#include <cstring>
+#include <cassert>
+
+using namespace std;
+
+spool_streambuf::spool_streambuf( size_t bufSize, size_t putBack, size_t spoolBufSize )
+ : m_putBack( max( putBack, size_t( 1 ) ) ),
+ m_spoolBuf( spoolBufSize ), m_spoolBufPos( 0 ),
+ m_spoolBufSize( spoolBufSize ), m_state( TO_SPOOL_MEMORY ),
+ m_buf( max( bufSize, putBack ) + putBack ),
+ m_base( 0 ), m_start( 0 )
+{
+ char *end = &m_buf.front( ) + m_buf.size( );
+ setg( end, end, end );
+}
+
+spool_streambuf::~spool_streambuf( )
+{
+ switch( m_state ) {
+ case TO_SPOOL_MEMORY:
+ case FROM_SPOOL_MEMORY:
+ // memory only, nothing to clean up
+ break;
+
+ case TO_SPOOL_FILE:
+ case FROM_SPOOL_FILE:
+ m_spoolFile.close( );
+ (void)remove( "/tmp/spool.tmp" );
+ break;
+ }
+}
+
+streambuf::int_type spool_streambuf::underflow( )
+{
+ // check if buffer is exhausted, if not, return current character
+ if( gptr( ) < egptr( ) )
+ return traits_type::to_int_type( *gptr( ) );
+
+ m_base = &m_buf.front( );
+ m_start = m_base;
+
+ // move put back away
+ if( eback( ) == m_base ) {
+ memmove( m_base, egptr( ) - m_putBack, m_putBack );
+ m_start += m_putBack;
+ }
+
+ // read from source or spool (depends on calling rewind)
+ streambuf::int_type n;
+ switch( m_state ) {
+ case TO_SPOOL_MEMORY:
+ case TO_SPOOL_FILE:
+ n = readFromSource( );
+ if( n == 0 ) {
+ return traits_type::eof( );
+ } else if( n < 0 ) {
+ // TODO handle error
+ return traits_type::eof( );
+ }
+
+ if( m_state == TO_SPOOL_MEMORY ) {
+ // as long we can "spool" to memory, do so..
+ if( m_spoolBufPos + n <= m_spoolBufSize ) {
+ m_spoolBuf.insert( m_spoolBuf.begin( ) + m_spoolBufPos, m_start, m_start + n );
+ m_spoolBufPos += n;
+ } else {
+ // ..otherwise start spooling to disk, write
+ // current memory spool buffer first..
+ LOG( logWARNING ) << "Spooling spool buffer exceeded (>" << m_spoolBufSize << ")";
+ m_spoolFile.open( "/tmp/spool.tmp", ios::binary | ios::out | ios::trunc );
+ assert( m_spoolFile.good( ) );
+ m_spoolFile.write( &m_spoolBuf.front( ), m_spoolBufSize );
+ assert( m_spoolFile.good( ) );
+ m_state = TO_SPOOL_FILE;
+ m_spoolFile.write( m_start, n );
+ assert( m_spoolFile.good( ) );
+ }
+ } else {
+ // we are appending to the spool file
+ assert( m_spoolFile.good( ) );
+ m_spoolFile.write( m_start, n );
+ assert( m_spoolFile.good( ) );
+ }
+
+ break;
+
+ case FROM_SPOOL_MEMORY:
+ n = min( m_buf.size( ) - ( m_start - m_base ), m_spoolBufSize - m_spoolBufPos );
+ if( n == 0 ) {
+ return traits_type::eof( );
+ }
+
+ copy( m_spoolBuf.begin( ) + m_spoolBufPos,
+ m_spoolBuf.begin( ) + m_spoolBufPos + n,
+ m_buf.begin( ) + ( m_start - m_base ) );
+
+ m_spoolBufPos += n;
+
+ break;
+
+ case FROM_SPOOL_FILE:
+
+ n = min( m_buf.size( ) - ( m_start - m_base ), m_spoolBufSize - m_spoolBufPos );
+ m_spoolFile.read( m_start, n );
+ m_spoolBufPos += n;
+ if( m_spoolBufPos > m_spoolBufSize ) {
+ return traits_type::eof( );
+ }
+ if( n == 0 || m_spoolFile.eof( ) ) {
+ return traits_type::eof( );
+ }
+
+ break;
+ }
+
+ // set pointers
+ setg( m_base, m_start, m_start + n );
+
+ return traits_type::to_int_type( *gptr( ) );
+}
+
+void spool_streambuf::rewind( )
+{
+ switch( m_state ) {
+ case TO_SPOOL_MEMORY:
+ m_spoolBufPos = 0;
+ m_state = FROM_SPOOL_MEMORY;
+ break;
+
+ case TO_SPOOL_FILE:
+ m_spoolFile.close( );
+ m_spoolFile.open( "/tmp/spool.tmp", ios::binary | ios::in );
+ m_spoolFile.seekg( 0, ios::end );
+ m_spoolBufSize = m_spoolFile.tellg( );
+ m_spoolFile.seekg( 0, ios::beg );
+ m_spoolBufPos = 0;
+ m_state = FROM_SPOOL_FILE;
+ break;
+
+ case FROM_SPOOL_MEMORY:
+ m_spoolBufPos = 0;
+ break;
+
+ case FROM_SPOOL_FILE:
+ m_spoolBufPos = 0;
+ m_spoolFile.seekg( 0, ios::beg );
+ break;
+ }
+
+ char *end = &m_buf.front( ) + m_buf.size( );
+ setg( end, end, end );
+ pubseekpos( 0, ios_base::in );
+}
+
+SpoolRewindInputStream::SpoolRewindInputStream( const URL &url )
+ : RewindInputStream( url ), m_buf( 0 )
+{
+}
+
+SpoolRewindInputStream::~SpoolRewindInputStream( )
+{
+}
+
+void SpoolRewindInputStream::rewind( )
+{
+ // consume rest of web request, force spooling in streambuf
+ enum { CHUNKSIZE = 1024 };
+ char buf[CHUNKSIZE];
+
+ while( good( ) && !eof( ) ) {
+ read( buf, CHUNKSIZE );
+ }
+
+ ios::clear( );
+ assert( m_buf != 0 );
+ m_buf->rewind( );
+}