Fix more tests.

This commit is contained in:
Nikolai Kochetov 2021-09-20 12:05:34 +03:00
parent 2b7d71d3a2
commit 9398c22aae
7 changed files with 32 additions and 15 deletions

View File

@ -306,9 +306,12 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
auto io = executeQuery(select_query, getContext(), true);
PullingPipelineExecutor executor(io.pipeline);
Block res;
bool need_another_read = executor.pull(res);
while (!res && executor.pull(res));
if (res && need_another_read)
Block tmp_block;
while (executor.pull(tmp_block))
if (tmp_block)
throw Exception("Expected one block from input stream", ErrorCodes::LOGICAL_ERROR);
return res;

View File

@ -775,7 +775,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
status_info_to_query_log
status_info_to_query_log,
pulling_pipeline = pipeline.pulling()
]
(QueryPipeline & query_pipeline) mutable
{
@ -805,7 +806,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (progress_callback)
progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));
else if (query_pipeline.pulling())
if (pulling_pipeline)
{
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}

View File

@ -327,6 +327,8 @@ void QueryPipeline::complete(Chain chain)
if (!pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with chain");
resources = chain.detachResources();
drop(totals, processors);
drop(extremes, processors);
@ -412,6 +414,7 @@ void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
extremes = nullptr;
initRowsBeforeLimit(format.get());
output_format = format.get();
processors.emplace_back(std::move(format));
}
@ -476,15 +479,11 @@ void QueryPipeline::setLimitsAndQuota(const StreamLocalLimits & limits, std::sha
bool QueryPipeline::tryGetResultRowsAndBytes(size_t & result_rows, size_t & result_bytes) const
{
if (!output || !output->isConnected())
if (!output_format)
return false;
const auto * format = typeid_cast<const IOutputFormat *>(&output->getInputPort().getProcessor());
if (!format)
return false;
result_rows = format->getResultRows();
result_bytes = format->getResultBytes();
result_rows = output_format->getResultRows();
result_bytes = output_format->getResultBytes();
return true;
}

View File

@ -109,6 +109,8 @@ private:
QueryStatus * process_list_element = nullptr;
IOutputFormat * output_format = nullptr;
size_t num_threads = 0;
friend class PushingPipelineExecutor;

View File

@ -84,13 +84,22 @@ void QueryPipelineBuilder::init(QueryPipeline pipeline)
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
if (!pipeline.pulling())
throw Exception("Can't initialize not pulling pipeline.", ErrorCodes::LOGICAL_ERROR);
if (pipeline.pushing())
throw Exception("Can't initialize pushing pipeline.", ErrorCodes::LOGICAL_ERROR);
pipe.holder = std::move(pipeline.resources);
pipe.processors = std::move(pipeline.processors);
pipe.output_ports = {pipeline.output};
pipe.header = pipeline.output->getHeader();
if (pipeline.output)
{
pipe.output_ports = {pipeline.output};
pipe.header = pipeline.output->getHeader();
}
else
{
pipe.output_ports.clear();
pipe.header = {};
}
pipe.totals_port = pipeline.totals;
pipe.extremes_port = pipeline.extremes;
pipe.max_parallel_streams = pipeline.num_threads;

View File

@ -463,6 +463,8 @@ void LogSink::writeMarks(MarksForColumns && marks)
}
}
StorageLog::~StorageLog() = default;
StorageLog::StorageLog(
DiskPtr disk_,
const String & relative_path_,

View File

@ -23,6 +23,7 @@ class StorageLog final : public shared_ptr_helper<StorageLog>, public IStorage
friend struct shared_ptr_helper<StorageLog>;
public:
~StorageLog() override;
String getName() const override { return "Log"; }
Pipe read(