Fix test_executable_table_function

This commit is contained in:
Nikolai Kochetov 2021-09-22 22:31:12 +03:00
parent eed4e8c754
commit 4f802d1cea
7 changed files with 48 additions and 11 deletions

View File

@ -275,6 +275,7 @@ Chain buildPushingToViewsChain(
out = buildPushingToViewsChain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, view_thread_status, view_counter_ms);
assert(views_data != nullptr);
views_data->views.emplace_back(ViewRuntimeData{
std::move(query),
out.getInputHeader(),

View File

@ -339,7 +339,7 @@ static inline void dumpDataForTables(
pipeline.getHeader(), mysql_input_stream_settings);
auto counting = std::make_shared<CountingTransform>(pipeline.getHeader());
Pipe pipe(std::move(input));
pipe.addTransform(std::move(counting));
pipe.addTransform(counting);
pipeline.complete(std::move(pipe));
Stopwatch watch;

View File

@ -430,8 +430,13 @@ void PipelineExecutor::execute(size_t num_threads)
bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
{
if (!is_execution_initialized)
{
initializeExecution(1);
if (yield_flag && *yield_flag)
return true;
}
executeStepImpl(0, 1, yield_flag);
if (!finished)

View File

@ -307,6 +307,38 @@ QueryPipeline::QueryPipeline(Chain chain)
input = &chain.getInputPort();
}
QueryPipeline::QueryPipeline(std::shared_ptr<IOutputFormat> format)
{
auto & format_main = format->getPort(IOutputFormat::PortKind::Main);
auto & format_totals = format->getPort(IOutputFormat::PortKind::Totals);
auto & format_extremes = format->getPort(IOutputFormat::PortKind::Extremes);
if (!totals)
{
auto source = std::make_shared<NullSource>(format_totals.getHeader());
totals = &source->getPort();
processors.emplace_back(std::move(source));
}
if (!extremes)
{
auto source = std::make_shared<NullSource>(format_extremes.getHeader());
extremes = &source->getPort();
processors.emplace_back(std::move(source));
}
connect(*totals, format_totals);
connect(*extremes, format_extremes);
input = &format_main;
totals = nullptr;
extremes = nullptr;
output_format = format.get();
processors.emplace_back(std::move(format));
}
static void drop(OutputPort *& port, Processors & processors)
{
if (!port)
@ -489,7 +521,7 @@ void QueryPipeline::setLimitsAndQuota(const StreamLocalLimits & limits, std::sha
}
bool QueryPipeline::tryGetResultRowsAndBytes(size_t & result_rows, size_t & result_bytes) const
bool QueryPipeline::tryGetResultRowsAndBytes(UInt64 & result_rows, UInt64 & result_bytes) const
{
if (!output_format)
return false;

View File

@ -46,6 +46,7 @@ public:
/// pushing
explicit QueryPipeline(Chain chain);
explicit QueryPipeline(std::shared_ptr<SinkToStorage> sink);
explicit QueryPipeline(std::shared_ptr<IOutputFormat> format);
/// completed
QueryPipeline(
@ -92,7 +93,7 @@ public:
void setProcessListElement(QueryStatus * elem);
void setProgressCallback(const ProgressCallback & callback);
void setLimitsAndQuota(const StreamLocalLimits & limits, std::shared_ptr<const EnabledQuota> quota);
bool tryGetResultRowsAndBytes(size_t & result_rows, size_t & result_bytes) const;
bool tryGetResultRowsAndBytes(UInt64 & result_rows, UInt64 & result_bytes) const;
void addStorageHolder(StoragePtr storage);

View File

@ -3,6 +3,7 @@
#include <filesystem>
#include <Common/ShellCommand.h>
#include <DataStreams/materializeBlock.h>
#include <Core/Block.h>
#include <IO/ReadHelpers.h>
@ -183,12 +184,12 @@ Pipe StorageExecutable::read(
inputs[i].addTransform(std::move(transform));
}
auto out = FormatFactory::instance().getOutputFormat(format, *write_buffer, inputs[i].getHeader(), context);
out->setAutoFlush();
inputs[i].setOutputFormat(std::move(out));
auto pipeline = std::make_shared<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(inputs[i])));
auto out = FormatFactory::instance().getOutputFormat(format, *write_buffer, materializeBlock(pipeline->getHeader()), context);
out->setAutoFlush();
pipeline->complete(std::move(out));
ShellCommandSource::SendDataTask task = [pipeline, write_buffer, is_executable_pool]()
{
CompletedPipelineExecutor executor(*pipeline);

View File

@ -463,10 +463,7 @@ void LogSink::writeMarks(MarksForColumns && marks)
}
}
StorageLog::~StorageLog()
{
// std::cerr << "======================\n" << StackTrace().toString() << std::endl;
}
StorageLog::~StorageLog() = default;
StorageLog::StorageLog(
DiskPtr disk_,