New ANY JOIN for StorageJoin + SEMI, ANTI

This commit is contained in:
chertus 2019-12-25 20:32:55 +03:00
parent 6e785ea4bd
commit 4fd7340416
4 changed files with 233 additions and 80 deletions

View File

@ -69,7 +69,7 @@ void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteL
HashJoinPtr StorageJoin::getJoin(std::shared_ptr<AnalyzedJoin> analyzed_join) const
{
if (!(kind == analyzed_join->kind() && strictness == analyzed_join->strictness()))
if (kind != analyzed_join->kind() || strictness != analyzed_join->strictness())
throw Exception("Table " + table_name + " has incompatible type of JOIN.", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
/// TODO: check key columns
@ -95,58 +95,14 @@ void registerStorageJoin(StorageFactory & factory)
ASTs & engine_args = args.engine_args;
if (engine_args.size() < 3)
throw Exception(
"Storage Join requires at least 3 parameters: Join(ANY|ALL, LEFT|INNER, keys...).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto opt_strictness_id = tryGetIdentifierName(engine_args[0]);
if (!opt_strictness_id)
throw Exception("First parameter of storage Join must be ANY or ALL (without quotes).", ErrorCodes::BAD_ARGUMENTS);
const String strictness_str = Poco::toLower(*opt_strictness_id);
ASTTableJoin::Strictness strictness;
if (strictness_str == "any")
strictness = ASTTableJoin::Strictness::RightAny;
else if (strictness_str == "all")
strictness = ASTTableJoin::Strictness::All;
else
throw Exception("First parameter of storage Join must be ANY or ALL (without quotes).", ErrorCodes::BAD_ARGUMENTS);
auto opt_kind_id = tryGetIdentifierName(engine_args[1]);
if (!opt_kind_id)
throw Exception("Second parameter of storage Join must be LEFT or INNER (without quotes).", ErrorCodes::BAD_ARGUMENTS);
const String kind_str = Poco::toLower(*opt_kind_id);
ASTTableJoin::Kind kind;
if (kind_str == "left")
kind = ASTTableJoin::Kind::Left;
else if (kind_str == "inner")
kind = ASTTableJoin::Kind::Inner;
else if (kind_str == "right")
kind = ASTTableJoin::Kind::Right;
else if (kind_str == "full")
kind = ASTTableJoin::Kind::Full;
else
throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes).", ErrorCodes::BAD_ARGUMENTS);
Names key_names;
key_names.reserve(engine_args.size() - 2);
for (size_t i = 2, size = engine_args.size(); i < size; ++i)
{
auto opt_key = tryGetIdentifierName(engine_args[i]);
if (!opt_key)
throw Exception("Parameter №" + toString(i + 1) + " of storage Join don't look like column name.", ErrorCodes::BAD_ARGUMENTS);
key_names.push_back(*opt_key);
}
auto & settings = args.context.getSettingsRef();
auto join_use_nulls = settings.join_use_nulls;
auto max_rows_in_join = settings.max_rows_in_join;
auto max_bytes_in_join = settings.max_bytes_in_join;
auto join_overflow_mode = settings.join_overflow_mode;
auto join_any_take_last_row = settings.join_any_take_last_row;
auto old_any_join = settings.any_join_distinct_right_table_keys;
if (args.storage_def && args.storage_def->settings)
{
@ -162,6 +118,8 @@ void registerStorageJoin(StorageFactory & factory)
join_overflow_mode.set(setting.value);
else if (setting.name == "join_any_take_last_row")
join_any_take_last_row.set(setting.value);
else if (setting.name == "any_join_distinct_right_table_keys")
old_any_join.set(setting.value);
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + args.engine_name,
@ -169,6 +127,68 @@ void registerStorageJoin(StorageFactory & factory)
}
}
if (engine_args.size() < 3)
throw Exception(
"Storage Join requires at least 3 parameters: Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT, keys...).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTTableJoin::Strictness strictness = ASTTableJoin::Strictness::Unspecified;
ASTTableJoin::Kind kind = ASTTableJoin::Kind::Comma;
if (auto opt_strictness_id = tryGetIdentifierName(engine_args[0]))
{
const String strictness_str = Poco::toLower(*opt_strictness_id);
if (strictness_str == "any" || strictness_str == "\'any\'")
{
if (old_any_join)
strictness = ASTTableJoin::Strictness::RightAny;
else
strictness = ASTTableJoin::Strictness::Any;
}
else if (strictness_str == "all" || strictness_str == "\'all\'")
strictness = ASTTableJoin::Strictness::All;
else if (strictness_str == "semi" || strictness_str == "\'semi\'")
strictness = ASTTableJoin::Strictness::Semi;
else if (strictness_str == "anti" || strictness_str == "\'anti\'")
strictness = ASTTableJoin::Strictness::Anti;
}
if (strictness == ASTTableJoin::Strictness::Unspecified)
throw Exception("First parameter of storage Join must be ANY or ALL or SEMI or ANTI.", ErrorCodes::BAD_ARGUMENTS);
if (auto opt_kind_id = tryGetIdentifierName(engine_args[1]))
{
const String kind_str = Poco::toLower(*opt_kind_id);
if (kind_str == "left" || kind_str == "\'left\'")
kind = ASTTableJoin::Kind::Left;
else if (kind_str == "inner" || kind_str == "\'inner\'")
kind = ASTTableJoin::Kind::Inner;
else if (kind_str == "right" || kind_str == "\'right\'")
kind = ASTTableJoin::Kind::Right;
else if (kind_str == "full" || kind_str == "\'full\'")
{
if (strictness == ASTTableJoin::Strictness::Any)
strictness = ASTTableJoin::Strictness::RightAny;
kind = ASTTableJoin::Kind::Full;
}
}
if (kind == ASTTableJoin::Kind::Comma)
throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL.", ErrorCodes::BAD_ARGUMENTS);
Names key_names;
key_names.reserve(engine_args.size() - 2);
for (size_t i = 2, size = engine_args.size(); i < size; ++i)
{
auto opt_key = tryGetIdentifierName(engine_args[i]);
if (!opt_key)
throw Exception("Parameter №" + toString(i + 1) + " of storage Join don't look like column name.", ErrorCodes::BAD_ARGUMENTS);
key_names.push_back(*opt_key);
}
return StorageJoin::create(
args.data_path,
args.database_name,
@ -244,8 +264,8 @@ protected:
Block block;
if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps,
[&](auto, auto strictness, auto & map) { block = createBlock<strictness>(map); }))
throw Exception("Logical error: unknown JOIN strictness (must be ANY or ALL)", ErrorCodes::LOGICAL_ERROR);
[&](auto kind, auto strictness, auto & map) { block = createBlock<kind, strictness>(map); }))
throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR);
return block;
}
@ -263,7 +283,7 @@ private:
std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
Block createBlock(const Maps & maps)
{
for (size_t i = 0; i < sample_block.columns(); ++i)
@ -290,7 +310,7 @@ private:
{
#define M(TYPE) \
case Join::Type::TYPE: \
rows_added = fillColumns<STRICTNESS>(*maps.TYPE); \
rows_added = fillColumns<KIND, STRICTNESS>(*maps.TYPE); \
break;
APPLY_FOR_JOIN_VARIANTS_LIMITED(M)
#undef M
@ -321,8 +341,7 @@ private:
return res;
}
template <ASTTableJoin::Strictness STRICTNESS, typename Map>
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
size_t fillColumns(const Map & map)
{
size_t rows_added = 0;
@ -339,34 +358,35 @@ private:
{
if constexpr (STRICTNESS == ASTTableJoin::Strictness::RightAny)
{
for (size_t j = 0; j < columns.size(); ++j)
if (j == key_pos)
columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
else
columns[j]->insertFrom(*it->getMapped().block->getByPosition(column_indices[j]).column.get(), it->getMapped().row_num);
++rows_added;
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
}
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
{
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
}
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
{
throw Exception("New ANY join storage is not implemented yet (set any_join_distinct_right_table_keys=1 to use old one)",
ErrorCodes::NOT_IMPLEMENTED);
if constexpr (KIND == ASTTableJoin::Kind::Left || KIND == ASTTableJoin::Kind::Inner)
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
else if constexpr (KIND == ASTTableJoin::Kind::Right)
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
}
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof ||
STRICTNESS == ASTTableJoin::Strictness::Semi ||
STRICTNESS == ASTTableJoin::Strictness::Anti)
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Semi)
{
throw Exception("ASOF|SEMI|ANTI join storage is not implemented yet", ErrorCodes::NOT_IMPLEMENTED);
if constexpr (KIND == ASTTableJoin::Kind::Left)
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
else if constexpr (KIND == ASTTableJoin::Kind::Right)
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
}
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Anti)
{
if constexpr (KIND == ASTTableJoin::Kind::Left)
fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
else if constexpr (KIND == ASTTableJoin::Kind::Right)
fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
}
else
for (auto ref_it = it->getMapped().begin(); ref_it.ok(); ++ref_it)
{
for (size_t j = 0; j < columns.size(); ++j)
if (j == key_pos)
columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
else
columns[j]->insertFrom(*ref_it->block->getByPosition(column_indices[j]).column.get(), ref_it->row_num);
++rows_added;
}
throw Exception("This JOIN is not implemented yet", ErrorCodes::NOT_IMPLEMENTED);
if (rows_added >= max_block_size)
{
@ -377,6 +397,33 @@ private:
return rows_added;
}
/// Emits exactly one output row for the hash map entry pointed to by `it`.
/// The column at position `key_pos` (if set) receives the raw bytes of the entry's key;
/// every other column `j` is copied from column `column_indices[j]` of the block
/// referenced by the mapped value, at the mapped value's row number.
/// Increments `rows_added` by one.
template <typename Map>
static void fillOne(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
    const std::optional<size_t> & key_pos, size_t & rows_added)
{
    for (size_t col = 0; col < columns.size(); ++col)
    {
        if (col == key_pos)
        {
            /// Key column: take the value straight from the map key.
            columns[col]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
            continue;
        }

        /// Non-key column: copy the value from the stored block row.
        const auto & source_column = *it->getMapped().block->getByPosition(column_indices[col]).column.get();
        columns[col]->insertFrom(source_column, it->getMapped().row_num);
    }

    ++rows_added;
}
/// Emits one output row per row reference stored in the mapped value of `it`.
/// The mapped value is walked with `begin()` / `ok()` / `++`. For each reference, the column
/// at position `key_pos` (if set) receives the raw bytes of the entry's key, and every other
/// column `j` is copied from column `column_indices[j]` of the referenced block at the
/// referenced row number. `rows_added` is incremented once per emitted row.
template <typename Map>
static void fillAll(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
    const std::optional<size_t> & key_pos, size_t & rows_added)
{
    auto row_ref = it->getMapped().begin();
    while (row_ref.ok())
    {
        for (size_t col = 0; col < columns.size(); ++col)
        {
            if (col == key_pos)
            {
                /// Key column: the key is the same for every row in this entry.
                columns[col]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
            }
            else
            {
                /// Non-key column: copy from the block row this reference points to.
                const auto & source_column = *row_ref->block->getByPosition(column_indices[col]).column.get();
                columns[col]->insertFrom(source_column, row_ref->row_num);
            }
        }

        ++rows_added;
        ++row_ref;
    }
}
};

View File

@ -1,24 +1,26 @@
DROP TABLE IF EXISTS testJoinTable;
CREATE TABLE testJoinTable (number UInt64, data String) ENGINE = Join(ANY, INNER, number);
SET any_join_distinct_right_table_keys = 1;
SET enable_optimize_predicate_expression = 0;
CREATE TABLE testJoinTable (number UInt64, data String) ENGINE = Join(ANY, INNER, number) SETTINGS any_join_distinct_right_table_keys = 1;
INSERT INTO testJoinTable VALUES (1, '1'), (2, '2'), (3, '3');
SELECT * FROM (SELECT * FROM numbers(10)) INNER JOIN testJoinTable USING number;
SELECT * FROM (SELECT * FROM numbers(10)) INNER JOIN testJoinTable USING number; -- { serverError 264 }
SELECT * FROM (SELECT * FROM numbers(10)) INNER JOIN (SELECT * FROM testJoinTable) USING number;
SELECT * FROM (SELECT * FROM numbers(10)) ANY INNER JOIN testJoinTable USING number;
SELECT * FROM testJoinTable;
DROP TABLE testJoinTable;
SELECT '-';
SET any_join_distinct_right_table_keys = 1;
DROP TABLE IF EXISTS master;
DROP TABLE IF EXISTS transaction;
CREATE TABLE transaction (id Int32, value Float64, master_id Int32) ENGINE = MergeTree() ORDER BY id;
CREATE TABLE master (id Int32, name String) ENGINE = Join (ANY, LEFT, id);
CREATE TABLE master (id Int32, name String) ENGINE = Join (ANY, LEFT, id) SETTINGS any_join_distinct_right_table_keys = 1;
INSERT INTO master VALUES (1, 'ONE');
INSERT INTO transaction VALUES (1, 52.5, 1);
@ -34,7 +36,7 @@ DROP TABLE IF EXISTS some_join;
DROP TABLE IF EXISTS tbl;
CREATE TABLE tbl (eventDate Date, id String) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY eventDate;
CREATE TABLE some_join (id String, value String) ENGINE = Join(ANY, LEFT, id);
CREATE TABLE some_join (id String, value String) ENGINE = Join(ANY, LEFT, id) SETTINGS any_join_distinct_right_table_keys = 1;
SELECT * FROM tbl AS t ANY LEFT JOIN some_join USING (id);
SELECT * FROM tbl AS t ANY LEFT JOIN some_join AS d USING (id);

View File

@ -0,0 +1,32 @@
any left
0 a1
1 a2
2 a3 b1
3 a4
4 a5 b3
any inner
2 a3 b1
4 a5 b3
any right
2 a3 b1
2 a3 b2
4 a5 b3
4 a5 b4
4 a5 b5
5 b6
semi left
2 a3 b1
2 a6 b1
4 a5 b3
semi right
2 a3 b1
2 a3 b2
4 a5 b3
4 a5 b4
4 a5 b5
anti left
0 a1
1 a2
3 a4
anti right
5 b6

View File

@ -0,0 +1,72 @@
-- Test: joining a Memory table (t1) against Join-engine tables declared with
-- ANY, SEMI and ANTI strictness and LEFT / INNER / RIGHT kinds.
-- Each SELECT is preceded by a label row so the output can be matched section by section.

-- Start clean: drop any tables with these names if they already exist.
-- NOTE(review): any_full_join is dropped here but never created or queried in this script.
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS any_left_join;
DROP TABLE IF EXISTS any_inner_join;
DROP TABLE IF EXISTS any_right_join;
DROP TABLE IF EXISTS any_full_join;
DROP TABLE IF EXISTS semi_left_join;
DROP TABLE IF EXISTS semi_right_join;
DROP TABLE IF EXISTS anti_left_join;
DROP TABLE IF EXISTS anti_right_join;
-- Left-hand side table, followed by one Join-engine table per (strictness, kind) pair, all keyed on x.
CREATE TABLE t1 (x UInt32, str String) engine = Memory;
CREATE TABLE any_left_join (x UInt32, s String) engine = Join(ANY, LEFT, x);
CREATE TABLE any_inner_join (x UInt32, s String) engine = Join(ANY, INNER, x);
CREATE TABLE any_right_join (x UInt32, s String) engine = Join(ANY, RIGHT, x);
CREATE TABLE semi_left_join (x UInt32, s String) engine = Join(SEMI, LEFT, x);
CREATE TABLE semi_right_join (x UInt32, s String) engine = Join(SEMI, RIGHT, x);
CREATE TABLE anti_left_join (x UInt32, s String) engine = Join(ANTI, LEFT, x);
CREATE TABLE anti_right_join (x UInt32, s String) engine = Join(ANTI, RIGHT, x);
-- t1 holds keys 0..4, one row each.
INSERT INTO t1 (x, str) VALUES (0, 'a1'), (1, 'a2'), (2, 'a3'), (3, 'a4'), (4, 'a5');
-- Every Join table receives the same rows: key 2 twice, key 4 three times, and key 5 (absent from t1).
INSERT INTO any_left_join (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
INSERT INTO any_inner_join (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
INSERT INTO any_right_join (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
INSERT INTO semi_left_join (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
INSERT INTO semi_right_join (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
INSERT INTO anti_left_join (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
INSERT INTO anti_right_join (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
-- Session settings for the queries below.
-- NOTE(review): any_join_distinct_right_table_keys = 0 presumably selects the new ANY semantics
-- rather than the legacy behaviour; confirm against the setting's documentation.
SET join_use_nulls = 0;
SET any_join_distinct_right_table_keys = 0;
-- ANY strictness, one query per join kind. Results are ordered so the output is deterministic.
SELECT 'any left';
SELECT * FROM t1 ANY LEFT JOIN any_left_join j USING(x) ORDER BY x, str, s;
SELECT 'any inner';
SELECT * FROM t1 ANY INNER JOIN any_inner_join j USING(x) ORDER BY x, str, s;
SELECT 'any right';
SELECT * FROM t1 ANY RIGHT JOIN any_right_join j USING(x) ORDER BY x, str, s;
-- Add a second t1 row with x = 2 so the left side also has a duplicate key for the SEMI / ANTI queries.
INSERT INTO t1 (x, str) VALUES (2, 'a6');
-- SEMI and ANTI strictness, LEFT and RIGHT kinds.
SELECT 'semi left';
SELECT * FROM t1 SEMI LEFT JOIN semi_left_join j USING(x) ORDER BY x, str, s;
SELECT 'semi right';
SELECT * FROM t1 SEMI RIGHT JOIN semi_right_join j USING(x) ORDER BY x, str, s;
SELECT 'anti left';
SELECT * FROM t1 ANTI LEFT JOIN anti_left_join j USING(x) ORDER BY x, str, s;
SELECT 'anti right';
SELECT * FROM t1 ANTI RIGHT JOIN anti_right_join j USING(x) ORDER BY x, str, s;
-- Clean up every table this script created.
DROP TABLE t1;
DROP TABLE any_left_join;
DROP TABLE any_inner_join;
DROP TABLE any_right_join;
DROP TABLE semi_left_join;
DROP TABLE semi_right_join;
DROP TABLE anti_left_join;
DROP TABLE anti_right_join;