dbms: StorageJoin: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-01-28 05:37:05 +03:00
parent 2b0b4d0528
commit e4e313f54f
10 changed files with 92 additions and 24 deletions

View File

@ -271,6 +271,7 @@ namespace ErrorCodes
UNKNOWN_BLOCK_INFO_FIELD,
BAD_COLLATION,
CANNOT_COMPILE_CODE,
INCOMPATIBLE_TYPE_OF_JOIN,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -24,7 +24,8 @@ public:
transfer_overflow_mode(limits.transfer_overflow_mode)
{
for (auto & elem : subqueries_for_sets)
children.push_back(elem.second.source);
if (elem.second.source)
children.push_back(elem.second.source);
children.push_back(input);
}

View File

@ -39,6 +39,9 @@ public:
/// Получить доступ к внутренностям.
JoinPtr & getJoin() { return join; }
/// Убедиться, что структура данных подходит для осуществления JOIN такого типа.
void assertCompatible(ASTJoin::Kind kind_, ASTJoin::Strictness strictness_) const;
private:
const Names & key_names;
ASTJoin::Kind kind; /// LEFT | INNER

View File

@ -10,12 +10,14 @@ Block CreatingSetsBlockInputStream::readImpl()
if (!created)
{
/// Заполнение временных таблиц идёт первым - потому что эти таблицы могут затем использоваться для создания Set/Join.
for (auto & elem : subqueries_for_sets)
{
create(elem.second);
if (isCancelled())
return res;
if (elem.second.source) /// Бывают заранее подготовленные Set/Join - для них не указывается source.
{
create(elem.second);
if (isCancelled())
return res;
}
}
created = true;

View File

@ -132,8 +132,8 @@ void Context::assertTableExists(const String & database_name, const String & tab
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it;
if (shared->databases.end() == (it = shared->databases.find(db)))
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
if (it->second.end() == it->second.find(table_name))
@ -199,8 +199,8 @@ StoragePtr Context::tryGetExternalTable(const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
Tables::const_iterator jt;
if (external_tables.end() == (jt = external_tables.find(table_name)))
Tables::const_iterator jt = external_tables.find(table_name);
if (external_tables.end() == jt)
return StoragePtr();
return jt->second;
@ -211,9 +211,6 @@ StoragePtr Context::getTable(const String & database_name, const String & table_
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
Databases::const_iterator it;
Tables::const_iterator jt;
if (database_name.empty())
{
StoragePtr res;
@ -226,10 +223,12 @@ StoragePtr Context::getTable(const String & database_name, const String & table_
}
String db = database_name.empty() ? current_database : database_name;
if (shared->databases.end() == (it = shared->databases.find(db)))
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
if (it->second.end() == (jt = it->second.find(table_name)))
Tables::const_iterator jt = it->second.find(table_name);
if (it->second.end() == jt)
throw Exception("Table " + db + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return jt->second;
@ -252,12 +251,12 @@ StoragePtr Context::tryGetTable(const String & database_name, const String & tab
}
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it;
if (shared->databases.end() == (it = shared->databases.find(db)))
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
return StoragePtr();
Tables::const_iterator jt;
if (it->second.end() == (jt = it->second.find(table_name)))
Tables::const_iterator jt = it->second.find(table_name);
if (it->second.end() == jt)
return StoragePtr();
return jt->second;

View File

@ -22,11 +22,10 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/Storages/StorageMemory.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/StorageSet.h>
#include <DB/Storages/StorageJoin.h>
#include <DB/DataStreams/LazyBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
@ -1431,15 +1430,35 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
String join_id = ast_join.table->getColumnName();
SubqueryForSet & subquery_for_set = subqueries_for_sets[join_id];
/// Особый случай - если справа JOIN указано имя таблицы, при чём, таблица имеет тип Join (заранее подготовленное отображение).
/// TODO В этом синтаксисе не поддерживается указание имени БД.
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(&*ast_join.table);
if (identifier)
{
StoragePtr table = context.tryGetTable("", identifier->name);
if (table)
{
StorageJoin * storage_join = typeid_cast<StorageJoin *>(table.get());
if (storage_join)
{
storage_join->assertCompatible(ast_join.kind, ast_join.strictness);
/// TODO Проверять набор ключей.
JoinPtr & join = storage_join->getJoin();
subquery_for_set.join = join;
}
}
}
if (!subquery_for_set.join)
{
Names join_key_names_left(join_key_names_left_set.begin(), join_key_names_left_set.end());
Names join_key_names_right(join_key_names_right_set.begin(), join_key_names_right_set.end());
JoinPtr join = new Join(join_key_names_left, join_key_names_right, settings.limits, ast_join.kind, ast_join.strictness);
/* for (const auto & name_type : columns_added_by_join)
std::cerr << "! Column added by JOIN: " << name_type.name << std::endl;*/
Names required_joined_columns(join_key_names_right.begin(), join_key_names_right.end());
for (const auto & name_type : columns_added_by_join)
required_joined_columns.push_back(name_type.name);

View File

@ -216,7 +216,7 @@ StoragePtr StorageFactory::get(
if (!kind_id)
throw Exception("Second parameter of storage Join must be LEFT or INNER (without quotes).", ErrorCodes::BAD_ARGUMENTS);
const String kind_str = Poco::toLower(strictness_id->name);
const String kind_str = Poco::toLower(kind_id->name);
ASTJoin::Kind kind;
if (kind_str == "left")
kind = ASTJoin::Kind::Left;

View File

@ -37,4 +37,12 @@ StorageJoin::StorageJoin(
}
void StorageJoin::assertCompatible(ASTJoin::Kind kind_, ASTJoin::Strictness strictness_) const
{
/// NOTE Можно немного ослабить.
if (!(kind == kind_ && strictness == strictness_))
throw Exception("Table " + name + " has incompatible type of JOIN.", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
}
}

View File

@ -0,0 +1,20 @@
0
1 abc
2 def
3
4
5
6
7
8
9
0
1 abc
2 def
3
4
5
6 ghi
7
8
9

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS test.join;
CREATE TABLE test.join (k UInt8, s String) ENGINE = Join(ANY, LEFT, k);
USE test;
INSERT INTO test.join VALUES (1, 'abc'), (2, 'def');
SELECT k, s FROM (SELECT number AS k FROM system.numbers LIMIT 10) ANY LEFT JOIN join USING k;
INSERT INTO test.join VALUES (6, 'ghi');
SELECT k, s FROM (SELECT number AS k FROM system.numbers LIMIT 10) ANY LEFT JOIN join USING k;
USE default;
DROP TABLE test.join;