Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2019-03-26 20:24:12 +03:00
commit 43fd01589a
38 changed files with 659 additions and 506 deletions

contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit 73295a702cd1c85c11749ade500d713db7099cca
Subproject commit 8695b9d63ac0fe1b891b511d5b36302ffc84d4e2

View File

@ -206,28 +206,42 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block
createActions(list, root);
}
kj::Array<capnp::word> CapnProtoRowInputStream::readMessage()
{
uint32_t segment_count;
istr.readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t));
// one for segmentCount and one because segmentCount starts from 0
const auto prefix_size = (2 + segment_count) * sizeof(uint32_t);
const auto words_prefix_size = (segment_count + 1) / 2 + 1;
auto prefix = kj::heapArray<capnp::word>(words_prefix_size);
auto prefix_chars = prefix.asChars();
::memcpy(prefix_chars.begin(), &segment_count, sizeof(uint32_t));
// read size of each segment
for (size_t i = 0; i <= segment_count; ++i)
istr.readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t));
// calculate size of message
const auto expected_words = capnp::expectedSizeInWordsFromPrefix(prefix);
const auto expected_bytes = expected_words * sizeof(capnp::word);
const auto data_size = expected_bytes - prefix_size;
auto msg = kj::heapArray<capnp::word>(expected_words);
auto msg_chars = msg.asChars();
// read full message
::memcpy(msg_chars.begin(), prefix_chars.begin(), prefix_size);
istr.readStrict(msg_chars.begin() + prefix_size, data_size);
return msg;
}
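For reference, a worked example of the framing arithmetic above, assuming the standard Cap'n Proto stream framing (the first uint32 holds the segment count minus one, followed by one uint32 size per segment, padded to a word boundary). Note that readMessage() assembles the whole frame into one contiguous heap array, replacing the direct-buffer path removed from read() below, which broke when a message spanned buffer boundaries:

// Hypothetical single-segment message carrying 2 data words:
//   stream bytes:  00 00 00 00 | 02 00 00 00 | <16 bytes of segment data>
//   segment_count     = 0                         -> one segment
//   prefix_size       = (2 + 0) * 4 = 8 bytes     -> count + one size, already word-aligned
//   words_prefix_size = (0 + 1) / 2 + 1 = 1 word
//   expected_words    = 1 (header word) + 2 (segment words) = 3
//   data_size         = 3 * 8 - 8 = 16 bytes still to be read from the stream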
bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
// Read from underlying buffer directly
auto buf = istr.buffer();
auto base = reinterpret_cast<const capnp::word *>(istr.position());
// Check if there's enough bytes in the buffer to read the full message
kj::Array<capnp::word> heap_array;
auto array = kj::arrayPtr(base, buf.size() - istr.offset());
auto expected_words = capnp::expectedSizeInWordsFromPrefix(array);
if (expected_words * sizeof(capnp::word) > array.size())
{
// We'll need to reassemble the message in a contiguous buffer
heap_array = kj::heapArray<capnp::word>(expected_words);
istr.readStrict(heap_array.asChars().begin(), heap_array.asChars().size());
array = heap_array.asPtr();
}
auto array = readMessage();
#if CAPNP_VERSION >= 8000
capnp::UnalignedFlatArrayMessageReader msg(array);
@ -281,13 +295,6 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
}
}
// Advance buffer position if used directly
if (heap_array.size() == 0)
{
auto parsed = (msg.getEnd() - base) * sizeof(capnp::word);
istr.position() += parsed;
}
return true;
}

View File

@ -38,6 +38,8 @@ public:
bool read(MutableColumns & columns, RowReadExtension &) override;
private:
kj::Array<capnp::word> readMessage();
// Build a traversal plan from a sorted list of fields
void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader);

View File

@ -58,7 +58,7 @@ namespace
BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context & context, QueryProcessingStage::Enum processed_stage)
{
InterpreterSelectQuery interpreter{query_ast, context, Names{}, processed_stage};
InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
BlockInputStreamPtr stream = interpreter.execute().in;
/** Materialization is needed, since constants arrive from remote servers already materialized.

View File

@ -76,7 +76,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
ASTPtr subquery_select = subquery.children.at(0);
BlockIO res = InterpreterSelectWithUnionQuery(
subquery_select, subquery_context, {}, QueryProcessingStage::Complete, data.subquery_depth + 1).execute();
subquery_select, subquery_context, SelectQueryOptions(QueryProcessingStage::Complete, data.subquery_depth + 1)).execute();
Block block;
try

View File

@ -51,7 +51,8 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
}
else if (ast.getKind() == ASTExplainQuery::AnalyzedSyntax)
{
InterpreterSelectWithUnionQuery interpreter(ast.children.at(0), context, {}, QueryProcessingStage::FetchColumns, 0, true, true);
InterpreterSelectWithUnionQuery interpreter(ast.children.at(0), context,
SelectQueryOptions(QueryProcessingStage::FetchColumns).analyze().modify());
interpreter.getQuery()->format(IAST::FormatSettings(ss, false));
}

View File

@ -84,12 +84,12 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
{
/// This is internal part of ASTSelectWithUnionQuery.
/// Even if there is SELECT without union, it is represented by ASTSelectWithUnionQuery with single ASTSelectQuery as a child.
return std::make_unique<InterpreterSelectQuery>(query, context, Names{}, stage);
return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage));
}
else if (query->as<ASTSelectWithUnionQuery>())
{
ProfileEvents::increment(ProfileEvents::SelectQuery);
return std::make_unique<InterpreterSelectWithUnionQuery>(query, context, Names{}, stage);
return std::make_unique<InterpreterSelectWithUnionQuery>(query, context, SelectQueryOptions(stage));
}
else if (query->as<ASTInsertQuery>())
{

View File

@ -128,7 +128,7 @@ BlockIO InterpreterInsertQuery::execute()
if (query.select)
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{query.select, context, {}, QueryProcessingStage::Complete, 1};
InterpreterSelectWithUnionQuery interpreter_select{query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res.in = interpreter_select.execute().in;

View File

@ -78,13 +78,9 @@ namespace ErrorCodes
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const Names & required_result_column_names,
QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_,
bool only_analyze_,
bool modify_inplace)
: InterpreterSelectQuery(
query_ptr_, context_, nullptr, nullptr, required_result_column_names, to_stage_, subquery_depth_, only_analyze_, modify_inplace)
const SelectQueryOptions & options,
const Names & required_result_column_names)
: InterpreterSelectQuery(query_ptr_, context_, nullptr, nullptr, options, required_result_column_names)
{
}
@ -92,23 +88,17 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const BlockInputStreamPtr & input_,
QueryProcessingStage::Enum to_stage_,
bool only_analyze_,
bool modify_inplace)
: InterpreterSelectQuery(query_ptr_, context_, input_, nullptr, Names{}, to_stage_, 0, only_analyze_, modify_inplace)
{
}
const SelectQueryOptions & options)
: InterpreterSelectQuery(query_ptr_, context_, input_, nullptr, options.copy().noSubquery())
{}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const StoragePtr & storage_,
QueryProcessingStage::Enum to_stage_,
bool only_analyze_,
bool modify_inplace)
: InterpreterSelectQuery(query_ptr_, context_, nullptr, storage_, Names{}, to_stage_, 0, only_analyze_, modify_inplace)
{
}
const SelectQueryOptions & options)
: InterpreterSelectQuery(query_ptr_, context_, nullptr, storage_, options.copy().noSubquery())
{}
InterpreterSelectQuery::~InterpreterSelectQuery() = default;
@ -133,17 +123,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const Context & context_,
const BlockInputStreamPtr & input_,
const StoragePtr & storage_,
const Names & required_result_column_names,
QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_,
bool only_analyze_,
bool modify_inplace)
const SelectQueryOptions & options_,
const Names & required_result_column_names)
: options(options_)
/// NOTE: the query almost always should be cloned because it will be modified during analysis.
: query_ptr(modify_inplace ? query_ptr_ : query_ptr_->clone())
, query_ptr(options.modify_inplace ? query_ptr_ : query_ptr_->clone())
, context(context_)
, to_stage(to_stage_)
, subquery_depth(subquery_depth_)
, only_analyze(only_analyze_)
, storage(storage_)
, input(input_)
, log(&Logger::get("InterpreterSelectQuery"))
@ -151,7 +136,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
initSettings();
const Settings & settings = context.getSettingsRef();
if (settings.max_subquery_depth && subquery_depth > settings.max_subquery_depth)
if (settings.max_subquery_depth && options.subquery_depth > settings.max_subquery_depth)
throw Exception("Too deep subqueries. Maximum: " + settings.max_subquery_depth.toString(),
ErrorCodes::TOO_DEEP_SUBQUERIES);
@ -189,7 +174,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
/// Read from subquery.
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression, getSubqueryContext(context), required_columns, QueryProcessingStage::Complete, subquery_depth + 1, only_analyze, modify_inplace);
table_expression, getSubqueryContext(context), options.subquery(), required_columns);
source_header = interpreter_subquery->getSampleBlock();
}
@ -215,13 +200,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, subquery_depth).analyze(
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage);
query_analyzer = std::make_unique<ExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, NamesAndTypesList(),
NameSet(required_result_column_names.begin(), required_result_column_names.end()), subquery_depth, !only_analyze);
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
options.subquery_depth, !options.only_analyze);
if (!only_analyze)
if (!options.only_analyze)
{
if (query.sample_size() && (input || !storage || !storage->supportsSampling()))
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
@ -238,7 +224,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
context.addExternalTable(it.first, it.second);
}
if (!only_analyze || modify_inplace)
if (!options.only_analyze || options.modify_inplace)
{
if (query_analyzer->isRewriteSubqueriesPredicate())
{
@ -247,11 +233,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression,
getSubqueryContext(context),
required_columns,
QueryProcessingStage::Complete,
subquery_depth + 1,
only_analyze,
modify_inplace);
options.subquery(),
required_columns);
}
}
@ -304,7 +287,7 @@ Block InterpreterSelectQuery::getSampleBlock()
BlockIO InterpreterSelectQuery::execute()
{
Pipeline pipeline;
executeImpl(pipeline, input, only_analyze);
executeImpl(pipeline, input, options.only_analyze);
executeUnion(pipeline);
BlockIO res;
@ -315,7 +298,7 @@ BlockIO InterpreterSelectQuery::execute()
BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
{
Pipeline pipeline;
executeImpl(pipeline, input, only_analyze);
executeImpl(pipeline, input, options.only_analyze);
return pipeline.streams;
}
@ -325,10 +308,10 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& to_stage >= QueryProcessingStage::WithMergeableState;
&& options.to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& to_stage > QueryProcessingStage::WithMergeableState;
&& options.to_stage > QueryProcessingStage::WithMergeableState;
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
@ -553,16 +536,16 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
expressions = analyzeExpressions(from_stage, false);
if (from_stage == QueryProcessingStage::WithMergeableState &&
to_stage == QueryProcessingStage::WithMergeableState)
options.to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(options.to_stage));
}
if (to_stage > QueryProcessingStage::FetchColumns)
if (options.to_stage > QueryProcessingStage::FetchColumns)
{
/// Do I need to aggregate in a separate row rows that have not passed max_rows_to_group_by.
bool aggregate_overflow_row =
@ -575,7 +558,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
/// Do I need to immediately finalize the aggregate functions after the aggregation?
bool aggregate_final =
expressions.need_aggregate &&
to_stage > QueryProcessingStage::WithMergeableState &&
options.to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
if (expressions.first_stage)
@ -938,7 +921,7 @@ void InterpreterSelectQuery::executeFetchColumns(
/// Limitation on the number of columns to read.
/// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
if (!only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
if (!options.only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
throw Exception("Limit for number of columns to read exceeded. "
"Requested: " + toString(required_columns.size())
+ ", maximum: " + settings.max_columns_to_read.toString(),
@ -1000,7 +983,8 @@ void InterpreterSelectQuery::executeFetchColumns(
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
subquery, getSubqueryContext(context), required_columns, QueryProcessingStage::Complete, subquery_depth + 1, only_analyze);
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
if (query_analyzer->hasAggregation())
interpreter_subquery->ignoreWithTotals();
@ -1057,7 +1041,7 @@ void InterpreterSelectQuery::executeFetchColumns(
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by the initiator.
*/
if (to_stage == QueryProcessingStage::Complete)
if (options.to_stage == QueryProcessingStage::Complete)
{
limits.min_execution_speed = settings.min_execution_speed;
limits.max_execution_speed = settings.max_execution_speed;
@ -1072,7 +1056,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{
stream->setLimits(limits);
if (to_stage == QueryProcessingStage::Complete)
if (options.to_stage == QueryProcessingStage::Complete)
stream->setQuota(quota);
});
}

View File

@ -3,12 +3,13 @@
#include <memory>
#include <Core/QueryProcessingStage.h>
#include <Parsers/ASTSelectQuery.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/ASTSelectQuery.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Storages/SelectQueryInfo.h>
@ -23,6 +24,7 @@ class InterpreterSelectWithUnionQuery;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query, executed up to the `to_stage` stage.
*/
class InterpreterSelectQuery : public IInterpreter
@ -32,14 +34,6 @@ public:
* query_ptr
* - A query AST to interpret.
*
* to_stage
* - the stage up to which the query is to be executed. By default - till the end.
* You can execute only up to an intermediate aggregation state, whose parts are combined from the different servers during distributed query processing.
*
* subquery_depth
* - to control the limit on the depth of nesting of subqueries. For subqueries, a value that is incremented by one is passed;
* for INSERT SELECT, a value 1 is passed instead of 0.
*
* required_result_column_names
* - calculate only the specified columns of the query;
* - it is used to avoid the calculation (and reading) of unnecessary columns in subqueries.
@ -49,29 +43,22 @@ public:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const Names & required_result_column_names = Names{},
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
bool only_analyze_ = false,
bool modify_inplace = false);
const SelectQueryOptions &,
const Names & required_result_column_names = Names{});
/// Read data not from the table specified in the query, but from the prepared source `input`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const BlockInputStreamPtr & input_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
bool only_analyze_ = false,
bool modify_inplace = false);
const SelectQueryOptions & = {});
/// Read data not from the table specified in the query, but from the specified `storage_`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const StoragePtr & storage_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
bool only_analyze_ = false,
bool modify_inplace = false);
const SelectQueryOptions & = {});
~InterpreterSelectQuery() override;
@ -93,11 +80,8 @@ private:
const Context & context_,
const BlockInputStreamPtr & input_,
const StoragePtr & storage_,
const Names & required_result_column_names,
QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_,
bool only_analyze_,
bool modify_inplace);
const SelectQueryOptions &,
const Names & required_result_column_names = {});
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
@ -223,10 +207,9 @@ private:
*/
void initSettings();
const SelectQueryOptions options;
ASTPtr query_ptr;
Context context;
QueryProcessingStage::Enum to_stage;
size_t subquery_depth = 0;
NamesAndTypesList source_columns;
SyntaxAnalyzerResultPtr syntax_analyzer_result;
std::unique_ptr<ExpressionAnalyzer> query_analyzer;
@ -234,9 +217,6 @@ private:
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;
/// The object was created only for query analysis.
bool only_analyze = false;
/// List of columns to read to execute the query.
Names required_columns;
/// Structure of query source (table, subquery, etc).

View File

@ -26,15 +26,11 @@ namespace ErrorCodes
InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const Names & required_result_column_names,
QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_,
bool only_analyze,
bool modify_inplace)
: query_ptr(query_ptr_),
context(context_),
to_stage(to_stage_),
subquery_depth(subquery_depth_)
const SelectQueryOptions & options_,
const Names & required_result_column_names)
: options(options_),
query_ptr(query_ptr_),
context(context_)
{
const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
@ -57,7 +53,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
/// We use it to determine positions of 'required_result_column_names' in SELECT clause.
Block full_result_header = InterpreterSelectQuery(
ast.list_of_selects->children.at(0), context, Names(), to_stage, subquery_depth, true).getSampleBlock();
ast.list_of_selects->children.at(0), context, options.copy().analyze().noModify()).getSampleBlock();
std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size());
for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num)
@ -66,7 +62,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
for (size_t query_num = 1; query_num < num_selects; ++query_num)
{
Block full_result_header_for_current_select = InterpreterSelectQuery(
ast.list_of_selects->children.at(query_num), context, Names(), to_stage, subquery_depth, true).getSampleBlock();
ast.list_of_selects->children.at(query_num), context, options.copy().analyze().noModify()).getSampleBlock();
if (full_result_header_for_current_select.columns() != full_result_header.columns())
throw Exception("Different number of columns in UNION ALL elements:\n"
@ -89,11 +85,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>(
ast.list_of_selects->children.at(query_num),
context,
current_required_result_column_names,
to_stage,
subquery_depth,
only_analyze,
modify_inplace));
options,
current_required_result_column_names));
}
/// Determine structure of the result.
@ -179,7 +172,7 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
return cache[key];
}
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, {}, QueryProcessingStage::Complete, 0, true).getSampleBlock();
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
}

View File

@ -3,6 +3,7 @@
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
namespace DB
@ -19,11 +20,8 @@ public:
InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const Names & required_result_column_names = Names{},
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
bool only_analyze = false,
bool modify_inplace = false);
const SelectQueryOptions &,
const Names & required_result_column_names = {});
~InterpreterSelectWithUnionQuery() override;
@ -43,10 +41,9 @@ public:
ASTPtr getQuery() const { return query_ptr; }
private:
const SelectQueryOptions options;
ASTPtr query_ptr;
Context context;
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
std::vector<std::unique_ptr<InterpreterSelectQuery>> nested_interpreters;

View File

@ -120,56 +120,6 @@ Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_siz
}
template <typename Maps>
static void initImpl(Maps & maps, Join::Type type)
{
switch (type)
{
case Join::Type::EMPTY: break;
case Join::Type::CROSS: break;
#define M(TYPE) \
case Join::Type::TYPE: maps.TYPE = std::make_unique<typename decltype(maps.TYPE)::element_type>(); break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
}
template <typename Maps>
static size_t getTotalRowCountImpl(const Maps & maps, Join::Type type)
{
switch (type)
{
case Join::Type::EMPTY: return 0;
case Join::Type::CROSS: return 0;
#define M(NAME) \
case Join::Type::NAME: return maps.NAME ? maps.NAME->size() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
template <typename Maps>
static size_t getTotalByteCountImpl(const Maps & maps, Join::Type type)
{
switch (type)
{
case Join::Type::EMPTY: return 0;
case Join::Type::CROSS: return 0;
#define M(NAME) \
case Join::Type::NAME: return maps.NAME ? maps.NAME->getBufferSizeInBytes() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
template <Join::Type type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;
@ -227,7 +177,7 @@ void Join::init(Type type_)
if (kind == ASTTableJoin::Kind::Cross)
return;
dispatch(MapInitTag());
dispatch([&](auto, auto, auto & map) { initImpl(map, type); });
dispatch([&](auto, auto, auto & map) { map.create(type); });
}
size_t Join::getTotalRowCount() const
@ -241,7 +191,7 @@ size_t Join::getTotalRowCount() const
}
else
{
dispatch([&](auto, auto, auto & map) { res += getTotalRowCountImpl(map, type); });
dispatch([&](auto, auto, auto & map) { res += map.getTotalRowCount(type); });
}
return res;
@ -258,7 +208,7 @@ size_t Join::getTotalByteCount() const
}
else
{
dispatch([&](auto, auto, auto & map) { res += getTotalByteCountImpl(map, type); });
dispatch([&](auto, auto, auto & map) { res += map.getTotalByteCountImpl(type); });
res += pool.size();
}
@ -526,160 +476,178 @@ bool Join::insertFromBlock(const Block & block)
namespace
{
template <bool fill_left, ASTTableJoin::Strictness STRICTNESS, typename Map>
struct Adder;
template <typename Map>
struct Adder<true, ASTTableJoin::Strictness::Any, Map>
class AddedColumns
{
public:
using TypeAndNames = std::vector<std::pair<decltype(ColumnWithTypeAndName::type), decltype(ColumnWithTypeAndName::name)>>;
AddedColumns(const Block & sample_block_with_columns_to_add,
const Block & block_with_columns_to_add,
const Block & block, size_t num_columns_to_skip)
{
static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
const std::vector<size_t> & right_indexes)
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
columns.reserve(num_columns_to_add);
type_name.reserve(num_columns_to_add);
right_indexes.reserve(num_columns_to_add);
for (size_t i = 0; i < num_columns_to_add; ++i)
{
filter[i] = 1;
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i);
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*mapped.block->getByPosition(right_indexes[j]).column, mapped.row_num);
}
static void addNotFound(size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/)
{
filter[i] = 0;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
};
template <typename Map>
struct Adder<false, ASTTableJoin::Strictness::Any, Map>
{
static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
const std::vector<size_t> & right_indexes)
{
filter[i] = 1;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*mapped.block->getByPosition(right_indexes[j]).column, mapped.row_num);
}
static void addNotFound(size_t /*num_columns_to_add*/, MutableColumns & /*added_columns*/,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/)
{
filter[i] = 0;
}
};
template <bool fill_left, typename Map>
struct Adder<fill_left, ASTTableJoin::Strictness::All, Map>
{
static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets,
const std::vector<size_t> & right_indexes)
{
filter[i] = 1;
size_t rows_joined = 0;
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(mapped); current != nullptr; current = current->next)
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*current->block->getByPosition(right_indexes[j]).column.get(), current->row_num);
++rows_joined;
}
current_offset += rows_joined;
(*offsets)[i] = current_offset;
}
static void addNotFound(size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets)
{
filter[i] = 0;
if (!fill_left)
{
(*offsets)[i] = current_offset;
}
else
{
++current_offset;
(*offsets)[i] = current_offset;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
}
};
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE joinBlockImplTypeCase(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
MutableColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate,
const std::vector<size_t> & right_indexes)
{
IColumn::Offset current_offset = 0;
size_t num_columns_to_add = right_indexes.size();
Arena pool;
KeyGetter key_getter(key_columns, key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
{
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addNotFound(
num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get());
}
else
{
auto find_result = key_getter.findKey(map, i, pool);
if (find_result.isFound())
{
auto & mapped = find_result.getMapped();
mapped.setUsed();
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addFound(
mapped, num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get(), right_indexes);
}
else
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addNotFound(
num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get());
}
/// Don't insert column if it's in left block or not explicitly required.
if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name))
addColumn(src_column, num_columns_to_skip + i);
}
}
using BlockFilterData = std::pair<
std::unique_ptr<IColumn::Filter>,
std::unique_ptr<IColumn::Offsets>>;
size_t size() const { return columns.size(); }
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
BlockFilterData joinBlockImplType(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
MutableColumns & added_columns, ConstNullMapPtr null_map, const std::vector<size_t> & right_indexes)
ColumnWithTypeAndName moveColumn(size_t i)
{
std::unique_ptr<IColumn::Filter> filter = std::make_unique<IColumn::Filter>(rows);
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].first, type_name[i].second);
}
if (STRICTNESS == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
void appendFromBlock(const Block & block, size_t row_num)
{
for (size_t j = 0; j < right_indexes.size(); ++j)
columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num);
}
if (null_map)
joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_columns, key_sizes, added_columns, null_map, *filter,
offsets_to_replicate, right_indexes);
else
joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_columns, key_sizes, added_columns, null_map, *filter,
offsets_to_replicate, right_indexes);
void appendDefaultRow()
{
for (size_t j = 0; j < right_indexes.size(); ++j)
columns[j]->insertDefault();
}
return {std::move(filter), std::move(offsets_to_replicate)};
private:
TypeAndNames type_name;
MutableColumns columns;
std::vector<size_t> right_indexes;
void addColumn(const ColumnWithTypeAndName & src_column, size_t idx)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name);
right_indexes.push_back(idx);
}
};
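A standalone miniature of the AddedColumns idea above (plain STL and hypothetical names, not ClickHouse types): matched right-side rows are accumulated column by column and spliced into the output block once at the end, instead of going through the per-kind Adder specializations that this commit removes:

#include <cstddef>
#include <string>
#include <vector>

struct MiniAddedColumns
{
    /// One growing vector per right-side column to add.
    std::vector<std::vector<std::string>> columns;

    explicit MiniAddedColumns(size_t num_right_columns) : columns(num_right_columns) {}

    /// Analogue of appendFromBlock(): copy one matched right-side row.
    void appendFromRow(const std::vector<std::string> & right_row)
    {
        for (size_t j = 0; j < columns.size(); ++j)
            columns[j].push_back(right_row[j]);
    }

    /// Analogue of appendDefaultRow(): pad with defaults for unmatched left rows.
    void appendDefaultRow()
    {
        for (auto & col : columns)
            col.emplace_back();
    }
};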
template <ASTTableJoin::Strictness STRICTNESS, typename Map>
void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset [[maybe_unused]])
{
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
{
added.appendFromBlock(*mapped.block, mapped.row_num);
}
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
{
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(mapped); current != nullptr; current = current->next)
{
added.appendFromBlock(*current->block, current->row_num);
++current_offset;
}
}
};
template <bool _add_missing>
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
{
if constexpr (_add_missing)
{
added.appendDefaultRow();
++current_offset;
}
}
/// Joins right table columns whose indexes are present in right_indexes using the specified map.
/// Makes a filter (1 if the row is present in the right table) and returns offsets to replicate (for ALL JOINs).
template <bool _add_missing, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool _has_null_map>
std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
const Map & map, size_t rows, KeyGetter & key_getter,
AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter)
{
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Offset current_offset = 0;
Arena pool;
for (size_t i = 0; i < rows; ++i)
{
if (_has_null_map && (*null_map)[i])
{
addNotFoundRow<_add_missing>(added_columns, current_offset);
}
else
{
auto find_result = key_getter.findKey(map, i, pool);
if (find_result.isFound())
{
filter[i] = 1;
auto & mapped = find_result.getMapped();
mapped.setUsed();
addFoundRow<STRICTNESS, Map>(mapped, added_columns, current_offset);
}
else
addNotFoundRow<_add_missing>(added_columns, current_offset);
}
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
(*offsets_to_replicate)[i] = current_offset;
}
return offsets_to_replicate;
}
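A self-contained illustration (not ClickHouse code) of the offsets_to_replicate contract built above: offsets[i] holds the cumulative number of output rows after left row i, so a later replication pass repeats row i exactly offsets[i] - offsets[i-1] times:

#include <cassert>
#include <vector>

int main()
{
    std::vector<int> left = {10, 20, 30};
    // Row 0 matched twice, row 1 had no match (dropped by INNER), row 2 matched once.
    std::vector<size_t> offsets = {2, 2, 3};

    std::vector<int> replicated;
    size_t prev = 0;
    for (size_t i = 0; i < left.size(); ++i)
    {
        for (size_t k = prev; k < offsets[i]; ++k)
            replicated.push_back(left[i]);
        prev = offsets[i];
    }
    assert((replicated == std::vector<int>{10, 10, 30}));
}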
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
IColumn::Filter joinRightColumns(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
{
constexpr bool left_or_full = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
IColumn::Filter filter(rows, 0);
KeyGetter key_getter(key_columns, key_sizes, nullptr);
if (null_map)
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_getter, added_columns, null_map, filter);
else
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_getter, added_columns, null_map, filter);
return filter;
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
IColumn::Filter switchJoinRightColumns(
Join::Type type,
const Maps & maps_, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
{
switch (type)
{
#define M(TYPE) \
case Join::Type::TYPE: \
return joinRightColumns<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate);
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
default:
throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
}
}
} /// nameless
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
void Join::joinBlockImpl(
@ -714,7 +682,8 @@ void Join::joinBlockImpl(
* Because if they are constants, then in the "not joined" rows, they may have different values
* - default values, which can differ from the values of these constants.
*/
if (isRightOrFull(kind))
constexpr bool right_or_full = static_in_v<KIND, ASTTableJoin::Kind::Right, ASTTableJoin::Kind::Full>;
if constexpr (right_or_full)
{
for (size_t i = 0; i < existing_columns; ++i)
{
@ -734,68 +703,33 @@ void Join::joinBlockImpl(
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
*/
size_t num_columns_to_skip = 0;
if (isRightOrFull(kind))
if constexpr (right_or_full)
num_columns_to_skip = keys_size;
/// Add new columns to the block.
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
MutableColumns added_columns;
added_columns.reserve(num_columns_to_add);
std::vector<std::pair<decltype(ColumnWithTypeAndName::type), decltype(ColumnWithTypeAndName::name)>> added_type_name;
added_type_name.reserve(num_columns_to_add);
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip);
std::vector<size_t> right_indexes;
right_indexes.reserve(num_columns_to_add);
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i);
/// Don't insert column if it's in left block or not explicitly required.
if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name))
{
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
added_type_name.emplace_back(src_column.type, src_column.name);
right_indexes.push_back(num_columns_to_skip + i);
}
}
std::unique_ptr<IColumn::Filter> filter;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
switch (type)
{
#define M(TYPE) \
case Join::Type::TYPE: \
std::tie(filter, offsets_to_replicate) = \
joinBlockImplType<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, block.rows(), key_columns, key_sizes, added_columns, null_map, right_indexes); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
IColumn::Filter filter = switchJoinRightColumns<KIND, STRICTNESS>(
type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
default:
throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
}
for (size_t i = 0; i < added.size(); ++i)
block.insert(added.moveColumn(i));
const auto added_columns_size = added_columns.size();
for (size_t i = 0; i < added_columns_size; ++i)
block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), added_type_name[i].first, added_type_name[i].second));
if (!filter)
throw Exception("No data to filter columns", ErrorCodes::LOGICAL_ERROR);
/// Filter & insert missing rows
NameSet needed_key_names_right = requiredRightKeys(key_names_right, columns_added_by_join);
if (strictness == ASTTableJoin::Strictness::Any)
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
{
if (isInnerOrRight(kind))
constexpr bool inner_or_right = static_in_v<KIND, ASTTableJoin::Kind::Inner, ASTTableJoin::Kind::Right>;
if constexpr (inner_or_right)
{
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
/// Add join key columns from right block if they have a different name.
for (size_t i = 0; i < key_names_right.size(); ++i)
@ -821,13 +755,13 @@ void Join::joinBlockImpl(
if (needed_key_names_right.count(right_name) && !block.has(right_name))
{
const auto & col = block.getByName(left_name);
auto & column = col.column;
ColumnPtr column = col.column->convertToFullColumnIfConst();
MutableColumnPtr mut_column = column->cloneEmpty();
for (size_t col_no = 0; col_no < filter->size(); ++col_no)
for (size_t row = 0; row < filter.size(); ++row)
{
if ((*filter)[col_no])
mut_column->insertFrom(*column, col_no);
if (filter[row])
mut_column->insertFrom(*column, row);
else
mut_column->insertDefault();
}
@ -851,22 +785,22 @@ void Join::joinBlockImpl(
if (needed_key_names_right.count(right_name) && !block.has(right_name))
{
const auto & col = block.getByName(left_name);
auto & column = col.column;
ColumnPtr column = col.column->convertToFullColumnIfConst();
MutableColumnPtr mut_column = column->cloneEmpty();
size_t last_offset = 0;
for (size_t col_no = 0; col_no < column->size(); ++col_no)
for (size_t row = 0; row < column->size(); ++row)
{
if (size_t to_insert = (*offsets_to_replicate)[col_no] - last_offset)
if (size_t to_insert = (*offsets_to_replicate)[row] - last_offset)
{
if (!(*filter)[col_no])
if (!filter[row])
mut_column->insertDefault();
else
for (size_t dup = 0; dup < to_insert; ++dup)
mut_column->insertFrom(*column, col_no);
mut_column->insertFrom(*column, row);
}
last_offset = (*offsets_to_replicate)[col_no];
last_offset = (*offsets_to_replicate)[row];
}
block.insert({std::move(mut_column), col.type, right_name});

View File

@ -228,6 +228,52 @@ public:
std::unique_ptr<HashMap<UInt128, Mapped, UInt128HashCRC32>> keys128;
std::unique_ptr<HashMap<UInt256, Mapped, UInt256HashCRC32>> keys256;
std::unique_ptr<HashMap<UInt128, Mapped, UInt128TrivialHash>> hashed;
void create(Type which)
{
switch (which)
{
case Type::EMPTY: break;
case Type::CROSS: break;
#define M(NAME) \
case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
}
size_t getTotalRowCount(Type which) const
{
switch (which)
{
case Type::EMPTY: return 0;
case Type::CROSS: return 0;
#define M(NAME) \
case Type::NAME: return NAME ? NAME->size() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
size_t getTotalByteCountImpl(Type which) const
{
switch (which)
{
case Type::EMPTY: return 0;
case Type::CROSS: return 0;
#define M(NAME) \
case Type::NAME: return NAME ? NAME->getBufferSizeInBytes() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
};
using MapsAny = MapsTemplate<WithFlags<false, false, RowRef>>;
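The switch/M-macro pattern above is the usual X-macro dispatch over map variants; a minimal self-contained sketch of the same technique (toy variants, not the real APPLY_FOR_JOIN_VARIANTS list):

#include <map>
#include <memory>
#include <unordered_map>

#define APPLY_FOR_VARIANTS(M) M(ordered) M(hashed)

struct Maps
{
    enum class Type { EMPTY, ordered, hashed };

    std::unique_ptr<std::map<int, int>> ordered;
    std::unique_ptr<std::unordered_map<int, int>> hashed;

    void create(Type which)
    {
        switch (which)
        {
            case Type::EMPTY: break;
        #define M(NAME) \
            case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
            APPLY_FOR_VARIANTS(M)
        #undef M
        }
    }

    size_t size(Type which) const
    {
        switch (which)
        {
            case Type::EMPTY: return 0;
        #define M(NAME) \
            case Type::NAME: return NAME ? NAME->size() : 0;
            APPLY_FOR_VARIANTS(M)
        #undef M
        }
        __builtin_unreachable();
    }
};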

View File

@ -72,7 +72,7 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_copy.getSettingsRef().max_threads = 1;
BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage, QueryProcessingStage::Complete).execute().in;
BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage).execute().in;
Block block = in->read();
if (!block.rows())
@ -367,7 +367,7 @@ void MutationsInterpreter::prepare(bool dry_run)
select->children.push_back(where_expression);
}
interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, QueryProcessingStage::Complete, dry_run);
interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run));
is_prepared = true;
}

View File

@ -0,0 +1,76 @@
#pragma once
#include <Core/QueryProcessingStage.h>
namespace DB
{
/**
* to_stage
* - the stage up to which the query is to be executed. By default - till the end.
* You can execute only up to an intermediate aggregation state, whose parts are combined from the different servers during distributed query processing.
*
* subquery_depth
* - to control the limit on the depth of nesting of subqueries. For subqueries, a value that is incremented by one is passed;
* for INSERT SELECT, a value 1 is passed instead of 0.
*
* only_analyze
* - the object was created only for query analysis.
*
* is_subquery
* - subqueries may need specific handling; e.g. there's no need to pass duplicated columns in results, because the results are used indirectly (stored into temporary tables).
*/
struct SelectQueryOptions
{
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
bool only_analyze;
bool modify_inplace;
bool remove_duplicates;
SelectQueryOptions(QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, size_t depth = 0)
: to_stage(stage)
, subquery_depth(depth)
, only_analyze(false)
, modify_inplace(false)
, remove_duplicates(false)
{}
SelectQueryOptions copy() const { return *this; }
SelectQueryOptions subquery() const
{
SelectQueryOptions out = *this;
out.to_stage = QueryProcessingStage::Complete;
++out.subquery_depth;
return out;
}
SelectQueryOptions & analyze(bool value = true)
{
only_analyze = value;
return *this;
}
SelectQueryOptions & modify(bool value = true)
{
modify_inplace = value;
return *this;
}
SelectQueryOptions & noModify() { return modify(false); }
SelectQueryOptions & removeDuplicates(bool value = true)
{
remove_duplicates = value;
return *this;
}
SelectQueryOptions & noSubquery()
{
subquery_depth = 0;
return *this;
}
};
}
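A usage sketch of the fluent interface above (hypothetical call sites, assuming this header is included; the real ones are the Interpreter* constructor calls changed throughout this commit):

using namespace DB;

SelectQueryOptions base(QueryProcessingStage::Complete, /* subquery_depth = */ 0);

// Descend into a subquery: always executed to Complete, depth incremented.
SelectQueryOptions sub = base.subquery();

// Analysis-only pass that must not modify the caller's AST (cf. getSampleBlock()).
SelectQueryOptions dry = base.copy().analyze().noModify();

// EXPLAIN-style analysis that may rewrite the AST in place (cf. InterpreterExplainQuery).
SelectQueryOptions explain = SelectQueryOptions(QueryProcessingStage::FetchColumns).analyze().modify();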

View File

@ -123,24 +123,69 @@ bool hasArrayJoin(const ASTPtr & ast)
return false;
}
/// Keep number of columns for 'GLOBAL IN (SELECT 1 AS a, a)'
void renameDuplicatedColumns(const ASTSelectQuery * select_query)
{
ASTs & elements = select_query->select_expression_list->children;
std::set<String> all_column_names;
std::set<String> assigned_column_names;
for (auto & expr : elements)
all_column_names.insert(expr->getAliasOrColumnName());
for (auto & expr : elements)
{
auto name = expr->getAliasOrColumnName();
if (!assigned_column_names.insert(name).second)
{
size_t i = 1;
while (all_column_names.end() != all_column_names.find(name + "_" + toString(i)))
++i;
name = name + "_" + toString(i);
expr = expr->clone(); /// Cancels fuse of the same expressions in the tree.
expr->setAlias(name);
all_column_names.insert(name);
assigned_column_names.insert(name);
}
}
}
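A quick trace of the renaming loop above on a hypothetical duplicated select list:

// SELECT dummy, dummy, dummy_1 FROM t
//   1st "dummy"  -> first use of the name, kept as is
//   2nd "dummy"  -> "dummy_1" is already taken by the third column, so it becomes "dummy_2"
//   "dummy_1"    -> not assigned before, kept as is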
/// Sometimes we have to calculate more columns in the SELECT clause than will be returned from the query.
/// This is the case when we have DISTINCT or arrayJoin: we require more columns in SELECT even if we need fewer columns in the result.
void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query, const Names & required_result_columns)
/// Also we have to remove duplicates in case of GLOBAL subqueries. Their results are placed into tables, so duplicates are impossible.
void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query, const Names & required_result_columns, bool remove_dups)
{
if (required_result_columns.empty())
return;
ASTs & elements = select_query->select_expression_list->children;
std::map<String, size_t> required_columns_with_duplicate_count;
if (!required_result_columns.empty())
{
/// Some columns may be queried multiple times, like SELECT x, y, y FROM table.
for (const auto & name : required_result_columns)
{
if (remove_dups)
required_columns_with_duplicate_count[name] = 1;
else
++required_columns_with_duplicate_count[name];
}
}
else if (remove_dups)
{
/// Even if we have no requirements there could be duplicates because of asterisks. SELECT *, t.*
for (const auto & elem : elements)
required_columns_with_duplicate_count.emplace(elem->getAliasOrColumnName(), 1);
}
else
return;
ASTs new_elements;
new_elements.reserve(elements.size());
/// Some columns may be queried multiple times, like SELECT x, y, y FROM table.
/// In that case we keep them exactly the same number of times.
std::map<String, size_t> required_columns_with_duplicate_count;
for (const auto & name : required_result_columns)
++required_columns_with_duplicate_count[name];
for (const auto & elem : elements)
{
String name = elem->getAliasOrColumnName();
@ -645,6 +690,9 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
if (select_query)
{
if (remove_duplicates)
renameDuplicatedColumns(select_query);
if (const ASTTablesInSelectQueryElement * node = select_query->join())
{
if (settings.enable_optimize_predicate_expression)
@ -688,7 +736,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
/// Must be after 'normalizeTree' (after expanding aliases, for aliases not get lost)
/// and before 'executeScalarSubqueries', 'analyzeAggregation', etc. to avoid excessive calculations.
if (select_query)
removeUnneededColumnsFromSelectClause(select_query, required_result_columns);
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, context, subquery_depth);

View File

@ -2,6 +2,7 @@
#include <Interpreters/AnalyzedJoin.h>
#include <Interpreters/Aliases.h>
#include <Interpreters/SelectQueryOptions.h>
namespace DB
{
@ -55,9 +56,10 @@ using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
class SyntaxAnalyzer
{
public:
SyntaxAnalyzer(const Context & context_, size_t subquery_depth_ = 0)
SyntaxAnalyzer(const Context & context_, const SelectQueryOptions & select_options = {})
: context(context_)
, subquery_depth(subquery_depth_)
, subquery_depth(select_options.subquery_depth)
, remove_duplicates(select_options.remove_duplicates)
{}
SyntaxAnalyzerResultPtr analyze(
@ -69,6 +71,7 @@ public:
private:
const Context & context;
size_t subquery_depth;
bool remove_duplicates;
};
}

View File

@ -41,6 +41,8 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
auto subquery_options = SelectQueryOptions(QueryProcessingStage::Complete, subquery_depth).subquery();
ASTPtr query;
if (table || function)
{
@ -83,48 +85,10 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
else
{
query = subquery->children.at(0);
/** Columns with the same name can be specified in a subquery. For example, SELECT x, x FROM t
* This is bad, because the result of such a query cannot be saved to a table, because a table cannot have columns with the same name.
* Saving to the table is required for GLOBAL subqueries.
*
* To avoid this situation, we will rename the same columns.
*/
std::set<std::string> all_column_names;
std::set<std::string> assigned_column_names;
if (const auto * select_with_union = query->as<ASTSelectWithUnionQuery>())
{
if (const auto * select = select_with_union->list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
for (auto & expr : select->select_expression_list->children)
all_column_names.insert(expr->getAliasOrColumnName());
for (auto & expr : select->select_expression_list->children)
{
auto name = expr->getAliasOrColumnName();
if (!assigned_column_names.insert(name).second)
{
size_t i = 1;
while (all_column_names.end() != all_column_names.find(name + "_" + toString(i)))
++i;
name = name + "_" + toString(i);
expr = expr->clone(); /// Cancels fuse of the same expressions in the tree.
expr->setAlias(name);
all_column_names.insert(name);
assigned_column_names.insert(name);
}
}
}
}
subquery_options.removeDuplicates();
}
return std::make_shared<InterpreterSelectWithUnionQuery>(
query, subquery_context, required_source_columns, QueryProcessingStage::Complete, subquery_depth + 1);
return std::make_shared<InterpreterSelectWithUnionQuery>(query, subquery_context, subquery_options, required_source_columns);
}
}

View File

@ -205,7 +205,7 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
}
else if (type == MODIFY_ORDER_BY)
{
if (!primary_key_ast)
if (!primary_key_ast && order_by_ast)
{
/// Primary and sorting key become independent after this ALTER, so we have to
/// save the old ORDER BY expression as the new primary key.

View File

@ -25,6 +25,8 @@
#include <Poco/Ext/ThreadNumber.h>
#include <ext/range.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace ProfileEvents
@ -221,7 +223,21 @@ BlockInputStreams StorageBuffer::read(
*/
if (processed_stage > QueryProcessingStage::FetchColumns)
for (auto & stream : streams_from_buffers)
stream = InterpreterSelectQuery(query_info.query, context, stream, processed_stage).execute().in;
stream = InterpreterSelectQuery(query_info.query, context, stream, SelectQueryOptions(processed_stage)).execute().in;
if (query_info.prewhere_info)
{
for (auto & stream : streams_from_buffers)
stream = std::make_shared<FilterBlockInputStream>(stream, query_info.prewhere_info->prewhere_actions,
query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
if (query_info.prewhere_info->alias_actions)
{
for (auto & stream : streams_from_buffers)
stream = std::make_shared<ExpressionBlockInputStream>(stream, query_info.prewhere_info->alias_actions);
}
}
streams_from_dst.insert(streams_from_dst.end(), streams_from_buffers.begin(), streams_from_buffers.end());
return streams_from_dst;

View File

@ -74,7 +74,15 @@ public:
void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & new_table_name) override { name = new_table_name; }
bool supportsSampling() const override { return true; }
bool supportsPrewhere() const override { return false; }
bool supportsPrewhere() const override
{
if (no_destination)
return false;
auto dest = global_context.tryGetTable(destination_database, destination_table);
if (dest && dest.get() != this)
return dest->supportsPrewhere();
return false;
}
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }

View File

@ -286,7 +286,8 @@ BlockInputStreams StorageDistributed::read(
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header = materializeBlock(InterpreterSelectQuery(query_info.query, context, Names{}, processed_stage).getSampleBlock());
Block header = materializeBlock(
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock());
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy::SelectStreamFactory(

View File

@ -274,7 +274,7 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
if (!storage)
return BlockInputStreams{
InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
processed_stage, true).execute().in};
SelectQueryOptions(processed_stage).analyze()).execute().in};
BlockInputStreams source_streams;
@ -295,7 +295,7 @@ BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & quer
modified_context.getSettingsRef().max_threads = UInt64(streams_num);
modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, Names{}, processed_stage};
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
BlockInputStreamPtr interpreter_stream = interpreter.execute().in;
/** Materialization is needed, since constants arrive from distributed storage already materialized.
@ -429,7 +429,7 @@ Block StorageMerge::getQueryHeader(
case QueryProcessingStage::Complete:
return materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
processed_stage, true).getSampleBlock());
SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
}
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -63,7 +63,7 @@ BlockInputStreams StorageView::read(
current_inner_query = new_inner_query;
}
res = InterpreterSelectWithUnionQuery(current_inner_query, context, column_names).executeWithMultipleStreams();
res = InterpreterSelectWithUnionQuery(current_inner_query, context, {}, column_names).executeWithMultipleStreams();
/// It's expected that the columns read from storage are not constant,
/// because the method 'getSampleBlockForColumns' is used to obtain the structure of the result in InterpreterSelectQuery.

View File

@ -1,2 +1,7 @@
1
0
0
0 0
0
0 0
0 0

View File

@ -35,49 +35,49 @@ GLOBAL INNER JOIN
) USING dummy;
-- SET asterisk_left_columns_only = 0;
--
-- SELECT * FROM remote('127.0.0.2', system.one)
-- GLOBAL INNER JOIN
-- (
-- SELECT *, dummy
-- FROM ( SELECT dummy FROM remote('127.0.0.2', system.one) ) t1
-- GLOBAL INNER JOIN ( SELECT dummy FROM remote('127.0.0.3', system.one) ) t2
-- USING dummy
-- ) USING dummy;
--
-- SELECT * FROM remote('127.0.0.2', system.one)
-- GLOBAL INNER JOIN
-- (
-- SELECT *, t1.*, t2.*
-- FROM ( SELECT toUInt8(1) AS dummy ) t1
-- INNER JOIN ( SELECT toUInt8(1) AS dummy ) t2
-- USING dummy
-- ) USING dummy;
--
-- SELECT * FROM remote('127.0.0.2', system.one)
-- GLOBAL INNER JOIN
-- (
-- SELECT *, dummy
-- FROM ( SELECT toUInt8(1) AS dummy ) t1
-- INNER JOIN ( SELECT toUInt8(1) AS dummy ) t2
-- USING dummy
-- ) USING dummy;
--
-- SELECT * FROM remote('127.0.0.2', system.one)
-- GLOBAL INNER JOIN
-- (
-- SELECT *
-- FROM ( SELECT dummy FROM remote('127.0.0.3', system.one) ) t1
-- GLOBAL INNER JOIN ( SELECT toUInt8(1) AS dummy ) t2
-- USING dummy
-- ) USING dummy;
--
-- SELECT * FROM remote('127.0.0.2', system.one)
-- GLOBAL INNER JOIN
-- (
-- SELECT *
-- FROM ( SELECT toUInt8(1) AS dummy ) t1
-- GLOBAL INNER JOIN ( SELECT dummy FROM remote('127.0.0.3', system.one) ) t2
-- USING dummy
-- ) USING dummy;
SET asterisk_left_columns_only = 0;
SELECT * FROM remote('127.0.0.2', system.one)
GLOBAL INNER JOIN
(
SELECT *, dummy
FROM ( SELECT dummy FROM remote('127.0.0.2', system.one) ) t1
GLOBAL INNER JOIN ( SELECT dummy FROM remote('127.0.0.3', system.one) ) t2
USING dummy
) USING dummy;
SELECT * FROM remote('127.0.0.2', system.one)
GLOBAL INNER JOIN
(
SELECT *, t1.*, t2.*
FROM ( SELECT toUInt8(0) AS dummy ) t1
INNER JOIN ( SELECT toUInt8(0) AS dummy ) t2
USING dummy
) USING dummy;
SELECT * FROM remote('127.0.0.2', system.one)
GLOBAL INNER JOIN
(
SELECT *, dummy
FROM ( SELECT toUInt8(0) AS dummy ) t1
INNER JOIN ( SELECT toUInt8(0) AS dummy ) t2
USING dummy
) USING dummy;
SELECT * FROM remote('127.0.0.2', system.one)
GLOBAL INNER JOIN
(
SELECT *, dummy as other
FROM ( SELECT dummy FROM remote('127.0.0.3', system.one) ) t1
GLOBAL INNER JOIN ( SELECT toUInt8(0) AS dummy ) t2
USING dummy
) USING dummy;
SELECT * FROM remote('127.0.0.2', system.one)
GLOBAL INNER JOIN
(
SELECT *, dummy, dummy as other
FROM ( SELECT toUInt8(0) AS dummy ) t1
GLOBAL INNER JOIN ( SELECT dummy FROM remote('127.0.0.3', system.one) ) t2
USING dummy
) USING dummy;

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1,7 @@
DROP DATABASE IF EXISTS test_buffer;
CREATE DATABASE test_buffer;
CREATE TABLE test_buffer.mt (uid UInt64, ts DateTime, val Float64) ENGINE = MergeTree PARTITION BY toDate(ts) ORDER BY (uid, ts);
CREATE TABLE test_buffer.buf as test_buffer.mt ENGINE = Buffer(test_buffer, mt, 2, 10, 60, 10000, 100000, 1000000, 10000000);
INSERT INTO test_buffer.buf VALUES (1, '2019-03-01 10:00:00', 0.5), (2, '2019-03-02 10:00:00', 0.15), (1, '2019-03-03 10:00:00', 0.25);
SELECT count() FROM test_buffer.buf PREWHERE ts > toDateTime('2019-03-01 12:00:00') AND ts < toDateTime('2019-03-02 12:00:00');
DROP DATABASE test_buffer;

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS test.union1;
DROP TABLE IF EXISTS test.union2;
CREATE TABLE test.union1 ( date Date, a Int32, b Int32, c Int32, d Int32) ENGINE = MergeTree(date, (a, date), 8192);
CREATE TABLE test.union2 ( date Date, a Int32, b Int32, c Int32, d Int32) ENGINE = Distributed(test_shard_localhost, 'test', 'union1');
ALTER TABLE test.union2 MODIFY ORDER BY a; -- { serverError 48 }
DROP TABLE test.union1;
DROP TABLE test.union2;

View File

@ -3,3 +3,18 @@
1
1
1 1
1 0
1 0
1 0
1 1
1 0
1 0
1 0
1 1
0 2
0 2
0 2
1 1
0 2
0 2
0 2

View File

@ -5,4 +5,25 @@ SELECT * FROM (SELECT 1 AS x) AS t1 ALL LEFT JOIN (SELECT 1 AS x) AS t2 USING x;
SELECT * FROM (SELECT 1 AS x) AS t1 ALL LEFT JOIN (SELECT 2 AS x) AS t2 USING x;
SELECT * FROM (SELECT 1 AS x) AS t1 ALL LEFT JOIN (SELECT 1 AS x) AS t2 ON t1.x = t2.x;
-- (bug) SELECT * FROM (SELECT 1 AS x) AS t1 ALL LEFT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ALL LEFT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT materialize(1) AS x) AS t1 ALL LEFT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ALL LEFT JOIN (SELECT materialize(2) AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ANY LEFT JOIN (SELECT 1 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ANY LEFT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT materialize(1) AS x) AS t1 ANY LEFT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ANY LEFT JOIN (SELECT materialize(2) AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ALL RIGHT JOIN (SELECT 1 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ALL RIGHT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT materialize(1) AS x) AS t1 ALL RIGHT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ALL RIGHT JOIN (SELECT materialize(2) AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ANY RIGHT JOIN (SELECT 1 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ANY RIGHT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT materialize(1) AS x) AS t1 ANY RIGHT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
SELECT * FROM (SELECT 1 AS x) AS t1 ANY RIGHT JOIN (SELECT materialize(2) AS x) AS t2 ON t1.x = t2.x;
-- SET join_use_nulls = 1;
-- SELECT * FROM (SELECT 1 AS x) AS t1 ALL LEFT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;
-- SELECT * FROM (SELECT 1 AS x) AS t1 ALL RIGHT JOIN (SELECT 2 AS x) AS t2 ON t1.x = t2.x;

View File

@ -1,11 +0,0 @@
package config;
$default_host = "metrika";
$cfg{'metrika'} = {
fqdn => "",
method => "scpb",
incoming => "/repo/metrika/mini-dinstall/incoming/",
dinstall_runs => 0,
login => "@AUTHOR@"
};

View File

@ -119,11 +119,31 @@ SELECT arraySort((x, y) -> y, ['hello', 'world'], [2, 1]);
└────────────────────┘
```
Note that NULLs and NaNs go last (NaNs go before NULLs). For example:
``` sql
SELECT arraySort([1, nan, 2, NULL, 3, nan, 4, NULL])
```
```
┌─arraySort([1, nan, 2, NULL, 3, nan, 4, NULL])─┐
│ [1,2,3,4,nan,nan,NULL,NULL] │
└───────────────────────────────────────────────┘
```
### arrayReverseSort(\[func,\] arr1, ...)
Returns an array that is the result of sorting the elements of `arr1` in descending order. If the function `func` is specified, the sorting order is determined by the result of applying `func` to the elements of the array (or arrays).
Note that NULLs and NaNs go last (NaNs go before NULLs). For example:
``` sql
SELECT arrayReverseSort([1, nan, 2, NULL, 3, nan, 4, NULL])
```
```
┌─arrayReverseSort([1, nan, 2, NULL, 3, nan, 4, NULL])─┐
│ [4,3,2,1,nan,nan,NULL,NULL] │
└──────────────────────────────────────────────────────┘
```
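The `func` form works the same way, ordering by the computed key in descending order; a minimal sketch (the expected result is shown in the comment and is worth verifying on your build):
``` sql
SELECT arrayReverseSort((x, y) -> y, ['hello', 'world'], [1, 2]) -- ['world','hello']: ordered by the second array's values, descending
```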

View File

@ -51,6 +51,8 @@ Groups of operators are listed in order of priority (the higher it is in the lis
`a BETWEEN b AND c` - The same as `a >= b AND a <= c`.
`a NOT BETWEEN b AND c` - The same as `a < b OR a > c`.
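For instance, the rewrite can be checked directly (a minimal sketch using the `numbers` table function):
``` sql
SELECT number FROM numbers(10) WHERE number BETWEEN 3 AND 5;      -- 3, 4, 5
SELECT number FROM numbers(10) WHERE number >= 3 AND number <= 5; -- the same rows
```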
## Operators for Working With Data Sets
*See the section [IN operators](select.md#select-in-operators).*
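As a minimal sketch of the set operators referenced there:
``` sql
SELECT 1 IN (1, 2, 3);     -- 1
SELECT 4 NOT IN (1, 2, 3); -- 1
SELECT count() FROM numbers(5) WHERE number IN (SELECT number FROM numbers(3)); -- 3
```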

View File

@ -61,6 +61,7 @@ SELECT
Returns the first element of the arr1 array for which the func function returns something other than 0.
### arrayFirstIndex(func, arr1, ...)
Returns the index of the first element of the arr1 array for which the func function returns something other than 0.
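For example (a small illustrative query; indices are 1-based, and 0 is returned when no element matches):
``` sql
SELECT arrayFirst(x -> x > 2, [1, 2, 5, 4]);      -- returns 5, the first element greater than 2
SELECT arrayFirstIndex(x -> x > 2, [1, 2, 5, 4]); -- returns 3, its 1-based position
```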
### arrayCumSum(\[func,\] arr1, ...)
@ -98,8 +99,31 @@ SELECT arraySort((x, y) -> y, ['hello', 'world'], [2, 1]);
└────────────────────┘
```
`NULL` and `NaN` go last in the array (with `NaN` placed before `NULL`). For example:
``` sql
SELECT arraySort([1, nan, 2, NULL, 3, nan, 4, NULL])
```
```
┌─arraySort([1, nan, 2, NULL, 3, nan, 4, NULL])─┐
│ [1,2,3,4,nan,nan,NULL,NULL] │
└───────────────────────────────────────────────┘
```
### arrayReverseSort(\[func,\] arr1, ...)
Returns the array `arr1` sorted in descending order. If the function `func` is specified, the sorting order is determined by the result of applying `func` to the elements of the array (or arrays).
`NULL` and `NaN` go last in the array (with `NaN` placed before `NULL`). For example:
``` sql
SELECT arrayReverseSort([1, nan, 2, NULL, 3, nan, 4, NULL])
```
```
┌─arrayReverseSort([1, nan, 2, NULL, 3, nan, 4, NULL])─┐
│ [4,3,2,1,nan,nan,NULL,NULL] │
└──────────────────────────────────────────────────────┘
```
[Original article](https://clickhouse.yandex/docs/ru/query_language/functions/higher_order_functions/) <!--hide-->

View File

@ -51,6 +51,8 @@
`a BETWEEN b AND c` - The same as `a >= b AND a <= c`.
`a NOT BETWEEN b AND c` - The same as `a < b OR a > c`.
## Operators for Working With Data Sets
*See the section [IN operators](select.md#select-in-operators).*