Allowed JOIN to work with Nullable keys [#CLICKHOUSE-4].

This commit is contained in:
Alexey Milovidov 2017-03-30 17:09:24 +03:00
parent a5937ed024
commit 40ce6f86db
4 changed files with 230 additions and 178 deletions

View File

@ -12,6 +12,7 @@
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnNullable.h>
#include <DB/DataStreams/IBlockInputStream.h>
@ -203,6 +204,12 @@ struct Limits;
* In case of ALL ... JOIN - form new columns with all found rows,
* and also fill 'offsets' array, describing how many times we need to replicate values of "left" table.
* Then replicate columns of "left" table.
*
* How Nullable keys are processed:
*
* NULLs never join to anything, even to each other.
* During building of map, we just skip keys with NULL value of any component.
* During joining, we simply treat rows with any NULLs in key as non joined.
*/
class Join
{
@ -366,7 +373,6 @@ private:
static Type chooseMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes);
bool keys_fit_128_bits;
Sizes key_sizes;
Block sample_block_with_columns_to_add;
@ -390,28 +396,15 @@ private:
void init(Type type_);
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, Block * stored_block);
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType(Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, Block * stored_block);
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
void joinBlockImpl(Block & block, const Maps & maps) const;
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void joinBlockImplType(
Block & block, const Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size,
size_t num_columns_to_add, size_t num_columns_to_skip, ColumnPlainPtrs & added_columns,
std::unique_ptr<IColumn::Filter> & filter,
IColumn::Offset_t & current_offset, std::unique_ptr<IColumn::Offsets_t> & offsets_to_replicate) const;
void joinBlockImplCross(Block & block) const;
bool checkSizeLimits() const;
/// Throw an exception if blocks have different types of key columns.
void checkTypesOfKeys(const Block & block_left, const Block & block_right) const;
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
void joinBlockImpl(Block & block, const Maps & maps) const;
void joinBlockImplCross(Block & block) const;
};
using JoinPtr = std::shared_ptr<Join>;

View File

@ -3,10 +3,10 @@
#include <Poco/RWLock.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnNullable.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/SetVariants.h>
#include <DB/Interpreters/NullableUtils.h>
#include <DB/Parsers/IAST.h>
#include <DB/Storages/MergeTree/BoolMask.h>

View File

@ -4,6 +4,8 @@
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Interpreters/Join.h>
#include <DB/Interpreters/NullableUtils.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Core/ColumnNumbers.h>
@ -239,6 +241,49 @@ bool Join::checkSizeLimits() const
}
void Join::setSampleBlock(const Block & block)
{
Poco::ScopedWriteRWLock lock(rwlock);
if (!empty())
return;
size_t keys_size = key_names_right.size();
ConstColumnPlainPtrs key_columns(keys_size);
for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = block.getByName(key_names_right[i]).column.get();
/// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes));
sample_block_with_columns_to_add = block;
/// Переносим из sample_block_with_columns_to_add ключевые столбцы в sample_block_with_keys, сохраняя порядок.
size_t pos = 0;
while (pos < sample_block_with_columns_to_add.columns())
{
const auto & name = sample_block_with_columns_to_add.getByPosition(pos).name;
if (key_names_right.end() != std::find(key_names_right.begin(), key_names_right.end(), name))
{
sample_block_with_keys.insert(sample_block_with_columns_to_add.getByPosition(pos));
sample_block_with_columns_to_add.erase(pos);
}
else
++pos;
}
for (size_t i = 0, size = sample_block_with_columns_to_add.columns(); i < size; ++i)
{
auto & column = sample_block_with_columns_to_add.getByPosition(i);
if (!column.column)
column.column = column.type->createColumn();
}
}
namespace
{
/// Вставка элемента в хэш-таблицу вида ключ -> ссылка на строку, которая затем будет использоваться при JOIN-е.
template <ASTTableJoin::Strictness STRICTNESS, typename Map, typename KeyGetter>
struct Inserter
@ -294,21 +339,40 @@ struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
};
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void NO_INLINE Join::insertFromBlockImplType(Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, Block * stored_block)
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns,
size_t keys_size, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
KeyGetter key_getter(key_columns);
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
continue;
auto key = key_getter.getKey(key_columns, keys_size, i, key_sizes);
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key, stored_block, i, pool);
}
}
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType(
Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns,
size_t keys_size, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
if (null_map)
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(map, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
else
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(map, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
}
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void Join::insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, Block * stored_block)
void insertFromBlockImpl(
Join::Type type, Maps & maps, size_t rows, const ConstColumnPlainPtrs & key_columns,
size_t keys_size, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
switch (type)
{
@ -318,7 +382,7 @@ void Join::insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainP
#define M(TYPE) \
case Join::Type::TYPE: \
insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE>::Type>(\
*maps.TYPE, rows, key_columns, keys_size, stored_block); \
*maps.TYPE, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
@ -327,46 +391,6 @@ void Join::insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainP
throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
}
}
void Join::setSampleBlock(const Block & block)
{
Poco::ScopedWriteRWLock lock(rwlock);
if (!empty())
return;
size_t keys_size = key_names_right.size();
ConstColumnPlainPtrs key_columns(keys_size);
for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = block.getByName(key_names_right[i]).column.get();
/// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes));
sample_block_with_columns_to_add = block;
/// Переносим из sample_block_with_columns_to_add ключевые столбцы в sample_block_with_keys, сохраняя порядок.
size_t pos = 0;
while (pos < sample_block_with_columns_to_add.columns())
{
const auto & name = sample_block_with_columns_to_add.getByPosition(pos).name;
if (key_names_right.end() != std::find(key_names_right.begin(), key_names_right.end(), name))
{
sample_block_with_keys.insert(sample_block_with_columns_to_add.getByPosition(pos));
sample_block_with_columns_to_add.erase(pos);
}
else
++pos;
}
for (size_t i = 0, size = sample_block_with_columns_to_add.columns(); i < size; ++i)
{
auto & column = sample_block_with_columns_to_add.getByPosition(i);
if (!column.column)
column.column = column.type->createColumn();
}
}
@ -380,10 +404,10 @@ bool Join::insertFromBlock(const Block & block)
size_t keys_size = key_names_right.size();
ConstColumnPlainPtrs key_columns(keys_size);
/// Редкий случай, когда ключи являются константами. Чтобы не поддерживать отдельный код, материализуем их.
/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
Columns materialized_columns;
/// Запоминаем столбцы ключей, с которыми будем работать
/// Memoize key columns to work.
for (size_t i = 0; i < keys_size; ++i)
{
key_columns[i] = block.getByName(key_names_right[i]).column.get();
@ -395,6 +419,11 @@ bool Join::insertFromBlock(const Block & block)
}
}
/// We will insert to the map only keys, where all components are not NULL.
ColumnPtr null_map_holder;
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
size_t rows = block.rows();
blocks.push_back(block);
@ -422,7 +451,7 @@ bool Join::insertFromBlock(const Block & block)
stored_block->erase(stored_block->getPositionByName(name));
}
/// Редкий случай, когда соединяемые столбцы являются константами. Чтобы не поддерживать отдельный код, материализуем их.
/// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them.
for (size_t i = 0, size = stored_block->columns(); i < size; ++i)
{
ColumnPtr col = stored_block->safeGetByPosition(i).column;
@ -436,16 +465,16 @@ bool Join::insertFromBlock(const Block & block)
if (!getFullness(kind))
{
if (strictness == ASTTableJoin::Strictness::Any)
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(maps_any, rows, key_columns, keys_size, stored_block);
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
else
insertFromBlockImpl<ASTTableJoin::Strictness::All>(maps_all, rows, key_columns, keys_size, stored_block);
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
}
else
{
if (strictness == ASTTableJoin::Strictness::Any)
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(maps_any_full, rows, key_columns, keys_size, stored_block);
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any_full, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
else
insertFromBlockImpl<ASTTableJoin::Strictness::All>(maps_all_full, rows, key_columns, keys_size, stored_block);
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all_full, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
}
}
@ -469,50 +498,46 @@ bool Join::insertFromBlock(const Block & block)
}
namespace
{
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
struct Adder;
template <typename Map>
struct Adder<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
if (it != map.end())
{
it->second.setUsed();
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->getByPosition(num_columns_to_skip + j).column.get(), it->second.row_num);
}
else
static void addNotFound(size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
}
};
template <typename Map>
struct Adder<ASTTableJoin::Kind::Inner, ASTTableJoin::Strictness::Any, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
if (it != map.end())
{
(*filter)[i] = 1;
it->second.setUsed();
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->getByPosition(num_columns_to_skip + j).column.get(), it->second.row_num);
}
else
static void addNotFound(size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
{
(*filter)[i] = 0;
}
};
@ -520,16 +545,11 @@ struct Adder<ASTTableJoin::Kind::Inner, ASTTableJoin::Strictness::Any, Map>
template <ASTTableJoin::Kind KIND, typename Map>
struct Adder<KIND, ASTTableJoin::Strictness::All, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
if (it != map.end())
{
size_t rows_joined = 0;
it->second.setUsed();
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(it->second); current != nullptr; current = current->next)
{
for (size_t j = 0; j < num_columns_to_add; ++j)
@ -541,7 +561,9 @@ struct Adder<KIND, ASTTableJoin::Strictness::All, Map>
current_offset += rows_joined;
(*offsets)[i] = current_offset;
}
else
static void addNotFound(size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
{
if (KIND == ASTTableJoin::Kind::Inner)
{
@ -556,28 +578,59 @@ struct Adder<KIND, ASTTableJoin::Strictness::All, Map>
added_columns[j]->insertDefault();
}
}
}
};
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void NO_INLINE Join::joinBlockImplType(
Block & block, const Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size,
size_t num_columns_to_add, size_t num_columns_to_skip, ColumnPlainPtrs & added_columns,
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE joinBlockImplTypeCase(
Block & block, const Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes,
size_t num_columns_to_add, size_t num_columns_to_skip, ColumnPlainPtrs & added_columns, ConstNullMapPtr null_map,
std::unique_ptr<IColumn::Filter> & filter,
IColumn::Offset_t & current_offset, std::unique_ptr<IColumn::Offsets_t> & offsets_to_replicate) const
IColumn::Offset_t & current_offset, std::unique_ptr<IColumn::Offsets_t> & offsets_to_replicate)
{
KeyGetter key_getter(key_columns);
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
{
Adder<KIND, STRICTNESS, Map>::addNotFound(
num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
}
else
{
auto key = key_getter.getKey(key_columns, keys_size, i, key_sizes);
typename Map::const_iterator it = map.find(key);
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
if (it != map.end())
{
it->second.setUsed();
Adder<KIND, STRICTNESS, Map>::addFound(
it, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
else
Adder<KIND, STRICTNESS, Map>::addNotFound(
num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
}
}
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void joinBlockImplType(
Block & block, const Map & map, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes,
size_t num_columns_to_add, size_t num_columns_to_skip, ColumnPlainPtrs & added_columns, ConstNullMapPtr null_map,
std::unique_ptr<IColumn::Filter> & filter,
IColumn::Offset_t & current_offset, std::unique_ptr<IColumn::Offsets_t> & offsets_to_replicate)
{
if (null_map)
joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, true>(
block, map, rows, key_columns, keys_size, key_sizes, num_columns_to_add, num_columns_to_skip,
added_columns, null_map, filter, current_offset, offsets_to_replicate);
else
joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, false>(
block, map, rows, key_columns, keys_size, key_sizes, num_columns_to_add, num_columns_to_skip,
added_columns, null_map, filter, current_offset, offsets_to_replicate);
}
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
@ -586,10 +639,10 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
size_t keys_size = key_names_left.size();
ConstColumnPlainPtrs key_columns(keys_size);
/// Редкий случай, когда ключи являются константами. Чтобы не поддерживать отдельный код, материализуем их.
/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
Columns materialized_columns;
/// Запоминаем столбцы ключей, с которыми будем работать
/// Memoize key columns to work.
for (size_t i = 0; i < keys_size; ++i)
{
key_columns[i] = block.getByName(key_names_left[i]).column.get();
@ -601,6 +654,11 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
}
}
/// Keys with NULL value in any column won't join to anything.
ColumnPtr null_map_holder;
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
size_t existing_columns = block.columns();
/** Если используется FULL или RIGHT JOIN, то столбцы из "левой" части надо материализовать.
@ -660,9 +718,9 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
#define M(TYPE) \
case Join::Type::TYPE: \
Join::joinBlockImplType<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE>::Type>(\
block, *maps.TYPE, rows, key_columns, keys_size, \
num_columns_to_add, num_columns_to_skip, added_columns, \
joinBlockImplType<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE>::Type>(\
block, *maps.TYPE, rows, key_columns, keys_size, key_sizes, \
num_columns_to_add, num_columns_to_skip, added_columns, null_map, \
filter, current_offset, offsets_to_replicate); \
break;
APPLY_FOR_JOIN_VARIANTS(M)

View File

@ -18,6 +18,7 @@
#include <DB/Interpreters/Set.h>
#include <DB/Interpreters/convertFieldToType.h>
#include <DB/Interpreters/evaluateConstantExpression.h>
#include <DB/Interpreters/NullableUtils.h>
#include <DB/Storages/MergeTree/PKCondition.h>