ClickHouse/dbms/src/Interpreters/Join.cpp
2014-06-18 23:14:29 +04:00

424 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Parsers/ASTJoin.h>
#include <DB/Interpreters/Join.h>
namespace DB
{
/// Total number of keys stored in the JOIN state: the entry counts of every
/// allocated key map (key64, key_string, hashed) are summed; unallocated maps count as zero.
size_t Join::getTotalRowCount() const
{
	size_t total = 0;
	total += key64 ? key64->size() : 0;
	total += key_string ? key_string->size() : 0;
	total += hashed ? hashed->size() : 0;
	return total;
}
/// Approximate memory footprint of the JOIN state in bytes: the buffer sizes of every
/// allocated key map plus everything allocated in `pool` (the arena that holds copies of string keys).
size_t Join::getTotalByteCount() const
{
	size_t total = pool.size();

	if (key64)
		total += key64->getBufferSizeInBytes();
	if (key_string)
		total += key_string->getBufferSizeInBytes();
	if (hashed)
		total += hashed->getBufferSizeInBytes();

	return total;
}
/// Returns true while the in-memory JOIN state is within max_rows / max_bytes.
/// A zero limit means "unlimited". The byte count is only computed if the row limit is not already exceeded.
bool Join::checkSizeLimits() const
{
	return !(max_rows && getTotalRowCount() > max_rows)
		&& !(max_bytes && getTotalByteCount() > max_bytes);
}
/// Returns true while the data written to the external table is within
/// max_rows_to_transfer / max_bytes_to_transfer. A zero limit means "unlimited".
bool Join::checkExternalSizeLimits() const
{
	const bool rows_ok = !max_rows_to_transfer || rows_in_external_table <= max_rows_to_transfer;
	const bool bytes_ok = !max_bytes_to_transfer || bytes_in_external_table <= max_bytes_to_transfer;
	return rows_ok && bytes_ok;
}
/// Registers one row of a stored right-side block in a JOIN key map.
/// STRICTNESS selects the policy; only the ANY specialisations are defined here.
template <ASTJoin::Strictness STRICTNESS, typename Map>
struct Inserter
{
	static void insert(Map & map, const typename Map::key_type & key, Block * stored_block, size_t row_num, Arena & pool);
};

/// ANY strictness: the first row seen for a key wins; rows with an already present key are ignored.
template <typename Map>
struct Inserter<ASTJoin::Any, Map>
{
	static void insert(Map & map, const typename Map::key_type & key, Block * stored_block, size_t row_num, Arena & pool)
	{
		typename Map::iterator position;
		bool is_new_key;
		map.emplace(key, position, is_new_key);

		if (!is_new_key)
			return;

		/// The mapped slot is initialised with placement new (presumably emplace leaves it raw).
		new (&position->second) Join::RowRef(stored_block, row_num);
	}
};

/// ANY strictness for string keys. The incoming key points into the caller's column data,
/// so on first insertion its bytes are copied into `pool` and the stored key is repointed at the copy.
template <>
struct Inserter<ASTJoin::Any, Join::MapString>
{
	static void insert(Join::MapString & map, const Join::MapString::key_type & key, Block * stored_block, size_t row_num, Arena & pool)
	{
		Join::MapString::iterator position;
		bool is_new_key;
		map.emplace(key, position, is_new_key);

		if (!is_new_key)
			return;

		position->first.data = pool.insert(key.data, key.size);
		new (&position->second) Join::RowRef(stored_block, row_num);
	}
};
bool Join::insertFromBlock(const Block & block)
{
if (external_table)
{
BlockOutputStreamPtr output = external_table->write(ASTPtr());
output->write(block);
bytes_in_external_table += block.bytes();
rows_in_external_table += block.rows();
if (!checkExternalSizeLimits())
{
if (transfer_overflow_mode == OverflowMode::THROW)
throw Exception("JOIN external table size limit exceeded."
" Rows: " + toString(rows_in_external_table) +
", limit: " + toString(max_rows_to_transfer) +
". Bytes: " + toString(bytes_in_external_table) +
", limit: " + toString(max_bytes_to_transfer) + ".",
ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
if (transfer_overflow_mode == OverflowMode::BREAK)
return false;
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
if (only_external)
return true;
size_t keys_size = key_names.size();
ConstColumnPlainPtrs key_columns(keys_size);
/// Переводим имена столбцов в номера, если они ещё не вычислены.
if (key_numbers_right.empty())
for (const auto & name : key_names)
key_numbers_right.push_back(block.getPositionByName(name));
/// Запоминаем столбцы ключей, с которыми будем работать
for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = block.getByPosition(key_numbers_right[i]).column;
size_t rows = block.rows();
/// Какую структуру данных для множества использовать?
keys_fit_128_bits = false;
if (empty())
init(Set::chooseMethod(key_columns, keys_fit_128_bits, key_sizes));
blocks.push_back(block);
Block * stored_block = &blocks.back();
/// Удаляем из stored_block ключевые столбцы, так как они не нужны.
for (const auto & name : key_names)
stored_block->erase(stored_block->getPositionByName(name));
if (type == Set::KEY_64)
{
typedef MapUInt64 Map;
Map & res = *key64;
const IColumn & column = *key_columns[0];
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
UInt64 key = column.get64(i);
Inserter<ASTJoin::Any, Map>::insert(res, key, stored_block, i, pool);
}
}
else if (type == Set::KEY_STRING)
{
typedef MapString Map;
Map & res = *key_string;
const IColumn & column = *key_columns[0];
if (const ColumnString * column_string = dynamic_cast<const ColumnString *>(&column))
{
const ColumnString::Offsets_t & offsets = column_string->getOffsets();
const ColumnString::Chars_t & data = column_string->getChars();
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
StringRef key(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
Inserter<ASTJoin::Any, Map>::insert(res, key, stored_block, i, pool);
}
}
else if (const ColumnFixedString * column_string = dynamic_cast<const ColumnFixedString *>(&column))
{
size_t n = column_string->getN();
const ColumnFixedString::Chars_t & data = column_string->getChars();
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
/// Строим ключ
StringRef key(&data[i * n], n);
Inserter<ASTJoin::Any, Map>::insert(res, key, stored_block, i, pool);
}
}
else
throw Exception("Illegal type of column when creating set with string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
}
else if (type == Set::HASHED)
{
typedef MapHashed Map;
Map & res = *hashed;
/// Для всех строчек
for (size_t i = 0; i < rows; ++i)
{
UInt128 key = keys_fit_128_bits
? pack128(i, keys_size, key_columns, key_sizes)
: hash128(i, keys_size, key_columns);
Inserter<ASTJoin::Any, Map>::insert(res, key, stored_block, i, pool);
}
}
else
throw Exception("Unknown JOIN variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
if (!checkSizeLimits())
{
if (overflow_mode == OverflowMode::THROW)
throw Exception("Join size limit exceeded."
" Rows: " + toString(getTotalRowCount()) +
", limit: " + toString(max_rows) +
". Bytes: " + toString(getTotalByteCount()) +
", limit: " + toString(max_bytes) + ".",
ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
if (overflow_mode == OverflowMode::BREAK)
return false;
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
return true;
}
/// LEFT ANY JOIN, one probe row: append one value to each of the first `num_columns_to_add`
/// added columns. The value is taken from the stored right-side row for `key`, or is the column
/// default if `key` is absent from `map`.
template <typename Map>
static void addAnyLeftRow(
	const Map & map,
	const typename Map::key_type & key,
	size_t num_columns_to_add,
	ColumnPlainPtrs & added_columns)
{
	typename Map::const_iterator found = map.find(key);
	const bool matched = found != map.end();

	for (size_t col = 0; col < num_columns_to_add; ++col)
	{
		if (matched)
			added_columns[col]->insertFrom(*found->second.block->unsafeGetByPosition(col).column.get(), found->second.row_num);
		else
			added_columns[col]->insertDefault();
	}
}
/// INNER ANY JOIN, one probe row at position `i`: sets filter[i] to 1 and appends the stored
/// right-side values when `key` is found, otherwise sets filter[i] to 0 and appends nothing.
template <typename Map>
static void addAnyInnerRow(
	const Map & map,
	const typename Map::key_type & key,
	size_t num_columns_to_add,
	ColumnPlainPtrs & added_columns,
	size_t i,
	IColumn::Filter & filter)
{
	typename Map::const_iterator found = map.find(key);

	if (found == map.end())
	{
		filter[i] = 0;
		return;
	}

	filter[i] = 1;

	const auto & ref = found->second;
	for (size_t col = 0; col < num_columns_to_add; ++col)
		added_columns[col]->insertFrom(*ref.block->unsafeGetByPosition(col).column.get(), ref.row_num);
}
/// Dispatches one probe row to the LEFT or INNER ANY handler, selected by the compile-time KIND.
/// `filter` may be null for KIND == Left; it is dereferenced only for any other KIND.
template <ASTJoin::Kind KIND, typename Map>
static void addAnyRow(
	const Map & map,
	const typename Map::key_type & key,
	size_t num_columns_to_add,
	ColumnPlainPtrs & added_columns,
	size_t i,
	IColumn::Filter * filter)
{
	if (KIND != ASTJoin::Left)
	{
		addAnyInnerRow(map, key, num_columns_to_add, added_columns, i, *filter);
		return;
	}

	addAnyLeftRow(map, key, num_columns_to_add, added_columns);
}
/** ANY JOIN of a left-side `block` against the stored right-side blocks, performed in place.
  *
  * Empty copies of the stored (non-key) right columns are appended to `block` and filled per left row:
  * - KIND == Left: every row gets the matched right values, or column defaults when there is no match;
  * - otherwise (INNER): only matched rows get values, and the rows without a match are then filtered
  *   out of the block's original columns, so all columns end up the same length.
  * ANY semantics are applied regardless of the `strictness` member.
  *
  * Throws if no right-side blocks are stored, on an unsupported string key column type,
  * and on an unknown map variant.
  */
template <ASTJoin::Kind KIND>
void Join::anyJoinBlock(Block & block)
{
	if (blocks.empty())
		throw Exception("Attempt to JOIN with empty table", ErrorCodes::EMPTY_DATA_PASSED);

	size_t keys_size = key_names.size();
	ConstColumnPlainPtrs key_columns(keys_size);

	/// Translate key column names to positions, unless already done for a previous block.
	if (key_numbers_left.empty())
		for (const auto & name : key_names)
			key_numbers_left.push_back(block.getPositionByName(name));

	/// Remember the key columns to work with.
	for (size_t i = 0; i < keys_size; ++i)
		key_columns[i] = block.getByPosition(key_numbers_left[i]).column;

	/// Number of left rows, measured before the (empty) right columns are appended.
	size_t rows = block.rowsInFirstColumn();

	/// Append empty copies of the right-side columns to the block.
	const Block & first_mapped_block = blocks.front();
	size_t num_columns_to_add = first_mapped_block.columns();
	ColumnPlainPtrs added_columns(num_columns_to_add);
	size_t existing_columns = block.columns();

	for (size_t i = 0; i < num_columns_to_add; ++i)
	{
		const ColumnWithNameAndType & src_column = first_mapped_block.getByPosition(i);
		ColumnWithNameAndType new_column = src_column.cloneEmpty();
		block.insert(new_column);
		added_columns[i] = new_column.column;
		/// Each added column receives at most one value per left row (exactly one for LEFT),
		/// so size the reservation by the left block, not by the first right block.
		added_columns[i]->reserve(rows);
	}

	/// Row filter for INNER. addAnyRow<KIND> dereferences it exactly when KIND != Left, so it must be
	/// allocated on that same compile-time condition. Deriving it from the runtime kind/strictness
	/// members left it null, yet dereferenced, e.g. for ALL INNER.
	const bool need_filter = KIND != ASTJoin::Left;
	std::unique_ptr<IColumn::Filter> filter;
	if (need_filter)
		filter.reset(new IColumn::Filter(rows));

	if (type == Set::KEY_64)
	{
		typedef MapUInt64 Map;
		const Map & map = *key64;
		const IColumn & column = *key_columns[0];

		/// For every row
		for (size_t i = 0; i < rows; ++i)
		{
			/// Build the key
			UInt64 key = column.get64(i);
			addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
		}
	}
	else if (type == Set::KEY_STRING)
	{
		typedef MapString Map;
		const Map & map = *key_string;
		const IColumn & column = *key_columns[0];

		if (const ColumnString * column_string = dynamic_cast<const ColumnString *>(&column))
		{
			const ColumnString::Offsets_t & offsets = column_string->getOffsets();
			const ColumnString::Chars_t & data = column_string->getChars();

			/// For every row
			for (size_t i = 0; i < rows; ++i)
			{
				/// Build the key the same way insertFromBlock() does (last byte of each value dropped).
				StringRef key(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
				addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
			}
		}
		else if (const ColumnFixedString * column_string = dynamic_cast<const ColumnFixedString *>(&column))
		{
			size_t n = column_string->getN();
			const ColumnFixedString::Chars_t & data = column_string->getChars();

			/// For every row
			for (size_t i = 0; i < rows; ++i)
			{
				/// Build the key: each value occupies exactly n bytes.
				StringRef key(&data[i * n], n);
				addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
			}
		}
		else
			throw Exception("Illegal type of column when creating set with string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
	}
	else if (type == Set::HASHED)
	{
		typedef MapHashed Map;
		const Map & map = *hashed;

		/// For every row
		for (size_t i = 0; i < rows; ++i)
		{
			/// Must match the key construction used when the right side was inserted.
			UInt128 key = keys_fit_128_bits
				? pack128(i, keys_size, key_columns, key_sizes)
				: hash128(i, keys_size, key_columns);

			addAnyRow<KIND, Map>(map, key, num_columns_to_add, added_columns, i, filter.get());
		}
	}
	else
		throw Exception("Unknown JOIN variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);

	/// INNER: drop unmatched rows from the original columns. The added columns already hold only matched rows.
	if (need_filter)
		for (size_t i = 0; i < existing_columns; ++i)
			block.getByPosition(i).column = block.getByPosition(i).column->filter(*filter);
}
/// Joins a left-side block in place: LEFT uses the LEFT instantiation of anyJoinBlock,
/// every other kind goes through the INNER instantiation.
void Join::joinBlock(Block & block)
{
	switch (kind)
	{
		case ASTJoin::Left:
			anyJoinBlock<ASTJoin::Left>(block);
			break;
		default:
			anyJoinBlock<ASTJoin::Inner>(block);
			break;
	}
}
}