Header for pipeline.

This commit is contained in:
Nikolai Kochetov 2020-06-26 16:58:28 +03:00
parent 5a678d74c2
commit 79bcc8309c
6 changed files with 103 additions and 19 deletions

View File

@ -111,9 +111,10 @@ static void fillColumn(IColumn & column, const std::string & str)
namespace
{
struct ExplainSettings
struct QueryPlanSettings
{
QueryPlan::ExplainOptions query_plan_options;
QueryPlan::ExplainPlanOptions query_plan_options;
constexpr static char name[] = "PLAN";
std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings =
{
@ -121,6 +122,23 @@ struct ExplainSettings
{"description", query_plan_options.description},
{"actions", query_plan_options.actions}
};
};
struct QueryPipelineSettings
{
QueryPlan::ExplainPipelineOptions query_pipeline_options;
constexpr static char name[] = "PIPELINE";
std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings =
{
{"header", query_pipeline_options.header},
};
};
template <typename Settings>
struct ExplainSettings : public Settings
{
using Settings::boolean_settings;
bool has(const std::string & name) const
{
@ -151,21 +169,20 @@ struct ExplainSettings
}
};
}
ExplainSettings checkAndGetSettings(const ASTPtr & ast_settings)
template <typename Settings>
ExplainSettings<Settings> checkAndGetSettings(const ASTPtr & ast_settings)
{
if (!ast_settings)
return {};
ExplainSettings settings;
ExplainSettings<Settings> settings;
const auto & set_query = ast_settings->as<ASTSetQuery &>();
for (const auto & change : set_query.changes)
{
if (!settings.has(change.name))
throw Exception("Unknown setting \"" + change.name + "\" for EXPLAIN query. Supported settings: " +
settings.getSettingsList(), ErrorCodes::UNKNOWN_SETTING);
throw Exception("Unknown setting \"" + change.name + "\" for EXPLAIN " + Settings::name + " query. "
"Supported settings: " + settings.getSettingsList(), ErrorCodes::UNKNOWN_SETTING);
if (change.value.getType() != Field::Types::UInt64)
throw Exception("Invalid type " + std::string(change.value.getTypeName()) + " for setting \"" + change.name +
@ -182,10 +199,11 @@ ExplainSettings checkAndGetSettings(const ASTPtr & ast_settings)
return settings;
}
}
BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
{
const auto & ast = query->as<ASTExplainQuery &>();
auto settings = checkAndGetSettings(ast.getSettings());
Block sample_block = getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();
@ -208,19 +226,21 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
if (!dynamic_cast<const ASTSelectWithUnionQuery *>(ast.getExplainedQuery().get()))
throw Exception("Only SELECT is supported for EXPLAIN query", ErrorCodes::INCORRECT_QUERY);
auto settings = checkAndGetSettings<QueryPlanSettings>(ast.getSettings());
QueryPlan plan;
InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), context, SelectQueryOptions());
interpreter.buildQueryPlan(plan);
WriteBufferFromOStream buffer(ss);
plan.explain(buffer, settings.query_plan_options);
plan.explainPlan(buffer, settings.query_plan_options);
}
else if (ast.getKind() == ASTExplainQuery::QueryPipeline)
{
if (!dynamic_cast<const ASTSelectWithUnionQuery *>(ast.getExplainedQuery().get()))
throw Exception("Only SELECT is supported for EXPLAIN query", ErrorCodes::INCORRECT_QUERY);
auto settings = checkAndGetSettings<QueryPipelineSettings>(ast.getSettings());
QueryPlan plan;
InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), context, SelectQueryOptions());
@ -228,7 +248,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
plan.buildQueryPipeline();
WriteBufferFromOStream buffer(ss);
plan.explainPipeline(buffer);
plan.explainPipeline(buffer, settings.query_pipeline_options);
}
fillColumn(*res_columns[0], ss.str());

View File

@ -79,7 +79,7 @@ public:
bool empty() const { return processors.empty(); }
void emplace(ProcessorPtr processor);
void emplace(Processors processors_);
Processors * getCollectedProcessors() const;
Processors * getCollectedProcessors() const { return collected_processors; }
Processors * setCollectedProcessors(Processors * collected_processors);
Processors & get() { return processors; }
Processors detach() { return std::move(processors); }

View File

@ -18,6 +18,38 @@ const DataStream & IQueryPlanStep::getOutputStream() const
return *output_stream;
}
static void doDescribeHeader(const Block & header, size_t count, IQueryPlanStep::FormatSettings & settings)
{
String prefix(settings.offset, settings.ident_char);
prefix += "Header";
if (count > 1)
prefix += " x " + std::to_string(count) + " ";
prefix += ": ";
settings.out << prefix;
if (!header)
{
settings.out << " empty\n";
return;
}
prefix.assign(prefix.size(), settings.ident_char);
bool first = true;
for (const auto & elem : header)
{
if (!first)
settings.out << prefix;
first = false;
elem.dumpStructure(settings.out, true);
settings.out << '\n';
}
}
static void doDescribeProcessor(const IProcessor & processor, size_t count, IQueryPlanStep::FormatSettings & settings)
{
settings.out << String(settings.offset, settings.ident_char) << processor.getName();
@ -30,6 +62,28 @@ static void doDescribeProcessor(const IProcessor & processor, size_t count, IQue
settings.out << " " << std::to_string(num_inputs) << " -> " << std::to_string(num_outputs);
settings.out << '\n';
if (settings.write_header)
{
const Block * last_header = nullptr;
size_t num_equal_headers = 0;
for (const auto & port : processor.getOutputs())
{
if (last_header && !blocksHaveEqualStructure(*last_header, port.getHeader()))
{
doDescribeHeader(*last_header, num_equal_headers, settings);
num_equal_headers = 0;
}
++num_equal_headers;
last_header = &port.getHeader();
}
if (last_header)
doDescribeHeader(*last_header, num_equal_headers, settings);
}
settings.offset += settings.ident;
}

View File

@ -61,6 +61,7 @@ public:
size_t offset = 0;
const size_t ident = 2;
const char ident_char = ' ';
const bool write_header = false;
};
/// Get detailed description of step actions. This is shown in EXPLAIN query with options `actions = 1`.

View File

@ -178,7 +178,7 @@ void QueryPlan::addInterpreterContext(std::shared_ptr<Context> context)
static void explainStep(
WriteBuffer & buffer, IQueryPlanStep & step, size_t ident, const QueryPlan::ExplainOptions & options)
WriteBuffer & buffer, IQueryPlanStep & step, size_t ident, const QueryPlan::ExplainPlanOptions & options)
{
std::string prefix(ident, ' ');
buffer << prefix;
@ -227,7 +227,7 @@ static void explainStep(
}
}
void QueryPlan::explain(WriteBuffer & buffer, const ExplainOptions & options)
void QueryPlan::explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options)
{
checkInitialized();
@ -272,11 +272,11 @@ static void explainPipelineStep(IQueryPlanStep & step, IQueryPlanStep::FormatSet
settings.offset += settings.ident;
}
void QueryPlan::explainPipeline(WriteBuffer & buffer)
void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options)
{
checkInitialized();
IQueryPlanStep::FormatSettings settings{.out = buffer};
IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header};
struct Frame
{

View File

@ -33,15 +33,24 @@ public:
QueryPipelinePtr buildQueryPipeline();
struct ExplainOptions
struct ExplainPlanOptions
{
/// Add output header to step.
bool header = false;
/// Add description of step.
bool description = true;
/// Add detailed information about step actions.
bool actions = false;
};
void explain(WriteBuffer & buffer, const ExplainOptions & options);
void explainPipeline(WriteBuffer & buffer);
struct ExplainPipelineOptions
{
/// Show header of output ports.
bool header = false;
};
void explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options);
void explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options);
/// Set upper limit for the recommend number of threads. Will be applied to the newly-created pipelines.
/// TODO: make it in a better way.