dbms: FULL and RIGHT JOIN: development [#METR-15418].

This commit is contained in:
Alexey Milovidov 2015-07-22 23:54:42 +03:00
parent d660d987ce
commit 0f954021cb

View File

@ -4,6 +4,10 @@
#include <DB/Parsers/ASTJoin.h>
#include <DB/Interpreters/Join.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Core/ColumnNumbers.h>
/*#include <DB/DataStreams/TabSeparatedBlockOutputStream.h>
*#include <DB/IO/WriteBufferFromFileDescriptor.h>*/
namespace DB
@ -389,9 +393,13 @@ bool Join::insertFromBlock(const Block & block)
blocks.push_back(block);
Block * stored_block = &blocks.back();
/// Удаляем из stored_block ключевые столбцы, так как они не нужны.
for (const auto & name : key_names_right)
stored_block->erase(stored_block->getPositionByName(name));
if (!getFullness(kind))
{
/// Удаляем из stored_block ключевые столбцы, так как они не нужны.
for (const auto & name : key_names_right)
stored_block->erase(stored_block->getPositionByName(name));
}
/// TODO Переупорядочивать, класть ключи в начало.
/// Редкий случай, когда соединяемые столбцы являются константами. Чтобы не поддерживать отдельный код, материализуем их.
for (size_t i = 0, size = stored_block->columns(); i < size; ++i)
@ -443,7 +451,8 @@ template <typename Map>
struct Adder<ASTJoin::Left, ASTJoin::Any, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
@ -451,7 +460,7 @@ struct Adder<ASTJoin::Left, ASTJoin::Any, Map>
{
it->second.setUsed();
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(num_columns_to_skip + j).column.get(), it->second.row_num);
}
else
{
@ -465,7 +474,8 @@ template <typename Map>
struct Adder<ASTJoin::Inner, ASTJoin::Any, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
@ -475,7 +485,7 @@ struct Adder<ASTJoin::Inner, ASTJoin::Any, Map>
it->second.setUsed();
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(num_columns_to_skip + j).column.get(), it->second.row_num);
}
else
(*filter)[i] = 0;
@ -486,7 +496,8 @@ template <ASTJoin::Kind KIND, typename Map>
struct Adder<KIND, ASTJoin::All, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
@ -497,7 +508,7 @@ struct Adder<KIND, ASTJoin::All, Map>
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(it->second); current != nullptr; current = current->next)
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*current->block->unsafeGetByPosition(j).column.get(), current->row_num);
added_columns[j]->insertFrom(*current->block->unsafeGetByPosition(num_columns_to_skip + j).column.get(), current->row_num);
++rows_joined;
}
@ -575,6 +586,14 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
if (strictness == ASTJoin::All)
offsets_to_replicate.reset(new IColumn::Offsets_t(rows));
/** Для LEFT/INNER JOIN, сохранённые блоки не содержат ключи.
* Для FULL/RIGHT JOIN, сохранённые блоки содержат ключи;
* но они не будут использоваться на этой стадии соединения (а будут в AdderNonJoined), и их нужно пропустить.
*/
size_t num_columns_to_skip = 0;
if (getFullness(kind))
num_columns_to_skip = keys_size;
if (type == Type::KEY_64)
{
typedef typename Maps::MapUInt64 Map;
@ -586,7 +605,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
/// Строим ключ
UInt64 key = column.get64(i);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else if (type == Type::KEY_STRING)
@ -605,7 +625,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
/// Строим ключ
StringRef key(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else if (const ColumnFixedString * column_string = typeid_cast<const ColumnFixedString *>(&column))
@ -618,7 +639,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
/// Строим ключ
StringRef key(&data[i * n], n);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else
@ -636,7 +658,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
? packFixed<UInt128>(i, keys_size, key_columns, key_sizes)
: hash128(i, keys_size, key_columns);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else
@ -759,9 +782,45 @@ struct AdderNonJoined<ASTJoin::All, Mapped>
class NonJoinedBlockInputStream : public IProfilingBlockInputStream
{
public:
NonJoinedBlockInputStream(const Join & parent_, Block & left_sample_block_, size_t max_block_size_)
: parent(parent_), left_sample_block(left_sample_block_), max_block_size(max_block_size_)
NonJoinedBlockInputStream(const Join & parent_, Block & left_sample_block, size_t max_block_size_)
: parent(parent_), max_block_size(max_block_size_)
{
/** left_sample_block содержит ключи и "левые" столбцы.
* result_sample_block - ключи, "левые" столбцы и "правые" столбцы.
*/
size_t num_keys = parent.key_names_left.size();
size_t num_columns_left = left_sample_block.columns() - num_keys;
size_t num_columns_right = parent.sample_block_with_columns_to_add.columns();
result_sample_block = left_sample_block;
/// Добавляем в блок новые столбцы.
for (size_t i = 0; i < num_columns_right; ++i)
{
const ColumnWithTypeAndName & src_column = parent.sample_block_with_columns_to_add.getByPosition(i);
ColumnWithTypeAndName new_column = src_column.cloneEmpty();
result_sample_block.insert(new_column);
}
column_numbers_left.reserve(num_columns_left);
column_numbers_keys_and_right.reserve(num_keys + num_columns_right);
for (size_t i = 0; i < num_keys + num_columns_left; ++i)
{
const String & name = left_sample_block.getByPosition(i).name;
if (parent.key_names_left.end() == std::find(parent.key_names_left.begin(), parent.key_names_left.end(), name))
column_numbers_left.push_back(i);
else
column_numbers_keys_and_right.push_back(i);
}
for (size_t i = 0; i < num_columns_right; ++i)
column_numbers_keys_and_right.push_back(num_keys + num_columns_left + i);
columns_left.resize(num_columns_left);
columns_keys_and_right.resize(num_keys + num_columns_right);
}
String getName() const override { return "NonJoined"; }
@ -790,56 +849,59 @@ protected:
private:
const Join & parent;
Block left_sample_block;
size_t max_block_size;
Block result_sample_block;
ColumnNumbers column_numbers_left;
ColumnNumbers column_numbers_keys_and_right;
ColumnPlainPtrs columns_left;
ColumnPlainPtrs columns_keys_and_right;
std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure
template <ASTJoin::Strictness STRICTNESS, typename Maps>
Block createBlock(const Maps & maps)
{
Block block = left_sample_block.cloneEmpty();
Block block = result_sample_block.cloneEmpty();
size_t num_columns_left = left_sample_block.columns();
ColumnPlainPtrs columns_left(num_columns_left);
size_t num_columns_left = column_numbers_left.size();
size_t num_columns_right = column_numbers_keys_and_right.size();
for (size_t i = 0; i < num_columns_left; ++i)
{
auto & column_with_name_and_type = block.getByPosition(i);
auto & column_with_name_and_type = block.getByPosition(column_numbers_left[i]);
column_with_name_and_type.column = column_with_name_and_type.type->createColumn();
columns_left[i] = column_with_name_and_type.column.get();
}
/// Добавляем в блок новые столбцы.
size_t num_columns_right = parent.sample_block_with_columns_to_add.columns();
ColumnPlainPtrs columns_right(num_columns_right);
for (size_t i = 0; i < num_columns_right; ++i)
{
const ColumnWithTypeAndName & src_column = parent.sample_block_with_columns_to_add.getByPosition(i);
ColumnWithTypeAndName new_column = src_column.cloneEmpty();
block.insert(new_column);
columns_right[i] = new_column.column;
columns_right[i]->reserve(src_column.column->size());
auto & column_with_name_and_type = block.getByPosition(column_numbers_keys_and_right[i]);
column_with_name_and_type.column = column_with_name_and_type.type->createColumn();
columns_keys_and_right[i] = column_with_name_and_type.column.get();
columns_keys_and_right[i]->reserve(column_with_name_and_type.column->size());
}
size_t rows_added = 0;
if (parent.type == Join::Type::KEY_64)
rows_added = fillColumns<STRICTNESS>(*maps.key64, num_columns_left, columns_left, num_columns_right, columns_right);
rows_added = fillColumns<STRICTNESS>(*maps.key64, num_columns_left, columns_left, num_columns_right, columns_keys_and_right);
else if (parent.type == Join::Type::KEY_STRING)
rows_added = fillColumns<STRICTNESS>(*maps.key_string, num_columns_left, columns_left, num_columns_right, columns_right);
rows_added = fillColumns<STRICTNESS>(*maps.key_string, num_columns_left, columns_left, num_columns_right, columns_keys_and_right);
else if (parent.type == Join::Type::HASHED)
rows_added = fillColumns<STRICTNESS>(*maps.hashed, num_columns_left, columns_left, num_columns_right, columns_right);
rows_added = fillColumns<STRICTNESS>(*maps.hashed, num_columns_left, columns_left, num_columns_right, columns_keys_and_right);
else
throw Exception("Unknown JOIN variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
std::cerr << "rows added: " << rows_added << "\n";
// std::cerr << "rows added: " << rows_added << "\n";
if (!rows_added)
return Block();
std::cerr << block.dumpStructure() << "\n";
/* std::cerr << block.dumpStructure() << "\n";
WriteBufferFromFileDescriptor wb(STDERR_FILENO);
TabSeparatedBlockOutputStream out(wb);
out.write(block);*/
return block;
}
@ -862,7 +924,7 @@ private:
for (; it != end; ++it)
{
std::cerr << it->second.getUsed() << "\n";
// std::cerr << it->second.getUsed() << "\n";
if (it->second.getUsed())
continue;