Added requested changes. [#METR-19372]

This commit is contained in:
Vitaliy Lyudvichenko 2016-12-09 13:10:12 +03:00
parent 855dc56598
commit ea11f61433
5 changed files with 65 additions and 63 deletions

View File

@ -3,7 +3,7 @@
#include <DB/Interpreters/Context.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <common/logger_useful.h>
#include <DB/DataStreams/BlockIO.h>
namespace DB
{
@ -13,69 +13,27 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/** Prepares an input stream which produce data containing in INSERT query
* Head of inserting data could be stored in INSERT ast directly
* Remaining (tail) data could be stored in input_buffer_tail_part
*/
class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & istr, const BlockIO & streams, Context & context)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
if (!ast_insert_query)
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
Block readImpl() override { return res_stream->read(); }
void readPrefixImpl() override { return res_stream->readPrefix(); }
void readSuffixImpl() override { return res_stream->readSuffix(); }
String format = ast_insert_query->format;
if (format.empty())
format = "Values";
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (istr) part of query.
buf1 = std::make_unique<ReadBuffer>(
const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
if (ast_insert_query->data)
buffers.push_back(buf1.get());
buffers.push_back(&istr);
/** NOTE Must not read from 'istr' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'istr'.
*/
data_istr = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *data_istr, streams.out_sample, context.getSettings().max_insert_block_size);
}
Block readImpl() override
{
return res_stream->read();
}
void readPrefixImpl() override
{
return res_stream->readPrefix();
}
void readSuffixImpl() override
{
return res_stream->readSuffix();
}
String getName() const override
{
return "InputStreamFromASTInsertQuery";
}
String getID() const override
{
return "InputStreamFromASTInsertQuery(" + toString(this) + ")";
}
String getName() const override { return "InputStreamFromASTInsertQuery"; }
String getID() const override { return "InputStreamFromASTInsertQuery(" + toString(this) + ")"; }
private:
ConcatReadBuffer::ReadBuffers buffers;
std::unique_ptr<ReadBuffer> buf1;
std::unique_ptr<ReadBuffer> data_istr;
std::unique_ptr<ReadBuffer> input_buffer_ast_part;
std::unique_ptr<ReadBuffer> input_buffer_contacenated;
BlockInputStreamPtr res_stream;
};

View File

@ -32,14 +32,15 @@ struct QueryLogElement
time_t query_start_time{};
UInt64 query_duration_ms{};
/// The data fetched from DB to execute the query
UInt64 read_rows{};
UInt64 read_bytes{};
/// The data written to DB
UInt64 written_rows{};
UInt64 written_bytes{};
/// NOTE: Not obvious metric.
/// It always approximately equal to read_rows or written_rows at the end of query execution.
/// The data sent to the client
UInt64 result_rows{};
UInt64 result_bytes{};

View File

@ -0,0 +1,37 @@
#include <DB/DataStreams/InputStreamFromASTInsertQuery.h>
namespace DB
{
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
if (!ast_insert_query)
throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
String format = ast_insert_query->format;
if (format.empty())
format = "Values";
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
input_buffer_ast_part = std::make_unique<ReadBuffer>(
const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
buffers.push_back(&input_buffer_tail_part);
/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
*/
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out_sample, context.getSettings().max_insert_block_size);
}
}

View File

@ -207,8 +207,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
stream->setProcessListElement(context.getProcessListElement());
}
}
if (!res.out || !dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
LOG_DEBUG(&Logger::get("executeQuery"), "res.out " << res.out.get() << " is empty");
/// Everything related to query log.
{

View File

@ -383,7 +383,12 @@ public:
}
};
/** Progress callback. Is used by Horizontal merger and first step of Vertical merger.
* What it should update:
* - approximate progress
* - amount of merged rows and their size (PK columns subset is used in case of Vertical merge)
* - time elapsed for current merge.
*/
class MergeProgressCallback : public ProgressCallback
{
public:
@ -425,6 +430,9 @@ public:
};
};
/** Progress callback for gathering step of Vertical merge.
* Updates: approximate progress, amount of merged bytes (TODO: two column case should be fixed), elapsed time.
*/
class MergeProgressCallbackVerticalStep : public MergeProgressCallback
{
public:
@ -438,8 +446,7 @@ public:
}
Float64 initial_progress;
/// NOTE: not thread safe (to be copyable). It is OK in current single thread use case
size_t rows_read_internal{0};
size_t rows_read_internal{0}; // NOTE: not thread safe (to be copyable). It is OK in current single thread use case
void operator() (const Progress & value)
{
@ -453,6 +460,7 @@ public:
};
};
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,