dbms: JOINs: development [#METR-11370].

This commit is contained in:
Alexey Milovidov 2014-06-13 06:05:05 +04:00
parent 1ba07d65ad
commit bfff8ab715
6 changed files with 126 additions and 19 deletions

View File

@ -21,6 +21,8 @@ typedef std::vector<NameWithAlias> NamesWithAliases;
typedef std::unordered_set<String> NameSet;
typedef std::unordered_map<String, String> NameToNameMap;
class Join;
/** Действие над блоком.
*/
@ -67,7 +69,8 @@ public:
NameSet array_joined_columns;
/// Для JOIN
//JoinPtr join;
Join * join = nullptr;
NamesAndTypesList columns_added_by_join;
/// Для PROJECT.
NamesWithAliases projection;
@ -130,6 +133,15 @@ public:
return a;
}
static ExpressionAction ordinaryJoin(Join * join_, const NamesAndTypesList & columns_added_by_join_)
{
ExpressionAction a;
a.type = JOIN;
a.join = join_;
a.columns_added_by_join = columns_added_by_join_;
return a;
}
/// Какие столбцы нужны, чтобы выполнить это действие.
/// Если этот Action еще не добавлен в ExpressionActions, возвращаемый список может быть неполным, потому что не учтены prerequisites.
Names getNeededColumns() const;

View File

@ -71,6 +71,7 @@ public:
/// До агрегации:
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain, bool only_types);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
@ -136,6 +137,7 @@ private:
std::unordered_map<String, SetPtr> sets_with_subqueries;
Joins joins;
NamesAndTypesList columns_added_by_join;
typedef std::unordered_map<String, ASTPtr> Aliases;
Aliases aliases;
@ -194,6 +196,8 @@ private:
void getArrayJoinedColumnsImpl(ASTPtr ast);
void addMultipleArrayJoinAction(ExpressionActions & actions);
void addJoinAction(ExpressionActions & actions);
struct ScopeStack;
void getActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ScopeStack & actions_stack);
@ -210,6 +214,7 @@ private:
/// Получить таблицу, из которой идет запрос
StoragePtr getTable();
/// columns - столбцы, присутствующие до начала преобразований.
void initChain(ExpressionActionsChain & chain, NamesAndTypesList & columns);
void assertSelect();

View File

@ -1,5 +1,6 @@
#include <DB/Common/ProfileEvents.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <DB/Interpreters/Join.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeNested.h>
@ -28,8 +29,8 @@ Names ExpressionAction::getNeededColumns() const
}
ExpressionAction ExpressionAction::applyFunction(FunctionPtr function_,
const std::vector<std::string> & argument_names_,
std::string result_name_)
const std::vector<std::string> & argument_names_,
std::string result_name_)
{
if (result_name_ == "")
{
@ -82,6 +83,8 @@ ExpressionActions::Actions ExpressionAction::getPrerequisites(Block & sample_blo
void ExpressionAction::prepare(Block & sample_block)
{
//std::cerr << "preparing: " << toString() << std::endl;
if (type == APPLY_FUNCTION)
{
if (sample_block.has(result_name))
@ -144,6 +147,13 @@ void ExpressionAction::prepare(Block & sample_block)
current.column = nullptr;
}
}
else if (type == JOIN)
{
for (const auto & col : columns_added_by_join)
sample_block.insert(ColumnWithNameAndType(col.second->createColumn(), col.second, col.first));
std::cerr << sample_block.dumpNames() << std::endl;
}
else if (type == ADD_COLUMN)
{
if (sample_block.has(result_name))
@ -162,6 +172,8 @@ void ExpressionAction::prepare(Block & sample_block)
void ExpressionAction::execute(Block & block) const
{
//std::cerr << "executing: " << toString() << std::endl;
if (type == REMOVE_COLUMN || type == COPY_COLUMN)
if (!block.has(source_name))
throw Exception("Not found column '" + source_name + "'. There are columns: " + block.dumpNames(), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
@ -242,10 +254,20 @@ void ExpressionAction::execute(Block & block) const
break;
}
case JOIN:
{
/// TODO Другие виды JOIN-ов.
join->anyLeftJoinBlock(block);
std::cerr << block.dumpStructure() << std::endl;
break;
}
case PROJECT:
{
Block new_block;
//std::cerr << block.dumpNames() << std::endl;
for (size_t i = 0; i < projection.size(); ++i)
{
const std::string & name = projection[i].first;
@ -284,24 +306,28 @@ std::string ExpressionAction::toString() const
switch (type)
{
case ADD_COLUMN:
ss << "+" << result_name << "(" << result_type->getName() << ")" << "[" << added_column->getName() << "]";
ss << "ADD " << result_name << " " << result_type->getName() << " " << added_column->getName();
break;
case REMOVE_COLUMN:
ss << "-" << source_name;
ss << "REMOVE " << source_name;
break;
case COPY_COLUMN:
ss << result_name << "(" << result_type->getName() << ")" << "=" << source_name;
ss << "COPY " << result_name << " " << result_type->getName() << " = " << source_name;
break;
case APPLY_FUNCTION:
ss << result_name << "(" << result_type->getName() << ")" << "= " << function->getName() << " ( ";
ss << "FUNCTION " << result_name << " " << result_type->getName() << " = " << function->getName() << "(";
for (size_t i = 0; i < argument_names.size(); ++i)
{
if (i)
ss << " , ";
ss << ", ";
ss << argument_names[i];
}
ss << " )";
ss << ")";
break;
case ARRAY_JOIN:
ss << "ARRAY JOIN ";
for (NameSet::const_iterator it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it)
@ -311,18 +337,29 @@ std::string ExpressionAction::toString() const
ss << *it;
}
break;
case JOIN:
ss << "JOIN ";
for (NamesAndTypesList::const_iterator it = columns_added_by_join.begin(); it != columns_added_by_join.end(); ++it)
{
if (it != columns_added_by_join.begin())
ss << ", ";
ss << it->first;
}
break;
case PROJECT:
ss << "{";
ss << "PROJECT ";
for (size_t i = 0; i < projection.size(); ++i)
{
if (i)
ss << ", ";
ss << projection[i].first;
if (projection[i].second != "" && projection[i].second != projection[i].first)
ss << "=>" << projection[i].second;
ss << " AS " << projection[i].second;
}
ss << "}";
break;
default:
throw Exception("Unexpected Action type", ErrorCodes::LOGICAL_ERROR);
}
@ -628,6 +665,8 @@ std::string ExpressionActions::getID() const
}
ss << "}";
}
/// TODO JOIN
}
ss << ": {";

View File

@ -101,6 +101,12 @@ void ExpressionAnalyzer::init()
addMultipleArrayJoinAction(temp_actions);
}
if (select_query && select_query->join)
{
getRootActionsImpl(dynamic_cast<ASTJoin &>(*select_query->join).using_expr_list, true, false, temp_actions);
addJoinAction(temp_actions);
}
getAggregatesImpl(ast, temp_actions);
if (has_aggregation)
@ -1229,6 +1235,28 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
return true;
}
void ExpressionAnalyzer::addJoinAction(ExpressionActions & actions)
{
actions.add(ExpressionAction::ordinaryJoin(joins[0], columns_added_by_join));
}
bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types)
{
assertSelect();
if (!select_query->join)
return false;
initChain(chain, columns);
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActionsImpl(dynamic_cast<ASTJoin &>(*select_query->join).using_expr_list, only_types, false, *step.actions);
addJoinAction(*step.actions);
return true;
}
bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types)
{
assertSelect();
@ -1576,6 +1604,8 @@ void ExpressionAnalyzer::findJoins(NameSet & required_columns)
String name = join_keys_expr_list.children[i]->getColumnName();
join_key_names[i] = name;
std::cerr << "USING " << name << std::endl;
if (!join_key_names_set.insert(name).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
}
@ -1606,11 +1636,23 @@ void ExpressionAnalyzer::findJoins(NameSet & required_columns)
joins.push_back(join);
std::cerr << right_table_sample.dumpNames() << std::endl;
for (const auto & x : required_columns)
std::cerr << "Required column: " << x << std::endl;
/// Удаляем из required_columns столбцы, которые есть в подзапросе, но нет в USING-е.
for (NameSet::iterator it = required_columns.begin(); it != required_columns.end();)
{
if (right_table_sample.has(*it) && !join_key_names_set.count(*it))
{
ColumnWithNameAndType & added_col = right_table_sample.getByName(*it);
columns_added_by_join.emplace_back(added_col.name, added_col.type);
std::cerr << "Column added by JOIN: " << *it << std::endl;
required_columns.erase(it++);
}
else
++it;
}

View File

@ -243,6 +243,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
need_aggregate = query_analyzer->hasAggregation();
query_analyzer->appendArrayJoin(chain, !first_stage);
query_analyzer->appendJoin(chain, !first_stage);
if (query_analyzer->appendWhere(chain, !first_stage))
{

View File

@ -6,7 +6,7 @@
namespace DB
{
size_t Join::getTotalRowCount() const
{
size_t rows = 0;
@ -32,8 +32,8 @@ size_t Join::getTotalByteCount() const
bytes += pool.size();
return bytes;
}
bool Join::checkSizeLimits() const
{
if (max_rows && getTotalRowCount() > max_rows)
@ -42,7 +42,7 @@ bool Join::checkSizeLimits() const
return false;
return true;
}
bool Join::checkExternalSizeLimits() const
{
@ -104,8 +104,12 @@ bool Join::insertFromBlock(const Block & block)
init(Set::chooseMethod(key_columns, keys_fit_128_bits, key_sizes));
blocks.push_back(block);
const Block * stored_block = &blocks.back();
/// TODO Удалить из stored_block ключевые столбцы, так как они не нужны.
Block * stored_block = &blocks.back();
/// Удаляем из stored_block ключевые столбцы, так как они не нужны.
for (const auto & name : key_names)
stored_block->erase(stored_block->getPositionByName(name));
if (type == Set::KEY_64)
{
@ -225,6 +229,8 @@ void Join::anyLeftJoinBlock(Block & block)
if (blocks.empty())
throw Exception("Attempt to JOIN with empty table", ErrorCodes::EMPTY_DATA_PASSED);
std::cerr << "!!! " << block.dumpNames() << std::endl;
size_t keys_size = key_names.size();
ConstColumnPlainPtrs key_columns(keys_size);
@ -251,7 +257,9 @@ void Join::anyLeftJoinBlock(Block & block)
added_columns[i]->reserve(src_column.column->size());
}
size_t rows = block.rows();
std::cerr << "??? " << block.dumpNames() << std::endl;
size_t rows = block.rowsInFirstColumn();
if (type == Set::KEY_64)
{