Added method "getHeader" in IBlockInputStream [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-01-08 04:14:43 +03:00
parent 12c521fa25
commit 1780e6c1d9
8 changed files with 18 additions and 35 deletions

View File

@ -29,18 +29,19 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
input_buffer_ast_part = ReadBufferFromMemory(ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
ConcatReadBuffer::ReadBuffers buffers; ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data) if (ast_insert_query->data)
buffers.push_back(&input_buffer_ast_part.value()); buffers.push_back(input_buffer_ast_part.get());
buffers.push_back(&input_buffer_tail_part); buffers.push_back(&input_buffer_tail_part);
/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'. /** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'. * - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
*/ */
input_buffer_contacenated = ConcatReadBuffer(buffers); input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out_sample, context.getSettings().max_insert_block_size); res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out_sample, context.getSettings().max_insert_block_size);
} }

View File

@ -3,7 +3,7 @@
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
#include <DataStreams/IProfilingBlockInputStream.h> #include <DataStreams/IProfilingBlockInputStream.h>
#include <cstddef> #include <cstddef>
#include <optional> #include <memory>
namespace DB namespace DB
@ -31,8 +31,8 @@ public:
Block getHeader() override { return res_stream->getHeader(); } Block getHeader() override { return res_stream->getHeader(); }
private: private:
std::optional<ReadBuffer> input_buffer_ast_part; std::unique_ptr<ReadBuffer> input_buffer_ast_part;
std::optional<ReadBuffer> input_buffer_contacenated; std::unique_ptr<ReadBuffer> input_buffer_contacenated;
BlockInputStreamPtr res_stream; BlockInputStreamPtr res_stream;
}; };

View File

@ -4,8 +4,6 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Core/iostream_debug_helpers.h>
namespace DB namespace DB
{ {
@ -227,8 +225,6 @@ Block RemoteBlockInputStream::getHeader()
if (res.rows() > 0) if (res.rows() > 0)
throw Exception("Logical error: the header block must be sent before data", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: the header block must be sent before data", ErrorCodes::LOGICAL_ERROR);
DUMP(res);
header = res; header = res;
return header; return header;
} }

View File

@ -23,8 +23,6 @@
#include <Common/demangle.h> #include <Common/demangle.h>
#include <Interpreters/config_compile.h> #include <Interpreters/config_compile.h>
#include <Core/iostream_debug_helpers.h>
namespace ProfileEvents namespace ProfileEvents
{ {
@ -119,7 +117,6 @@ Block Aggregator::getHeader(bool final) const
} }
else if (params.intermediate_header) else if (params.intermediate_header)
{ {
DUMP(params.intermediate_header);
res = params.intermediate_header.cloneEmpty(); res = params.intermediate_header.cloneEmpty();
if (final) if (final)
@ -1147,12 +1144,8 @@ Block Aggregator::prepareBlockAndFill(
MutableColumns final_aggregate_columns(params.aggregates_size); MutableColumns final_aggregate_columns(params.aggregates_size);
AggregateColumnsData aggregate_columns_data(params.aggregates_size); AggregateColumnsData aggregate_columns_data(params.aggregates_size);
DUMP(params.aggregates_size);
Block header = getHeader(final); Block header = getHeader(final);
DUMP(header);
for (size_t i = 0; i < params.keys_size; ++i) for (size_t i = 0; i < params.keys_size; ++i)
{ {
key_columns[i] = header.safeGetByPosition(i).type->createColumn(); key_columns[i] = header.safeGetByPosition(i).type->createColumn();
@ -1163,8 +1156,6 @@ Block Aggregator::prepareBlockAndFill(
{ {
if (!final) if (!final)
{ {
DUMP(header.safeGetByPosition(i + params.keys_size).type);
aggregate_columns[i] = header.safeGetByPosition(i + params.keys_size).type->createColumn(); aggregate_columns[i] = header.safeGetByPosition(i + params.keys_size).type->createColumn();
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states. /// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
@ -1234,8 +1225,6 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
{ {
DUMP4(i, final, aggregate_columns, offsets_of_aggregate_states);
if (!final) if (!final)
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]); aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
else else

View File

@ -46,8 +46,6 @@
#include <Columns/Collator.h> #include <Columns/Collator.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Core/iostream_debug_helpers.h>
namespace ProfileEvents namespace ProfileEvents
{ {
@ -992,9 +990,6 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
Block header = streams[0]->getHeader(); Block header = streams[0]->getHeader();
DUMP(header);
DUMP(key_names);
ColumnNumbers keys; ColumnNumbers keys;
for (const auto & name : key_names) for (const auto & name : key_names)
keys.push_back(header.getPositionByName(name)); keys.push_back(header.getPositionByName(name));

View File

@ -1064,7 +1064,8 @@ private:
void onProgress(const Progress & value) void onProgress(const Progress & value)
{ {
progress.incrementPiecewiseAtomically(value); progress.incrementPiecewiseAtomically(value);
block_out_stream->onProgress(value); if (block_out_stream)
block_out_stream->onProgress(value);
writeProgress(); writeProgress();
} }

View File

@ -310,7 +310,8 @@ void TCPHandler::processOrdinaryQuery()
for (auto & elem : header) for (auto & elem : header)
elem.column = elem.type->createColumn(); elem.column = elem.type->createColumn();
sendData(header); if (header)
sendData(header);
} }
AsynchronousBlockInputStream async_in(state.io.in); AsynchronousBlockInputStream async_in(state.io.in);
@ -346,12 +347,12 @@ void TCPHandler::processOrdinaryQuery()
} }
} }
/** If data has run out, we will send the profiling data and total values to /** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use * the last zero block to be able to use
* this information in the suffix output of stream. * this information in the suffix output of stream.
* If the request was interrupted, then `sendTotals` and other methods could not be called, * If the request was interrupted, then `sendTotals` and other methods could not be called,
* because we have not read all the data yet, * because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time. * and there could be ongoing calculations in other threads at the same time.
*/ */
if (!block && !isQueryCancelled()) if (!block && !isQueryCancelled())
{ {

View File

@ -6,7 +6,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
echo 'DROP TABLE IF EXISTS test.long_insert' | ${CLICKHOUSE_CURL} -sSg ${CLICKHOUSE_URL} -d @- echo 'DROP TABLE IF EXISTS test.long_insert' | ${CLICKHOUSE_CURL} -sSg ${CLICKHOUSE_URL} -d @-
echo 'CREATE TABLE test.long_insert (a String) ENGINE = Memory' | ${CLICKHOUSE_CURL} -sSg ${CLICKHOUSE_URL} -d @- echo 'CREATE TABLE test.long_insert (a String) ENGINE = Memory' | ${CLICKHOUSE_CURL} -sSg ${CLICKHOUSE_URL} -d @-
for string_size in 1 10 100 1000 10000 100000 1000000; do for string_size in 1 10 100 1000 10000 100000 1000000; do
# Если не указать LC_ALL=C, то Perl будет ругаться на некоторых плохо настроенных системах. # LC_ALL=C is needed because otherwise Perl will bark on bad tuned environment.
LC_ALL=C perl -we 'for my $letter ("a" .. "z") { print(($letter x '$string_size') . "\n") }' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}?query=INSERT+INTO+test.long_insert+FORMAT+TabSeparated" --data-binary @- LC_ALL=C perl -we 'for my $letter ("a" .. "z") { print(($letter x '$string_size') . "\n") }' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}?query=INSERT+INTO+test.long_insert+FORMAT+TabSeparated" --data-binary @-
echo 'SELECT substring(a, 1, 1) AS c, length(a) AS l FROM test.long_insert ORDER BY c, l' | ${CLICKHOUSE_CURL} -sSg ${CLICKHOUSE_URL} -d @- echo 'SELECT substring(a, 1, 1) AS c, length(a) AS l FROM test.long_insert ORDER BY c, l' | ${CLICKHOUSE_CURL} -sSg ${CLICKHOUSE_URL} -d @-
done done