2014-03-04 11:26:55 +00:00
# include <DB/DataStreams/CreatingSetsBlockInputStream.h>
# include <iomanip>
namespace DB
{
/** Returns the next block of the main stream (the last child).
  * On the first call, before anything is read from the main stream, every entry of
  * subqueries_for_sets is processed with create(). If the query gets cancelled at any
  * point, an empty block is returned instead.
  */
Block CreatingSetsBlockInputStream::readImpl()
{
    if (!created)
    {
        /// Filling of temporary tables goes first - because these tables may then be used to create Set/Join.
        for (auto & entry : subqueries_for_sets)
        {
            create(entry.second);

            /// Cancellation may happen while a subquery is being processed; stop right away
            /// and leave `created` unset.
            if (isCancelled())
                return Block();
        }

        created = true;
    }

    if (isCancelled())
        return Block();

    return children.back()->read();
}
2014-07-06 19:48:39 +00:00
void CreatingSetsBlockInputStream : : create ( SubqueryForSet & subquery )
2014-03-04 11:26:55 +00:00
{
2014-07-06 19:48:39 +00:00
LOG_TRACE ( log , ( subquery . set ? " Creating set. " : " " )
< < ( subquery . join ? " Creating join. " : " " )
< < ( subquery . table ? " Filling temporary table. " : " " ) ) ;
2014-03-04 11:26:55 +00:00
Stopwatch watch ;
2014-07-06 19:48:39 +00:00
BlockOutputStreamPtr table_out ;
if ( subquery . table )
table_out = subquery . table - > write ( ASTPtr ( ) ) ;
bool done_with_set = ! subquery . set ;
bool done_with_join = ! subquery . join ;
bool done_with_table = ! subquery . table ;
if ( done_with_set & & done_with_join & & done_with_table )
throw Exception ( " Logical error: nothing to do with subquery " , ErrorCodes : : LOGICAL_ERROR ) ;
subquery . source - > readPrefix ( ) ;
if ( table_out )
table_out - > writePrefix ( ) ;
while ( Block block = subquery . source - > read ( ) )
2014-03-04 11:26:55 +00:00
{
if ( isCancelled ( ) )
{
2014-07-06 19:48:39 +00:00
LOG_DEBUG ( log , " Query was cancelled during set / join or temporary table creation. " ) ;
2014-03-04 11:26:55 +00:00
return ;
}
2014-07-06 19:48:39 +00:00
if ( ! done_with_set )
2014-03-04 11:26:55 +00:00
{
2014-07-06 19:48:39 +00:00
if ( ! subquery . set - > insertFromBlock ( block ) )
done_with_set = true ;
2014-03-04 11:26:55 +00:00
}
2014-07-06 19:48:39 +00:00
if ( ! done_with_join )
{
if ( ! subquery . join - > insertFromBlock ( block ) )
done_with_join = true ;
}
2014-06-12 04:04:47 +00:00
2014-07-06 19:48:39 +00:00
if ( ! done_with_table )
2014-06-12 04:04:47 +00:00
{
2014-07-06 19:48:39 +00:00
table_out - > write ( block ) ;
rows_to_transfer + = block . rows ( ) ;
bytes_to_transfer + = block . bytes ( ) ;
if ( ( max_rows_to_transfer & & rows_to_transfer > max_rows_to_transfer )
| | ( max_bytes_to_transfer & & bytes_to_transfer > max_bytes_to_transfer ) )
{
if ( transfer_overflow_mode = = OverflowMode : : THROW )
throw Exception ( " IN/JOIN external table size limit exceeded. "
" Rows: " + toString ( rows_to_transfer )
+ " , limit: " + toString ( max_rows_to_transfer )
+ " . Bytes: " + toString ( bytes_to_transfer )
+ " , limit: " + toString ( max_bytes_to_transfer ) + " . " ,
ErrorCodes : : SET_SIZE_LIMIT_EXCEEDED ) ;
if ( transfer_overflow_mode = = OverflowMode : : BREAK )
done_with_table = true ;
throw Exception ( " Logical error: unknown overflow mode " , ErrorCodes : : LOGICAL_ERROR ) ;
}
2014-06-12 04:04:47 +00:00
}
2014-07-06 19:48:39 +00:00
if ( done_with_set & & done_with_join & & done_with_table )
2014-06-12 04:04:47 +00:00
{
2014-07-06 19:48:39 +00:00
if ( IProfilingBlockInputStream * profiling_in = dynamic_cast < IProfilingBlockInputStream * > ( & * subquery . source ) )
2014-06-12 04:04:47 +00:00
profiling_in - > cancel ( ) ;
2014-07-06 19:48:39 +00:00
2014-06-12 04:04:47 +00:00
break ;
}
}
2014-07-06 21:59:20 +00:00
// subquery.source->readSuffix(); /// TODO Блокируется в RemoteBlockInputStream::readSuffixImpl при запросе SELECT number FROM system.numbers WHERE number IN (SELECT number FROM remote('127.0.0.{1,2}', system, numbers) WHERE number % 2 = 1 LIMIT 10) LIMIT 10
2014-07-06 19:48:39 +00:00
if ( table_out )
table_out - > writeSuffix ( ) ;
2014-03-04 11:26:55 +00:00
/// Выведем информацию о том, сколько считано строк и байт.
size_t rows = 0 ;
size_t bytes = 0 ;
2014-08-22 02:31:54 +00:00
watch . stop ( ) ;
2014-07-06 19:48:39 +00:00
subquery . source - > getLeafRowsBytes ( rows , bytes ) ;
2014-03-04 11:26:55 +00:00
size_t head_rows = 0 ;
2014-07-06 19:48:39 +00:00
if ( IProfilingBlockInputStream * profiling_in = dynamic_cast < IProfilingBlockInputStream * > ( & * subquery . source ) )
2014-03-04 11:26:55 +00:00
head_rows = profiling_in - > getInfo ( ) . rows ;
if ( rows ! = 0 )
{
2014-07-06 19:48:39 +00:00
std : : stringstream msg ;
msg < < std : : fixed < < std : : setprecision ( 3 ) ;
msg < < " Created. " ;
if ( subquery . set )
2014-07-06 20:46:17 +00:00
msg < < " Set with " < < subquery . set - > size ( ) < < " entries from " < < head_rows < < " rows. " ;
2014-07-06 19:48:39 +00:00
if ( subquery . join )
2014-07-06 20:46:17 +00:00
msg < < " Join with " < < subquery . join - > size ( ) < < " entries from " < < head_rows < < " rows. " ;
2014-07-06 19:48:39 +00:00
if ( subquery . table )
msg < < " Table with " < < head_rows < < " rows. " ;
2014-07-06 20:46:17 +00:00
msg < < " Read " < < rows < < " rows, " < < bytes / 1048576.0 < < " MiB in " < < watch . elapsedSeconds ( ) < < " sec., "
2014-07-06 19:48:39 +00:00
< < static_cast < size_t > ( rows / watch . elapsedSeconds ( ) ) < < " rows/sec., " < < bytes / 1048576.0 / watch . elapsedSeconds ( ) < < " MiB/sec. " ;
LOG_DEBUG ( log , msg . rdbuf ( ) ) ;
}
else
{
LOG_DEBUG ( log , " Subquery has empty result. " ) ;
2014-03-04 11:26:55 +00:00
}
}
}