Add support for nested subqueries for AggregationPartialResult transform and more comments

This commit is contained in:
alexX512 2023-08-29 10:45:34 +00:00
parent bbbf7f4c16
commit 47a908dfa8
27 changed files with 296 additions and 151 deletions

View File

@ -271,9 +271,16 @@ protected:
enum class PartialResultMode: UInt8 enum class PartialResultMode: UInt8
{ {
NotInit, /// Query doesn't show partial result before the first block with 0 rows. /// Query doesn't show partial result before the first block with 0 rows.
Active, /// Query shows partial result after the first and before the second block with 0 rows. /// The first block with 0 rows initializes the output table format using its header.
Inactive /// Query doesn't show partial result at all. NotInit,
/// Query shows partial result after the first and before the second block with 0 rows.
/// The second block with 0 rows indicates that receiving blocks with partial result has been completed and the next blocks will contain the full result.
Active,
/// Query doesn't show partial result at all.
Inactive,
}; };
PartialResultMode partial_result_mode = PartialResultMode::Inactive; PartialResultMode partial_result_mode = PartialResultMode::Inactive;

View File

@ -307,8 +307,8 @@ class IColumn;
\ \
M(Bool, partial_result_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \ M(Bool, partial_result_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \
\ \
M(Milliseconds, partial_result_update_duration_ms, 0, "Duration of time in milliseconds between real-time updates of result table sent to the client during query execution.", 0) \ M(Milliseconds, partial_result_update_duration_ms, 0, "Interval (in milliseconds) for sending updates with partial data about the result table to the client (in interactive mode) during query execution. Setting to 0 disables partial results. Only supported for single-threaded GROUP BY without key, ORDER BY, LIMIT and OFFSET.", 0) \
M(UInt64, max_rows_in_partial_result, 10, "Max rows displayed to user after each real-time update of output table during query execution.", 0) \ M(UInt64, max_rows_in_partial_result, 10, "Maximum rows to show in the partial result after every real-time update while the query runs (use partial result limit + OFFSET as a value in case of OFFSET in the query).", 0) \
/** Settings for testing hedged requests */ \ /** Settings for testing hedged requests */ \
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \ M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \ M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \

View File

@ -198,22 +198,4 @@ void IOutputFormat::finalize()
finalized = true; finalized = true;
} }
/// Writes terminal control sequences to `out` that erase the last `lines_number` lines:
/// a carriage return first, then one "move up + clear line" sequence per line.
void IOutputFormat::clearLastLines(size_t lines_number)
{
/// http://en.wikipedia.org/wiki/ANSI_escape_code
/// NOTE: these macros are defined inside the function body, but a #define is not scoped to the function
/// and stays visible in the rest of the translation unit.
#define MOVE_TO_PREV_LINE "\033[A"
#define CLEAR_TO_END_OF_LINE "\033[K"
/// Adjacent string literals are concatenated, so this is the single sequence "\033[A\033[K".
static const char * clear_prev_line = MOVE_TO_PREV_LINE \
CLEAR_TO_END_OF_LINE;
/// Move cursor to the beginning of line
writeCString("\r", out);
/// Repeat the "previous line + clear to end of line" sequence once per line to erase.
for (size_t line = 0; line < lines_number; ++line)
{
writeCString(clear_prev_line, out);
}
}
} }

View File

@ -103,8 +103,6 @@ public:
} }
} }
void clearLastLines(size_t lines_number);
protected: protected:
friend class ParallelFormattingOutputFormat; friend class ParallelFormattingOutputFormat;

View File

@ -390,9 +390,28 @@ void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk)
write(std::move(chunk), PortKind::Extremes); write(std::move(chunk), PortKind::Extremes);
} }
/// Writes terminal control sequences to `out` that erase the last `lines_number` lines:
/// a carriage return first, then one "move to previous line + clear to end of line" sequence per line.
void PrettyBlockOutputFormat::clearLastLines(size_t lines_number)
{
    /// ANSI escape sequences, see http://en.wikipedia.org/wiki/ANSI_escape_code
    /// "\033[A" moves the cursor to the previous line, "\033[K" clears from the cursor to the end of the line.
    /// A constant is used instead of function-local #define's: a macro is not scoped to the function
    /// and would stay defined for the rest of the translation unit.
    static constexpr const char * clear_prev_line = "\033[A" "\033[K";

    /// Move cursor to the beginning of line
    writeCString("\r", out);

    for (size_t line = 0; line < lines_number; ++line)
        writeCString(clear_prev_line, out);
}
void PrettyBlockOutputFormat::consumePartialResult(Chunk chunk) void PrettyBlockOutputFormat::consumePartialResult(Chunk chunk)
{ {
if (prev_partial_block_rows > 0) if (prev_partial_block_rows > 0)
/// number of rows + header line + footer line
clearLastLines(prev_partial_block_rows + 2); clearLastLines(prev_partial_block_rows + 2);
prev_partial_block_rows = chunk.getNumRows(); prev_partial_block_rows = chunk.getNumRows();

View File

@ -27,6 +27,8 @@ protected:
void consume(Chunk) override; void consume(Chunk) override;
void consumeTotals(Chunk) override; void consumeTotals(Chunk) override;
void consumeExtremes(Chunk) override; void consumeExtremes(Chunk) override;
void clearLastLines(size_t lines_number);
void consumePartialResult(Chunk) override; void consumePartialResult(Chunk) override;
size_t total_rows = 0; size_t total_rows = 0;

View File

@ -237,8 +237,21 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName()); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName());
} }
/// Level of support a processor declares for the partial result pipeline.
/// This is the value type returned by getPartialResultProcessorSupportStatus().
enum class PartialResultStatus
{
/// Processor currently doesn't support work with the partial result pipeline.
NotSupported,
/// Processor can be skipped in the partial result pipeline.
SkipSupported,
/// Processor creates a light-weight copy of itself in the partial result pipeline.
/// The copy can create snapshots of the original processor or transform small blocks of data in the same way as the original processor.
FullSupported,
};
virtual bool isPartialResultProcessor() const { return false; } virtual bool isPartialResultProcessor() const { return false; }
virtual bool supportPartialResultProcessor() const { return false; } virtual PartialResultStatus getPartialResultProcessorSupportStatus() const { return PartialResultStatus::NotSupported; }
/// In case if query was cancelled executor will wait till all processors finish their jobs. /// In case if query was cancelled executor will wait till all processors finish their jobs.
/// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o). /// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).

View File

@ -76,7 +76,7 @@ public:
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); } void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; } void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; }
bool supportPartialResultProcessor() const override { return true; } PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; }
}; };
} }

View File

@ -197,7 +197,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
else else
stack.push(Frame{.node = frame.node->children[next_child]}); stack.push(Frame{.node = frame.node->children[next_child]});
if (last_pipeline && has_partial_result_setting) if (has_partial_result_setting && last_pipeline && !last_pipeline->isPartialResultActive())
last_pipeline->activatePartialResult(build_pipeline_settings.partial_result_limit, build_pipeline_settings.partial_result_duration_ms); last_pipeline->activatePartialResult(build_pipeline_settings.partial_result_limit, build_pipeline_settings.partial_result_duration_ms);
} }

View File

@ -8,33 +8,38 @@ AggregatingPartialResultTransform::AggregatingPartialResultTransform(
UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
: PartialResultTransform(input_header, output_header, partial_result_limit_, partial_result_duration_ms_) : PartialResultTransform(input_header, output_header, partial_result_limit_, partial_result_duration_ms_)
, aggregating_transform(std::move(aggregating_transform_)) , aggregating_transform(std::move(aggregating_transform_))
, transform_aggregator(input_header, aggregating_transform->params->params)
{} {}
/// Aggregates the rows of `chunk` with `transform_aggregator` into fresh, call-local aggregation state
/// and replaces `chunk` with the first block of the finalized result.
void AggregatingPartialResultTransform::transformPartialResult(Chunk & chunk)
{
    const auto & aggregation_params = aggregating_transform->params->params;

    /// The row count has to be read before the columns are detached from the chunk.
    const UInt64 rows_count = chunk.getNumRows();

    /// Scratch state for this call only: nothing is carried over between chunks.
    AggregatedDataVariants partial_variants;
    ColumnRawPtrs keys(aggregation_params.keys_size);
    Aggregator::AggregateColumns aggregates(aggregation_params.aggregates_size);
    bool no_more_keys = false;

    transform_aggregator.executeOnBlock(
        chunk.detachColumns(), 0, rows_count, partial_variants, keys, aggregates, no_more_keys);

    /// Only the first block of the converted result is used.
    auto result_blocks = transform_aggregator.convertToBlocks(partial_variants, /*final*/ true, /*max_threads*/ 1);
    chunk = convertToChunk(result_blocks.front());
}
PartialResultTransform::ShaphotResult AggregatingPartialResultTransform::getRealProcessorSnapshot() PartialResultTransform::ShaphotResult AggregatingPartialResultTransform::getRealProcessorSnapshot()
{ {
std::lock_guard lock(aggregating_transform->snapshot_mutex); std::lock_guard lock(aggregating_transform->snapshot_mutex);
auto & params = aggregating_transform->params;
/// Currently not supported cases
/// TODO: check that insert results from prepareBlockAndFillWithoutKey return values without changing of the aggregator state
if (params->params.keys_size != 0 /// has at least one key for aggregation
|| params->aggregator.hasTemporaryData() /// use external storage for aggregation
|| aggregating_transform->many_data->variants.size() > 1) /// use more then one stream for aggregation
return {{}, SnaphotStatus::Stopped};
if (aggregating_transform->is_generate_initialized) if (aggregating_transform->is_generate_initialized)
return {{}, SnaphotStatus::Stopped}; return {{}, SnaphotStatus::Stopped};
if (aggregating_transform->variants.empty()) if (aggregating_transform->variants.empty())
return {{}, SnaphotStatus::NotReady}; return {{}, SnaphotStatus::NotReady};
auto & aggregator = params->aggregator; auto & snapshot_aggregator = aggregating_transform->params->aggregator;
auto & snapshot_variants = aggregating_transform->many_data->variants;
auto prepared_data = aggregator.prepareVariantsToMerge(aggregating_transform->many_data->variants); auto block = snapshot_aggregator.prepareBlockAndFillWithoutKeySnapshot(*snapshot_variants.at(0));
AggregatedDataVariantsPtr & first = prepared_data.at(0);
aggregator.mergeWithoutKeyDataImpl(prepared_data);
auto block = aggregator.prepareBlockAndFillWithoutKeySnapshot(*first);
return {convertToChunk(block), SnaphotStatus::Ready}; return {convertToChunk(block), SnaphotStatus::Ready};
} }

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <Interpreters/Aggregator.h>
#include <Processors/Transforms/AggregatingTransform.h> #include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/PartialResultTransform.h> #include <Processors/Transforms/PartialResultTransform.h>
@ -17,10 +18,12 @@ public:
String getName() const override { return "AggregatingPartialResultTransform"; } String getName() const override { return "AggregatingPartialResultTransform"; }
void transformPartialResult(Chunk & chunk) override;
ShaphotResult getRealProcessorSnapshot() override; ShaphotResult getRealProcessorSnapshot() override;
private: private:
AggregatingTransformPtr aggregating_transform; AggregatingTransformPtr aggregating_transform;
Aggregator transform_aggregator;
}; };
} }

View File

@ -664,11 +664,13 @@ void AggregatingTransform::consume(Chunk chunk)
{ {
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns()); auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
block = materializeBlock(block); block = materializeBlock(block);
LOG_DEBUG(log, "AggregatingTransform::consume. Merge Block columns {}", block.dumpNames());
if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys)) if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys))
is_consume_finished = true; is_consume_finished = true;
} }
else else
{ {
LOG_DEBUG(log, "AggregatingTransform::consume. Execute Block");
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys)) if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys))
is_consume_finished = true; is_consume_finished = true;
} }

View File

@ -170,7 +170,16 @@ public:
void work() override; void work() override;
Processors expandPipeline() override; Processors expandPipeline() override;
bool supportPartialResultProcessor() const override { return true; } PartialResultStatus getPartialResultProcessorSupportStatus() const override {
/// Currently AggregatingPartialResultTransform supports only single-threaded aggregation without key.
/// TODO: check that insert results from aggregator.prepareBlockAndFillWithoutKey return values without
/// changing of the aggregator state when aggregation with keys will be supported in AggregatingPartialResultTransform.
bool is_partial_result_supported = params->params.keys_size == 0 /// Aggregation without key.
&& many_data->variants.size() == 1; /// Use only one stream for aggregation.
return is_partial_result_supported ? PartialResultStatus::FullSupported : PartialResultStatus::NotSupported;
}
protected: protected:
void consume(Chunk chunk); void consume(Chunk chunk);
@ -217,6 +226,10 @@ private:
bool is_consume_started = false; bool is_consume_started = false;
friend class AggregatingPartialResultTransform; friend class AggregatingPartialResultTransform;
/// The mutex protects variables that are used for creating a snapshot of the current processor.
/// The current implementation of AggregatingPartialResultTransform uses the 'is_generate_initialized' variable to check
/// whether the processor has started sending data through the main pipeline, and the corresponding partial result processor should stop creating snapshots.
/// Additionally, the mutex protects the 'params->aggregator' and 'many_data->variants' variables, which are used to get data from them for a snapshot.
std::mutex snapshot_mutex; std::mutex snapshot_mutex;
void initGenerate(); void initGenerate();

View File

@ -26,7 +26,7 @@ public:
static Block transformHeader(Block header, const ActionsDAG & expression); static Block transformHeader(Block header, const ActionsDAG & expression);
bool supportPartialResultProcessor() const override { return true; } PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; }
protected: protected:
void transform(Chunk & chunk) override; void transform(Chunk & chunk) override;

View File

@ -29,11 +29,11 @@ void LimitPartialResultTransform::transformPartialResult(Chunk & chunk)
/// Check if some rows should be removed /// Check if some rows should be removed
if (length < num_rows) if (length < num_rows)
{ {
auto columns = chunk.detachColumns();
UInt64 num_columns = chunk.getNumColumns(); UInt64 num_columns = chunk.getNumColumns();
auto columns = chunk.detachColumns();
for (UInt64 i = 0; i < num_columns; ++i) for (UInt64 i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(offset, limit); columns[i] = columns[i]->cut(offset, length);
chunk.setColumns(std::move(columns), length); chunk.setColumns(std::move(columns), length);
} }

View File

@ -23,6 +23,8 @@ public:
String getName() const override { return "LimitPartialResultTransform"; } String getName() const override { return "LimitPartialResultTransform"; }
void transformPartialResult(Chunk & chunk) override; void transformPartialResult(Chunk & chunk) override;
/// LimitsTransform doesn't have a state which can be snapshotted
ShaphotResult getRealProcessorSnapshot() override { return {{}, SnaphotStatus::Stopped}; }
private: private:
UInt64 limit; UInt64 limit;

View File

@ -75,10 +75,4 @@ void LimitsCheckingTransform::checkQuota(Chunk & chunk)
} }
} }
/// Returns a PartialResultTransform constructed from the header of the first input port
/// and the given partial result limit and duration. The `current_processor` argument is unused.
ProcessorPtr LimitsCheckingTransform::getPartialResultProcessor(const ProcessorPtr & /*current_processor*/, UInt64 partial_result_limit, UInt64 partial_result_duration_ms)
{
/// The same header is passed as the only header argument of the created transform.
const auto & header = inputs.front().getHeader();
return std::make_shared<PartialResultTransform>(header, partial_result_limit, partial_result_duration_ms);
}
} }

View File

@ -33,13 +33,11 @@ public:
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) { quota = quota_; } void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) { quota = quota_; }
bool supportPartialResultProcessor() const override { return true; } PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::SkipSupported; }
protected: protected:
void transform(Chunk & chunk) override; void transform(Chunk & chunk) override;
ProcessorPtr getPartialResultProcessor(const ProcessorPtr & current_processor, UInt64 partial_result_limit, UInt64 partial_result_duration_ms) override;
private: private:
StreamLocalLimits limits; StreamLocalLimits limits;

View File

@ -17,6 +17,8 @@ public:
String getName() const override { return "MergeSortingPartialResultTransform"; } String getName() const override { return "MergeSortingPartialResultTransform"; }
/// MergeSortingTransform always receives chunks in a sorted state, so transformation is not needed
void transformPartialResult(Chunk & /*chunk*/) override {}
ShaphotResult getRealProcessorSnapshot() override; ShaphotResult getRealProcessorSnapshot() override;
private: private:

View File

@ -33,7 +33,7 @@ public:
String getName() const override { return "MergeSortingTransform"; } String getName() const override { return "MergeSortingTransform"; }
bool supportPartialResultProcessor() const override { return true; } PartialResultStatus getPartialResultProcessorSupportStatus() const override { return PartialResultStatus::FullSupported; }
protected: protected:
void consume(Chunk chunk) override; void consume(Chunk chunk) override;
@ -65,6 +65,10 @@ private:
ProcessorPtr external_merging_sorted; ProcessorPtr external_merging_sorted;
friend class MergeSortingPartialResultTransform; friend class MergeSortingPartialResultTransform;
/// The mutex protects variables that are used for creating a snapshot of the current processor.
/// The current implementation of MergeSortingPartialResultTransform uses the 'generated_prefix' variable to check
/// whether the processor has started sending data through the main pipeline, and the corresponding partial result processor should stop creating snapshots.
/// Additionally, the mutex protects the 'chunks' variable and all variables in the 'remerge' function, which is used to transition 'chunks' to a sorted state.
std::mutex snapshot_mutex; std::mutex snapshot_mutex;
}; };

View File

@ -5,6 +5,11 @@
namespace DB namespace DB
{ {
/// Processors of this type are used to construct an auxiliary pipeline with processors corresponding to those in the main pipeline.
/// These processors work in two modes:
/// 1) Creating a snapshot of the corresponding processor from the main pipeline once per partial_result_duration_ms (period in milliseconds), and then sending the snapshot through the partial result pipeline.
/// 2) Transforming small blocks of data in the same way as the original processor and sending the transformed data through the partial result pipeline.
/// All processors of this type rely on the invariant that a new block from the previous processor of the partial result pipeline overwrites information about the previous block of the same previous processor.
class PartialResultTransform : public IProcessor class PartialResultTransform : public IProcessor
{ {
public: public:
@ -42,8 +47,8 @@ protected:
bool finished_getting_snapshots = false; bool finished_getting_snapshots = false;
virtual void transformPartialResult(Chunk & /*chunk*/) {} virtual void transformPartialResult(Chunk & /*chunk*/) = 0;
virtual ShaphotResult getRealProcessorSnapshot() { return {{}, SnaphotStatus::Stopped}; } virtual ShaphotResult getRealProcessorSnapshot() = 0; // { return {{}, SnaphotStatus::Stopped}; }
private: private:
Stopwatch watch; Stopwatch watch;

View File

@ -407,12 +407,13 @@ void Pipe::addExtremesSource(ProcessorPtr source)
void Pipe::activatePartialResult(UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_) void Pipe::activatePartialResult(UInt64 partial_result_limit_, UInt64 partial_result_duration_ms_)
{ {
if (!is_partial_result_active) if (is_partial_result_active)
partial_result_ports.assign(output_ports.size(), nullptr); throw Exception(ErrorCodes::LOGICAL_ERROR, "Partial result for Pipe should be initialized only once");
is_partial_result_active = true; is_partial_result_active = true;
partial_result_limit = partial_result_limit_; partial_result_limit = partial_result_limit_;
partial_result_duration_ms = partial_result_duration_ms_; partial_result_duration_ms = partial_result_duration_ms_;
partial_result_ports.assign(output_ports.size(), nullptr);
} }
static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors) static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors)
@ -629,12 +630,13 @@ void Pipe::addPartialResultSimpleTransform(const ProcessorPtr & transform, size_
if (isPartialResultActive()) if (isPartialResultActive())
{ {
auto & partial_result_port = partial_result_ports[partial_result_port_id]; auto & partial_result_port = partial_result_ports[partial_result_port_id];
auto partial_result_status = transform->getPartialResultProcessorSupportStatus();
if (!transform->supportPartialResultProcessor()) if (partial_result_status == IProcessor::PartialResultStatus::NotSupported)
{
dropPort(partial_result_port, *processors, collected_processors); dropPort(partial_result_port, *processors, collected_processors);
if (partial_result_status != IProcessor::PartialResultStatus::FullSupported)
return; return;
}
auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms); auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms);
@ -651,23 +653,34 @@ void Pipe::addPartialResultTransform(const ProcessorPtr & transform)
if (isPartialResultActive()) if (isPartialResultActive())
{ {
size_t new_outputs_size = transform->getOutputs().size(); size_t new_outputs_size = transform->getOutputs().size();
auto partial_result_status = transform->getPartialResultProcessorSupportStatus();
if (!transform->supportPartialResultProcessor()) if (partial_result_status == IProcessor::PartialResultStatus::SkipSupported && new_outputs_size != partial_result_ports.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot skip transform {} in the partial result part of the Pipe because it has {} output ports, but the partial result part expects {} output ports",
transform->getName(),
new_outputs_size,
partial_result_ports.size());
if (partial_result_status == IProcessor::PartialResultStatus::NotSupported)
{ {
for (auto & partial_result_port : partial_result_ports) for (auto & partial_result_port : partial_result_ports)
dropPort(partial_result_port, *processors, collected_processors); dropPort(partial_result_port, *processors, collected_processors);
partial_result_ports.assign(new_outputs_size, nullptr); partial_result_ports.assign(new_outputs_size, nullptr);
return;
} }
if (partial_result_status != IProcessor::PartialResultStatus::FullSupported)
return;
auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms); auto partial_result_transform = IProcessor::getPartialResultProcessorPtr(transform, partial_result_limit, partial_result_duration_ms);
auto & inputs = partial_result_transform->getInputs(); auto & inputs = partial_result_transform->getInputs();
if (inputs.size() != partial_result_ports.size()) if (inputs.size() != partial_result_ports.size())
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected", "Cannot add partial result transform {} to Pipe because it has {} input ports, but {} expected",
partial_result_transform->getName(), partial_result_transform->getName(),
inputs.size(), inputs.size(),
partial_result_ports.size()); partial_result_ports.size());

View File

@ -11,51 +11,69 @@ sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from tcp_client import TCPClient from tcp_client import TCPClient
def main(): def run_query_without_errors(query, support_partial_result):
with TCPClient() as client: with TCPClient() as client:
client.sendQuery( client.sendQuery(query)
"SELECT number FROM numbers_mt(1e7+1) ORDER BY -number LIMIT 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 10"
)
# external tables # external tables
client.sendEmptyBlock() client.sendEmptyBlock()
client.readHeader() client.readHeader()
# Partial result # Partial result
_, partial_result = client.readDataWithoutProgress()[0] partial_result = client.readDataWithoutProgress()[0]
if support_partial_result:
assert ( assert (
len(partial_result) > 0 len(partial_result.value) > 0
), "Expected at least one block with a non-empty partial result before getting the full result" ), "Expected at least one block with a non-empty partial result before getting the full result"
while True: while True:
assert all( assert all(
a >= b for a, b in zip(partial_result, partial_result[1:]) a >= b for a, b in zip(partial_result.value, partial_result.value[1:])
), "Partial result always should be sorted for this test" ), "Partial result always should be sorted for this test"
_, new_partial_result = client.readDataWithoutProgress( new_partial_result = client.readDataWithoutProgress(
need_print_info=False need_print_info=False
)[0] )[0]
if len(new_partial_result) == 0: if len(new_partial_result.value) == 0:
break break
data_size = len(partial_result) data_size = len(partial_result.value)
assert all( assert all(
partial_result[i] <= new_partial_result[i] for i in range(data_size) partial_result.value[i] <= new_partial_result.value[i] for i in range(data_size)
), f"New partial result values should always be greater then old one because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}" ), f"New partial result values should always be greater then old one because a new block contains more information about the full data. New result {new_partial_result}. Previous result {partial_result}"
partial_result = new_partial_result partial_result = new_partial_result
else:
block_rows = len(partial_result.value)
assert (
block_rows == 0
), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows"
# Full result # Full result
_, full_result = client.readDataWithoutProgress()[0] full_result = client.readDataWithoutProgress()[0]
data_size = len(partial_result) data_size = len(partial_result.value)
assert all( assert all(
partial_result[i] <= full_result[i] for i in range(data_size) partial_result.value[i] <= full_result.value[i] for i in range(data_size)
), f"Full result values should always be greater then partial result values. Full result {full_result}. Partial result {partial_result}" ), f"Full result values should always be greater then partial result values. Full result {full_result}. Partial result {partial_result}"
for result in full_result: for result in full_result.value:
print(result) print(result)
# Runs the ORDER BY ... LIMIT queries below through run_query_without_errors;
# the second argument says whether non-empty partial result blocks are expected.
def main():
# Request with partial result limit less than full limit
run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 5 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 3", True)
# Request with partial result limit greater than full limit
run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True)
# Request with OFFSET
run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 1 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", True)
# Request with OFFSET greater than partial result limit (the partial result pipeline uses blocks with fewer rows than OFFSET, so there will be no elements in the block after LimitPartialResultTransform)
run_query_without_errors("SELECT number FROM numbers_mt(5e6+1) ORDER BY -number LIMIT 3 OFFSET 15 SETTINGS max_threads = 1, partial_result_update_duration_ms = 1, max_rows_in_partial_result = 5", False)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,21 +1,38 @@
Rows 0 Columns 1 Rows 0 Columns 1
Column number type UInt64 Column number type UInt64
Rows 10 Columns 1 Rows 3 Columns 1
Column number type UInt64 Column number type UInt64
Rows 15 Columns 1 Rows 5 Columns 1
Column number type UInt64 Column number type UInt64
10000000 5000000
9999999 4999999
9999998 4999998
9999997 4999997
9999996 4999996
9999995 Rows 0 Columns 1
9999994 Column number type UInt64
9999993 Rows 3 Columns 1
9999992 Column number type UInt64
9999991 Rows 3 Columns 1
9999990 Column number type UInt64
9999989 5000000
9999988 4999999
9999987 4999998
9999986 Rows 0 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
4999999
4999998
4999997
Rows 0 Columns 1
Column number type UInt64
Rows 0 Columns 1
Column number type UInt64
Rows 3 Columns 1
Column number type UInt64
4999985
4999984
4999983

View File

@ -11,11 +11,31 @@ sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from tcp_client import TCPClient from tcp_client import TCPClient
def get_keys(result): def get_keys(results):
return [key for key, _ in rasults] return [key for key, _ in results]
def check_new_result(new_results, old_results, invariants, rows_limit):
    """Check that a newer list of result blocks is consistent with an older one.

    Args:
        new_results: newer blocks; each item exposes ``.key`` and ``.value``.
        old_results: older blocks, compared pairwise with ``new_results``.
        invariants: mapping from key to a predicate ``f(old_value, new_value)``
            that must hold for the pair of blocks with that key.
        rows_limit: maximum allowed number of rows in the first new block,
            or ``None`` to skip the check.

    Raises:
        AssertionError: if the row limit is exceeded, the keys are not in the
            same order, or an invariant does not hold.
    """
    if rows_limit is not None:
        rows = len(new_results[0].value)
        assert (
            rows <= rows_limit
        ), f"Result should have no more then {rows_limit} rows. But it has {rows} rows"

    for new_result, old_result in zip(new_results, old_results):
        # The message uses only names that are in scope here, so a key mismatch
        # is reported as an AssertionError and not masked by a NameError raised
        # while formatting the message.
        assert new_result.key == old_result.key, (
            "Keys in blocks should be in the same order. "
            f"New results keys {[result.key for result in new_results]}. "
            f"Old results keys {[result.key for result in old_results]}"
        )

        key = new_result.key
        if key in invariants:
            new_value = new_result.value
            old_value = old_result.value
            assert invariants[key](
                old_value, new_value
            ), f"Problem with the invariant between new and old result for key: {key}. New value {new_value}. Old value {old_value}"
def run_query_without_errors(query, support_partial_result, invariants=None, rows_limit=None):
if invariants is None: if invariants is None:
invariants = {} invariants = {}
@ -30,54 +50,35 @@ def run_query_without_errors(query, support_partial_result, invariants=None):
partial_results = client.readDataWithoutProgress() partial_results = client.readDataWithoutProgress()
if support_partial_result: if support_partial_result:
assert ( assert (
len(partial_results[0][1]) > 0 len(partial_results) > 0 and len(partial_results[0].value) > 0
), "Expected at least one block with a non-empty partial result before getting the full result" ), "Expected at least one block with a non-empty partial result before getting the full result"
while True: while True:
new_partial_results = client.readDataWithoutProgress( new_partial_results = client.readDataWithoutProgress(
need_print_info=False need_print_info=False
) )
if len(new_partial_results[0][1]) == 0: if len(new_partial_results[0].value) == 0:
break break
for new_result, old_result in zip(new_partial_results, partial_results): check_new_result(new_partial_results, partial_results, invariants, rows_limit)
assert ( partial_results = new_partial_results
new_result[0] == old_result[0]
), "Keys in blocks should be in the same order"
key = new_result[0]
if key in invariants:
old_value = old_result[1]
new_value = new_result[1]
assert invariants[key](
old_value, new_value
), f"Problem with the invariant between old and new versions of a partial result for key: {key}. Old value {old_value}, new value {new_value}"
else: else:
block_rows = len(partial_results[0].value)
assert ( assert (
len(partial_results[0][1]) == 0 block_rows == 0
), "Expected no non-empty partial result blocks before getting the full result" ), f"Expected only empty partial result block before getting the full result, but block has {block_rows} rows"
# Full result # Full result
full_results = client.readDataWithoutProgress() full_results = client.readDataWithoutProgress()
if support_partial_result: if support_partial_result:
for full_result, partial_result in zip(full_results, partial_results): check_new_result(full_results, partial_results, invariants, rows_limit)
assert (
full_result[0] == partial_result[0]
), f"Keys in blocks should be in the same order. Full results keys {get_keys(full_results)}. Partial results keys {get_keys(partial_results)}"
key = full_result[0] for data in full_results:
if key in invariants: if isinstance(data.value[0], int):
full_value = full_result[1] print(data.key, data.value)
partial_value = partial_result[1]
assert invariants[key](
partial_value, full_value
), f"Problem with the invariant between full and partial result for key: {key}. Partial value {partial_value}. Full value {full_value}"
for key, value in full_results:
if isinstance(value[0], int):
print(key, value)
def supported_scenarios(): def supported_scenarios_without_key():
# Simple aggregation query
query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) from numbers_mt(1e7+1) settings max_threads = 1, partial_result_update_duration_ms = 1" query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) from numbers_mt(1e7+1) settings max_threads = 1, partial_result_update_duration_ms = 1"
invariants = { invariants = {
"median(number)": lambda old_value, new_value: old_value <= new_value, "median(number)": lambda old_value, new_value: old_value <= new_value,
@ -87,21 +88,28 @@ def supported_scenarios():
"avg(number)": lambda old_value, new_value: old_value <= new_value, "avg(number)": lambda old_value, new_value: old_value <= new_value,
"sum(number)": lambda old_value, new_value: old_value <= new_value, "sum(number)": lambda old_value, new_value: old_value <= new_value,
} }
run_query_without_errors(query, support_partial_result=True, invariants=invariants) run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1)
# Aggregation query with a nested ORDER BY subquery
query = "select median(number), stddevSamp(number), stddevPop(number), max(number), min(number), any(number), count(number), avg(number), sum(number) FROM (SELECT number FROM numbers_mt(1e7) ORDER BY -number LIMIT 3) settings max_threads = 1, partial_result_update_duration_ms=1"
# Aggregation receives small partial result blocks from ORDER BY which always sends blocks with bigger values
invariants["min(number)"] = lambda old_value, new_value: old_value <= new_value
run_query_without_errors(query, support_partial_result=True, invariants=invariants, rows_limit=1)
def unsupported_scenarios():
    """Run queries for which no non-empty partial result block is expected."""
    # Currently aggregator for partial result supports only single thread aggregation without key
    # Update test when multithreading or aggregation with GROUP BY will be supported for partial result updates
    queries = (
        # Aggregation executed with more than one thread
        "select sum(number) from numbers_mt(1e7+1) settings max_threads = 2, partial_result_update_duration_ms = 100",
        # Aggregation with a GROUP BY key
        "select mod2, sum(number) from numbers_mt(1e7+1) group by number % 2 as mod2 settings max_threads = 1, partial_result_update_duration_ms = 100",
    )
    for unsupported_query in queries:
        run_query_without_errors(unsupported_query, support_partial_result=False)
def main():
    """Run the supported scenarios first, then the unsupported ones."""
    for scenario in (supported_scenarios_without_key, unsupported_scenarios):
        scenario()

View File

@ -33,6 +33,41 @@ min(number) [0]
any(number) [0] any(number) [0]
count(number) [10000001] count(number) [10000001]
sum(number) [50000005000000] sum(number) [50000005000000]
Rows 0 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
Rows 1 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
Rows 1 Columns 9
Column median(number) type Float64
Column stddevSamp(number) type Float64
Column stddevPop(number) type Float64
Column max(number) type UInt64
Column min(number) type UInt64
Column any(number) type UInt64
Column count(number) type UInt64
Column avg(number) type Float64
Column sum(number) type UInt64
max(number) [9999999]
min(number) [9999997]
any(number) [9999999]
count(number) [3]
sum(number) [29999994]
Rows 0 Columns 1 Rows 0 Columns 1
Column sum(number) type UInt64 Column sum(number) type UInt64
Rows 0 Columns 1 Rows 0 Columns 1

View File

@ -57,6 +57,11 @@ def assertPacket(packet, expected):
assert packet == expected, "Got: {}, expected: {}".format(packet, expected) assert packet == expected, "Got: {}, expected: {}".format(packet, expected)
class Data(object):
    """A single column of a received block: its name and its values.

    Attributes are read as `data.key` and `data.value`. The object can also
    be unpacked like a `(key, value)` pair, so code written for the pair
    form (`for key, value in columns`) keeps working.
    """

    def __init__(self, key, value):
        # key: column name; value: the values read for that column
        self.key = key
        self.value = value

    def __iter__(self):
        # Yield the fields in (key, value) order to support pair unpacking.
        yield self.key
        yield self.value

    def __repr__(self):
        return "Data(key={!r}, value={!r})".format(self.key, self.value)
class TCPClient(object): class TCPClient(object):
def __init__(self, timeout=30): def __init__(self, timeout=30):
self.timeout = timeout self.timeout = timeout
@ -301,6 +306,6 @@ class TCPClient(object):
if need_print_info: if need_print_info:
print("Column {} type {}".format(col_name, type_name)) print("Column {} type {}".format(col_name, type_name))
data.append((col_name, self.readRow(type_name, rows))) data.append(Data(col_name, self.readRow(type_name, rows)))
return data return data