Join.cpp refactoring

This commit is contained in:
chertus 2019-03-19 19:53:36 +03:00
parent 1446c50884
commit e5a9633132
2 changed files with 123 additions and 173 deletions

View File

@ -120,56 +120,6 @@ Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_siz
}
template <typename Maps>
static void initImpl(Maps & maps, Join::Type type)
{
switch (type)
{
case Join::Type::EMPTY: break;
case Join::Type::CROSS: break;
#define M(TYPE) \
case Join::Type::TYPE: maps.TYPE = std::make_unique<typename decltype(maps.TYPE)::element_type>(); break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
}
template <typename Maps>
static size_t getTotalRowCountImpl(const Maps & maps, Join::Type type)
{
switch (type)
{
case Join::Type::EMPTY: return 0;
case Join::Type::CROSS: return 0;
#define M(NAME) \
case Join::Type::NAME: return maps.NAME ? maps.NAME->size() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
template <typename Maps>
static size_t getTotalByteCountImpl(const Maps & maps, Join::Type type)
{
switch (type)
{
case Join::Type::EMPTY: return 0;
case Join::Type::CROSS: return 0;
#define M(NAME) \
case Join::Type::NAME: return maps.NAME ? maps.NAME->getBufferSizeInBytes() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
template <Join::Type type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;
@ -227,7 +177,7 @@ void Join::init(Type type_)
if (kind == ASTTableJoin::Kind::Cross)
return;
dispatch(MapInitTag());
dispatch([&](auto, auto, auto & map) { initImpl(map, type); });
dispatch([&](auto, auto, auto & map) { map.create(type); });
}
size_t Join::getTotalRowCount() const
@ -241,7 +191,7 @@ size_t Join::getTotalRowCount() const
}
else
{
dispatch([&](auto, auto, auto & map) { res += getTotalRowCountImpl(map, type); });
dispatch([&](auto, auto, auto & map) { res += map.getTotalRowCount(type); });
}
return res;
@ -258,7 +208,7 @@ size_t Join::getTotalByteCount() const
}
else
{
dispatch([&](auto, auto, auto & map) { res += getTotalByteCountImpl(map, type); });
dispatch([&](auto, auto, auto & map) { res += map.getTotalByteCountImpl(type); });
res += pool.size();
}
@ -526,61 +476,20 @@ bool Join::insertFromBlock(const Block & block)
namespace
{
template <bool fill_left, ASTTableJoin::Strictness STRICTNESS, typename Map>
struct Adder;
template <typename Map>
struct Adder<true, ASTTableJoin::Strictness::Any, Map>
template <ASTTableJoin::Strictness STRICTNESS, typename Map>
void addFoundRow(const typename Map::mapped_type & mapped, const std::vector<size_t> & right_indexes,
MutableColumns & added_columns, IColumn::Offset & current_offset [[maybe_unused]])
{
static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
const std::vector<size_t> & right_indexes)
{
filter[i] = 1;
size_t num_columns_to_add = right_indexes.size();
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*mapped.block->getByPosition(right_indexes[j]).column, mapped.row_num);
}
static void addNotFound(size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/)
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
{
filter[i] = 0;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
};
template <typename Map>
struct Adder<false, ASTTableJoin::Strictness::Any, Map>
{
static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
const std::vector<size_t> & right_indexes)
{
filter[i] = 1;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*mapped.block->getByPosition(right_indexes[j]).column, mapped.row_num);
}
static void addNotFound(size_t /*num_columns_to_add*/, MutableColumns & /*added_columns*/,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/)
{
filter[i] = 0;
}
};
template <bool fill_left, typename Map>
struct Adder<fill_left, ASTTableJoin::Strictness::All, Map>
{
static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets,
const std::vector<size_t> & right_indexes)
{
filter[i] = 1;
size_t rows_joined = 0;
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(mapped); current != nullptr; current = current->next)
{
@ -591,48 +500,42 @@ namespace
}
current_offset += rows_joined;
(*offsets)[i] = current_offset;
}
static void addNotFound(size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets)
{
filter[i] = 0;
if (!fill_left)
{
(*offsets)[i] = current_offset;
}
else
{
++current_offset;
(*offsets)[i] = current_offset;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
}
};
template <bool fill_left>
void addNotFoundRow(const std::vector<size_t> & right_indexes [[maybe_unused]], MutableColumns & added_columns [[maybe_unused]],
IColumn::Offset & current_offset [[maybe_unused]])
{
if constexpr (fill_left)
{
++current_offset;
for (size_t j = 0; j < right_indexes.size(); ++j)
added_columns[j]->insertDefault();
}
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE joinBlockImplTypeCase(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexColumns(
const Map & map, size_t rows, KeyGetter & key_getter,
MutableColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate,
const std::vector<size_t> & right_indexes)
{
IColumn::Offset current_offset = 0;
size_t num_columns_to_add = right_indexes.size();
constexpr bool fill_left = Join::KindTrait<KIND>::fill_left;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Offset current_offset = 0;
Arena pool;
KeyGetter key_getter(key_columns, key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
{
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addNotFound(
num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get());
addNotFoundRow<fill_left>(right_indexes, added_columns, current_offset);
}
else
{
@ -640,43 +543,60 @@ namespace
if (find_result.isFound())
{
filter[i] = 1;
auto & mapped = find_result.getMapped();
mapped.setUsed();
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addFound(
mapped, num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get(), right_indexes);
addFoundRow<STRICTNESS, Map>(mapped, right_indexes, added_columns, current_offset);
}
else
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addNotFound(
num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get());
addNotFoundRow<fill_left>(right_indexes, added_columns, current_offset);
}
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
(*offsets_to_replicate)[i] = current_offset;
}
return offsets_to_replicate;
}
using BlockFilterData = std::pair<
std::unique_ptr<IColumn::Filter>,
std::unique_ptr<IColumn::Offsets>>;
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
BlockFilterData joinBlockImplType(
IColumn::Filter joinRightIndex(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
MutableColumns & added_columns, ConstNullMapPtr null_map, const std::vector<size_t> & right_indexes)
MutableColumns & added_columns, ConstNullMapPtr null_map, const std::vector<size_t> & right_indexes,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
{
std::unique_ptr<IColumn::Filter> filter = std::make_unique<IColumn::Filter>(rows);
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
if (STRICTNESS == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Filter filter(rows, 0);
KeyGetter key_getter(key_columns, key_sizes, nullptr);
if (null_map)
joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_columns, key_sizes, added_columns, null_map, *filter,
offsets_to_replicate, right_indexes);
offsets_to_replicate = joinRightIndexColumns<KIND, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_getter, added_columns, null_map, filter, right_indexes);
else
joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_columns, key_sizes, added_columns, null_map, *filter,
offsets_to_replicate, right_indexes);
offsets_to_replicate = joinRightIndexColumns<KIND, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_getter, added_columns, null_map, filter, right_indexes);
return {std::move(filter), std::move(offsets_to_replicate)};
return filter;
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
IColumn::Filter switchJoinRightIndex(
Join::Type type,
const Maps & maps_, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
MutableColumns & added_columns, ConstNullMapPtr null_map, const std::vector<size_t> & right_indexes,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
{
switch (type)
{
#define M(TYPE) \
case Join::Type::TYPE: \
return joinRightIndex<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, right_indexes, offsets_to_replicate);
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
default:
throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
}
}
}
@ -762,31 +682,15 @@ void Join::joinBlockImpl(
}
}
std::unique_ptr<IColumn::Filter> filter;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
switch (type)
{
#define M(TYPE) \
case Join::Type::TYPE: \
std::tie(filter, offsets_to_replicate) = \
joinBlockImplType<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, block.rows(), key_columns, key_sizes, added_columns, null_map, right_indexes); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
default:
throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
}
IColumn::Filter filter = switchJoinRightIndex<KIND, STRICTNESS>(
type, maps_, block.rows(), key_columns, key_sizes, added_columns, null_map, right_indexes, offsets_to_replicate);
const auto added_columns_size = added_columns.size();
for (size_t i = 0; i < added_columns_size; ++i)
block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), added_type_name[i].first, added_type_name[i].second));
if (!filter)
throw Exception("No data to filter columns", ErrorCodes::LOGICAL_ERROR);
NameSet needed_key_names_right = requiredRightKeys(key_names_right, columns_added_by_join);
if (strictness == ASTTableJoin::Strictness::Any)
@ -795,7 +699,7 @@ void Join::joinBlockImpl(
{
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
/// Add join key columns from right block if they has different name.
for (size_t i = 0; i < key_names_right.size(); ++i)
@ -824,9 +728,9 @@ void Join::joinBlockImpl(
auto & column = col.column;
MutableColumnPtr mut_column = column->cloneEmpty();
for (size_t col_no = 0; col_no < filter->size(); ++col_no)
for (size_t col_no = 0; col_no < filter.size(); ++col_no)
{
if ((*filter)[col_no])
if (filter[col_no])
mut_column->insertFrom(*column, col_no);
else
mut_column->insertDefault();
@ -859,7 +763,7 @@ void Join::joinBlockImpl(
{
if (size_t to_insert = (*offsets_to_replicate)[col_no] - last_offset)
{
if (!(*filter)[col_no])
if (!filter[col_no])
mut_column->insertDefault();
else
for (size_t dup = 0; dup < to_insert; ++dup)

View File

@ -228,6 +228,52 @@ public:
std::unique_ptr<HashMap<UInt128, Mapped, UInt128HashCRC32>> keys128;
std::unique_ptr<HashMap<UInt256, Mapped, UInt256HashCRC32>> keys256;
std::unique_ptr<HashMap<UInt128, Mapped, UInt128TrivialHash>> hashed;
void create(Type which)
{
switch (which)
{
case Type::EMPTY: break;
case Type::CROSS: break;
#define M(NAME) \
case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
}
size_t getTotalRowCount(Type which) const
{
switch (which)
{
case Type::EMPTY: return 0;
case Type::CROSS: return 0;
#define M(NAME) \
case Type::NAME: return NAME ? NAME->size() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
size_t getTotalByteCountImpl(Type which) const
{
switch (which)
{
case Type::EMPTY: return 0;
case Type::CROSS: return 0;
#define M(NAME) \
case Type::NAME: return NAME ? NAME->getBufferSizeInBytes() : 0;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
};
using MapsAny = MapsTemplate<WithFlags<false, false, RowRef>>;