dbms: JOINs: development [#METR-11370].

This commit is contained in:
Alexey Milovidov 2014-06-19 22:15:46 +04:00
parent efbd0de1ee
commit b970c85546
2 changed files with 111 additions and 65 deletions

View File

@ -164,8 +164,8 @@ private:
template <ASTJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, Block * stored_block);
template <ASTJoin::Kind KIND>
void anyJoinBlock(Block & block);
template <ASTJoin::Kind KIND, ASTJoin::Strictness STRICTNESS, typename Maps>
void joinBlockImpl(Block & block, Maps & maps);
/// Проверить не превышены ли допустимые размеры множества
bool checkSizeLimits() const;

View File

@ -350,67 +350,97 @@ bool Join::insertFromBlock(const Block & block)
}
template <typename Map>
static void addAnyLeftRow(
const Map & map,
const typename Map::key_type & key,
size_t num_columns_to_add,
ColumnPlainPtrs & added_columns)
template <ASTJoin::Kind KIND, ASTJoin::Strictness STRICTNESS, typename Map>
struct Adder
{
typename Map::const_iterator it = map.find(key);
if (it != map.end())
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
}
else
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
}
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets);
};
template <typename Map>
static void addAnyInnerRow(
const Map & map,
const typename Map::key_type & key,
size_t num_columns_to_add,
ColumnPlainPtrs & added_columns,
size_t i,
IColumn::Filter & filter)
struct Adder<ASTJoin::Left, ASTJoin::Any, Map>
{
typename Map::const_iterator it = map.find(key);
if (it != map.end())
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
{
filter[i] = 1;
typename Map::const_iterator it = map.find(key);
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
if (it != map.end())
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
}
else
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
}
else
filter[i] = 0;
}
};
template <typename Map>
struct Adder<ASTJoin::Inner, ASTJoin::Any, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
{
typename Map::const_iterator it = map.find(key);
if (it != map.end())
{
(*filter)[i] = 1;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
}
else
(*filter)[i] = 0;
}
};
template <ASTJoin::Kind KIND, typename Map>
static void addAnyRow(
const Map & map,
const typename Map::key_type & key,
size_t num_columns_to_add,
ColumnPlainPtrs & added_columns,
size_t i,
IColumn::Filter * filter)
struct Adder<KIND, ASTJoin::All, Map>
{
if (KIND == ASTJoin::Left)
addAnyLeftRow(map, key, num_columns_to_add, added_columns);
else
addAnyInnerRow(map, key, num_columns_to_add, added_columns, i, *filter);
}
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
{
typename Map::const_iterator it = map.find(key);
if (it != map.end())
{
size_t rows_joined = 0;
for (const Join::RowRefList * current = &it->second; current != nullptr; current = current->next)
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*current->block->unsafeGetByPosition(j).column.get(), current->row_num);
++rows_joined;
}
current_offset += rows_joined;
(*offsets)[i] = current_offset;
}
else
{
if (KIND == ASTJoin::Inner)
{
(*offsets)[i] = current_offset;
}
else
{
++current_offset;
(*offsets)[i] = current_offset;
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertDefault();
}
}
}
};
template <ASTJoin::Kind KIND>
void Join::anyJoinBlock(Block & block)
template <ASTJoin::Kind KIND, ASTJoin::Strictness STRICTNESS, typename Maps>
void Join::joinBlockImpl(Block & block, Maps & maps)
{
if (blocks.empty())
throw Exception("Attempt to JOIN with empty table", ErrorCodes::EMPTY_DATA_PASSED);
@ -451,10 +481,17 @@ void Join::anyJoinBlock(Block & block)
if (kind == ASTJoin::Inner && strictness == ASTJoin::Any)
filter.reset(new IColumn::Filter(rows));
/// Используется при ALL ... JOIN
IColumn::Offset_t current_offset = 0;
std::unique_ptr<IColumn::Offsets_t> offsets_to_replicate;
if (strictness == ASTJoin::All)
offsets_to_replicate.reset(new IColumn::Offsets_t(rows));
if (type == Set::KEY_64)
{
typedef MapsAny::MapUInt64 Map;
const Map & map = *maps_any.key64;
typedef typename Maps::MapUInt64 Map;
const Map & map = *maps.key64;
const IColumn & column = *key_columns[0];
/// Для всех строчек
@ -462,13 +499,13 @@ void Join::anyJoinBlock(Block & block)
{
/// Строим ключ
UInt64 key = column.get64(i);
addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
}
}
else if (type == Set::KEY_STRING)
{
typedef MapsAny::MapString Map;
const Map & map = *maps_any.key_string;
typedef typename Maps::MapString Map;
const Map & map = *maps.key_string;
const IColumn & column = *key_columns[0];
if (const ColumnString * column_string = dynamic_cast<const ColumnString *>(&column))
@ -481,7 +518,7 @@ void Join::anyJoinBlock(Block & block)
{
/// Строим ключ
StringRef key(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
}
}
else if (const ColumnFixedString * column_string = dynamic_cast<const ColumnFixedString *>(&column))
@ -494,7 +531,7 @@ void Join::anyJoinBlock(Block & block)
{
/// Строим ключ
StringRef key(&data[i * n], n);
addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
}
}
else
@ -502,8 +539,8 @@ void Join::anyJoinBlock(Block & block)
}
else if (type == Set::HASHED)
{
typedef MapsAny::MapHashed Map;
Map & map = *maps_any.hashed;
typedef typename Maps::MapHashed Map;
Map & map = *maps.hashed;
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
@ -512,7 +549,7 @@ void Join::anyJoinBlock(Block & block)
? pack128(i, keys_size, key_columns, key_sizes)
: hash128(i, keys_size, key_columns);
addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
}
}
else
@ -522,15 +559,24 @@ void Join::anyJoinBlock(Block & block)
if (kind == ASTJoin::Inner && strictness == ASTJoin::Any)
for (size_t i = 0; i < existing_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(*filter);
/// Если ALL ... JOIN - размножаем все столбцы кроме новых.
if (strictness == ASTJoin::All)
for (size_t i = 0; i < existing_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->replicate(*offsets_to_replicate);
}
void Join::joinBlock(Block & block)
{
if (kind == ASTJoin::Left)
anyJoinBlock<ASTJoin::Left>(block);
else
anyJoinBlock<ASTJoin::Inner>(block);
if (kind == ASTJoin::Left && strictness == ASTJoin::Any)
joinBlockImpl<ASTJoin::Left, ASTJoin::Any, MapsAny>(block, maps_any);
else if (kind == ASTJoin::Inner && strictness == ASTJoin::Any)
joinBlockImpl<ASTJoin::Inner, ASTJoin::Any, MapsAny>(block, maps_any);
else if (kind == ASTJoin::Left && strictness == ASTJoin::All)
joinBlockImpl<ASTJoin::Left, ASTJoin::All, MapsAll>(block, maps_all);
else if (kind == ASTJoin::Inner && strictness == ASTJoin::All)
joinBlockImpl<ASTJoin::Inner, ASTJoin::All, MapsAll>(block, maps_all);
}