From bfff8ab71547b046a21e649fe8b6617493ed83f6 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 13 Jun 2014 06:05:05 +0400 Subject: [PATCH] dbms: JOINs: development [#METR-11370]. --- .../DB/Interpreters/ExpressionActions.h | 14 ++++- .../DB/Interpreters/ExpressionAnalyzer.h | 5 ++ dbms/src/Interpreters/ExpressionActions.cpp | 61 +++++++++++++++---- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 42 +++++++++++++ .../Interpreters/InterpreterSelectQuery.cpp | 1 + dbms/src/Interpreters/Join.cpp | 22 ++++--- 6 files changed, 126 insertions(+), 19 deletions(-) diff --git a/dbms/include/DB/Interpreters/ExpressionActions.h b/dbms/include/DB/Interpreters/ExpressionActions.h index 1913c497c7f..357150196d1 100644 --- a/dbms/include/DB/Interpreters/ExpressionActions.h +++ b/dbms/include/DB/Interpreters/ExpressionActions.h @@ -21,6 +21,8 @@ typedef std::vector NamesWithAliases; typedef std::unordered_set NameSet; typedef std::unordered_map NameToNameMap; +class Join; + /** Действие над блоком. */ @@ -67,7 +69,8 @@ public: NameSet array_joined_columns; /// Для JOIN - //JoinPtr join; + Join * join = nullptr; + NamesAndTypesList columns_added_by_join; /// Для PROJECT. NamesWithAliases projection; @@ -130,6 +133,15 @@ public: return a; } + static ExpressionAction ordinaryJoin(Join * join_, const NamesAndTypesList & columns_added_by_join_) + { + ExpressionAction a; + a.type = JOIN; + a.join = join_; + a.columns_added_by_join = columns_added_by_join_; + return a; + } + /// Какие столбцы нужны, чтобы выполнить это действие. /// Если этот Action еще не добавлен в ExpressionActions, возвращаемый список может быть неполным, потому что не учтены prerequisites. Names getNeededColumns() const; diff --git a/dbms/include/DB/Interpreters/ExpressionAnalyzer.h b/dbms/include/DB/Interpreters/ExpressionAnalyzer.h index 715965dd1f8..b4bd39f6d3f 100644 --- a/dbms/include/DB/Interpreters/ExpressionAnalyzer.h +++ b/dbms/include/DB/Interpreters/ExpressionAnalyzer.h @@ -71,6 +71,7 @@ public: /// До агрегации: bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types); + bool appendJoin(ExpressionActionsChain & chain, bool only_types); bool appendWhere(ExpressionActionsChain & chain, bool only_types); bool appendGroupBy(ExpressionActionsChain & chain, bool only_types); void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types); @@ -136,6 +137,7 @@ private: std::unordered_map sets_with_subqueries; Joins joins; + NamesAndTypesList columns_added_by_join; typedef std::unordered_map Aliases; Aliases aliases; @@ -194,6 +196,8 @@ private: void getArrayJoinedColumnsImpl(ASTPtr ast); void addMultipleArrayJoinAction(ExpressionActions & actions); + void addJoinAction(ExpressionActions & actions); + struct ScopeStack; void getActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ScopeStack & actions_stack); @@ -210,6 +214,7 @@ private: /// Получить таблицу, из которой идет запрос StoragePtr getTable(); + /// columns - столбцы, присутствующие до начала преобразований. void initChain(ExpressionActionsChain & chain, NamesAndTypesList & columns); void assertSelect(); diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 14ccbbeb0b3..07e0d1f6a1d 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -28,8 +29,8 @@ Names ExpressionAction::getNeededColumns() const } ExpressionAction ExpressionAction::applyFunction(FunctionPtr function_, - const std::vector & argument_names_, - std::string result_name_) + const std::vector & argument_names_, + std::string result_name_) { if (result_name_ == "") { @@ -82,6 +83,8 @@ ExpressionActions::Actions ExpressionAction::getPrerequisites(Block & sample_blo void ExpressionAction::prepare(Block & sample_block) { + //std::cerr << "preparing: " << toString() << std::endl; + if (type == APPLY_FUNCTION) { if (sample_block.has(result_name)) @@ -144,6 +147,13 @@ void ExpressionAction::prepare(Block & sample_block) current.column = nullptr; } } + else if (type == JOIN) + { + for (const auto & col : columns_added_by_join) + sample_block.insert(ColumnWithNameAndType(col.second->createColumn(), col.second, col.first)); + + std::cerr << sample_block.dumpNames() << std::endl; + } else if (type == ADD_COLUMN) { if (sample_block.has(result_name)) @@ -162,6 +172,8 @@ void ExpressionAction::prepare(Block & sample_block) void ExpressionAction::execute(Block & block) const { + //std::cerr << "executing: " << toString() << std::endl; + if (type == REMOVE_COLUMN || type == COPY_COLUMN) if (!block.has(source_name)) throw Exception("Not found column '" + source_name + "'. There are columns: " + block.dumpNames(), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); @@ -242,10 +254,20 @@ void ExpressionAction::execute(Block & block) const break; } + case JOIN: + { + /// TODO Другие виды JOIN-ов. + join->anyLeftJoinBlock(block); + std::cerr << block.dumpStructure() << std::endl; + break; + } + case PROJECT: { Block new_block; + //std::cerr << block.dumpNames() << std::endl; + for (size_t i = 0; i < projection.size(); ++i) { const std::string & name = projection[i].first; @@ -284,24 +306,28 @@ std::string ExpressionAction::toString() const switch (type) { case ADD_COLUMN: - ss << "+" << result_name << "(" << result_type->getName() << ")" << "[" << added_column->getName() << "]"; + ss << "ADD " << result_name << " " << result_type->getName() << " " << added_column->getName(); break; + case REMOVE_COLUMN: - ss << "-" << source_name; + ss << "REMOVE " << source_name; break; + case COPY_COLUMN: - ss << result_name << "(" << result_type->getName() << ")" << "=" << source_name; + ss << "COPY " << result_name << " " << result_type->getName() << " = " << source_name; break; + case APPLY_FUNCTION: - ss << result_name << "(" << result_type->getName() << ")" << "= " << function->getName() << " ( "; + ss << "FUNCTION " << result_name << " " << result_type->getName() << " = " << function->getName() << "("; for (size_t i = 0; i < argument_names.size(); ++i) { if (i) - ss << " , "; + ss << ", "; ss << argument_names[i]; } - ss << " )"; + ss << ")"; break; + case ARRAY_JOIN: ss << "ARRAY JOIN "; for (NameSet::const_iterator it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it) @@ -311,18 +337,29 @@ std::string ExpressionAction::toString() const ss << *it; } break; + + case JOIN: + ss << "JOIN "; + for (NamesAndTypesList::const_iterator it = columns_added_by_join.begin(); it != columns_added_by_join.end(); ++it) + { + if (it != columns_added_by_join.begin()) + ss << ", "; + ss << it->first; + } + break; + case PROJECT: - ss << "{"; + ss << "PROJECT "; for (size_t i = 0; i < projection.size(); ++i) { if (i) ss << ", "; ss << projection[i].first; if (projection[i].second != "" && projection[i].second != projection[i].first) - ss << "=>" << projection[i].second; + ss << " AS " << projection[i].second; } - ss << "}"; break; + default: throw Exception("Unexpected Action type", ErrorCodes::LOGICAL_ERROR); } @@ -628,6 +665,8 @@ std::string ExpressionActions::getID() const } ss << "}"; } + + /// TODO JOIN } ss << ": {"; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index a4f63dbfb6d..b5b2056233f 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -101,6 +101,12 @@ void ExpressionAnalyzer::init() addMultipleArrayJoinAction(temp_actions); } + if (select_query && select_query->join) + { + getRootActionsImpl(dynamic_cast(*select_query->join).using_expr_list, true, false, temp_actions); + addJoinAction(temp_actions); + } + getAggregatesImpl(ast, temp_actions); if (has_aggregation) @@ -1229,6 +1235,28 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on return true; } +void ExpressionAnalyzer::addJoinAction(ExpressionActions & actions) +{ + actions.add(ExpressionAction::ordinaryJoin(joins[0], columns_added_by_join)); +} + +bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types) +{ + assertSelect(); + + if (!select_query->join) + return false; + + initChain(chain, columns); + ExpressionActionsChain::Step & step = chain.steps.back(); + + getRootActionsImpl(dynamic_cast(*select_query->join).using_expr_list, only_types, false, *step.actions); + + addJoinAction(*step.actions); + + return true; +} + bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types) { assertSelect(); @@ -1576,6 +1604,8 @@ void ExpressionAnalyzer::findJoins(NameSet & required_columns) String name = join_keys_expr_list.children[i]->getColumnName(); join_key_names[i] = name; + std::cerr << "USING " << name << std::endl; + if (!join_key_names_set.insert(name).second) throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN); } @@ -1606,11 +1636,23 @@ void ExpressionAnalyzer::findJoins(NameSet & required_columns) joins.push_back(join); + std::cerr << right_table_sample.dumpNames() << std::endl; + + for (const auto & x : required_columns) + std::cerr << "Required column: " << x << std::endl; + /// Удаляем из required_columns столбцы, которые есть в подзапросе, но нет в USING-е. for (NameSet::iterator it = required_columns.begin(); it != required_columns.end();) { if (right_table_sample.has(*it) && !join_key_names_set.count(*it)) + { + ColumnWithNameAndType & added_col = right_table_sample.getByName(*it); + columns_added_by_join.emplace_back(added_col.name, added_col.type); + + std::cerr << "Column added by JOIN: " << *it << std::endl; + required_columns.erase(it++); + } else ++it; } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 3c1633a4305..c5ebf2db81a 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -243,6 +243,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() need_aggregate = query_analyzer->hasAggregation(); query_analyzer->appendArrayJoin(chain, !first_stage); + query_analyzer->appendJoin(chain, !first_stage); if (query_analyzer->appendWhere(chain, !first_stage)) { diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index c46f7132066..c782257203e 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -6,7 +6,7 @@ namespace DB { - + size_t Join::getTotalRowCount() const { size_t rows = 0; @@ -32,8 +32,8 @@ size_t Join::getTotalByteCount() const bytes += pool.size(); return bytes; } - - + + bool Join::checkSizeLimits() const { if (max_rows && getTotalRowCount() > max_rows) @@ -42,7 +42,7 @@ bool Join::checkSizeLimits() const return false; return true; } - + bool Join::checkExternalSizeLimits() const { @@ -104,8 +104,12 @@ bool Join::insertFromBlock(const Block & block) init(Set::chooseMethod(key_columns, keys_fit_128_bits, key_sizes)); blocks.push_back(block); - const Block * stored_block = &blocks.back(); - /// TODO Удалить из stored_block ключевые столбцы, так как они не нужны. + Block * stored_block = &blocks.back(); + + /// Удаляем из stored_block ключевые столбцы, так как они не нужны. + for (const auto & name : key_names) + stored_block->erase(stored_block->getPositionByName(name)); + if (type == Set::KEY_64) { @@ -225,6 +229,8 @@ void Join::anyLeftJoinBlock(Block & block) if (blocks.empty()) throw Exception("Attempt to JOIN with empty table", ErrorCodes::EMPTY_DATA_PASSED); + std::cerr << "!!! " << block.dumpNames() << std::endl; + size_t keys_size = key_names.size(); ConstColumnPlainPtrs key_columns(keys_size); @@ -251,7 +257,9 @@ void Join::anyLeftJoinBlock(Block & block) added_columns[i]->reserve(src_column.column->size()); } - size_t rows = block.rows(); + std::cerr << "??? " << block.dumpNames() << std::endl; + + size_t rows = block.rowsInFirstColumn(); if (type == Set::KEY_64) {