ClickHouse/dbms/src/Interpreters/JoinSwitcher.cpp

93 lines
2.6 KiB
C++
Raw Normal View History

#include <Common/typeid_cast.h>
#include <Interpreters/JoinSwitcher.h>
2020-02-17 17:21:03 +00:00
#include <Interpreters/Join.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/join_common.h>
namespace DB
{
/// Adjusts the nullability of `column` to match the `nullable` flag and returns the adjusted column.
/// When `nullable` is true the column is passed to JoinCommon::convertColumnToNullable,
/// otherwise to JoinCommon::removeColumnNullability.
static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable)
{
    if (!nullable)
    {
        JoinCommon::removeColumnNullability(column);
    }
    else
    {
        JoinCommon::convertColumnToNullable(column);
    }

    /// `column` is a named rvalue reference, i.e. an lvalue expression; before C++20 it is not
    /// moved implicitly on return, so the explicit std::move avoids a copy.
    return std::move(column);
}
2020-02-17 17:21:03 +00:00
/// Builds a switcher that starts with a `Join` over an empty clone of the right sample block.
/// Size limits are taken from `table_join_`; when none are configured, `max_bytes` falls back
/// to the value returned by `table_join_->defaultMaxBytes()`.
JoinSwitcher::JoinSwitcher(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block_)
    : switched(false)
    , limits(table_join_->sizeLimits())
    , table_join(table_join_)
    , right_sample_block(right_sample_block_.cloneEmpty())
{
    /// The initial implementation is always `Join`; it may later be replaced in switchJoin().
    join = std::make_shared<Join>(table_join, right_sample_block);

    const bool limits_configured = limits.hasLimits();
    if (!limits_configured)
    {
        limits.max_bytes = table_join->defaultMaxBytes();
    }
}
/// Adds a right-side block to the currently active join.
/// Once the switch has happened, the block is forwarded to the active join and its result is returned.
/// Before the switch, the block is added with the second argument set to `false`, the accumulated
/// row/byte totals are checked against `limits`, and switchJoin() is invoked when the soft check
/// fails; in that case the function always returns true.
/// The second (unnamed) parameter is ignored.
bool JoinSwitcher::addJoinedBlock(const Block & block, bool)
{
    const auto forward_to_active_join = [&] { return join->addJoinedBlock(block); };

    /// First check is done without taking switch_mutex.
    /// NOTE(review): this assumes `switched` is safe to read concurrently (e.g. atomic) — confirm in the header.
    if (switched)
        return forward_to_active_join();

    std::lock_guard lock(switch_mutex);

    /// Re-check under the lock: another caller may have switched in the meantime.
    if (switched)
        return forward_to_active_join();

    /// Not switched yet: add to the current join and apply the limits check here instead.
    join->addJoinedBlock(block, false);

    const size_t total_rows = join->getTotalRowCount();
    const size_t total_bytes = join->getTotalByteCount();
    const bool within_limits = limits.softCheck(total_rows, total_bytes);

    if (!within_limits)
    {
        switchJoin();
    }

    return true;
}
void JoinSwitcher::switchJoin()
{
std::shared_ptr<Join::RightTableData> joined_data = static_cast<const Join &>(*join).getJoinedData();
BlocksList right_blocks = std::move(joined_data->blocks);
/// Destroy old join & create new one. Destroy first in case of memory saving.
join = std::make_shared<MergeJoin>(table_join, right_sample_block);
2020-02-17 17:41:38 +00:00
/// names to positions optimization
std::vector<size_t> positions;
std::vector<bool> is_nullable;
if (right_blocks.size())
{
2020-02-17 17:41:38 +00:00
positions.reserve(right_sample_block.columns());
2020-02-18 12:41:23 +00:00
const Block & tmp_block = *right_blocks.begin();
for (const auto & sample_column : right_sample_block)
{
2020-02-17 17:41:38 +00:00
positions.emplace_back(tmp_block.getPositionByName(sample_column.name));
is_nullable.emplace_back(sample_column.type->isNullable());
}
}
2020-02-18 12:41:23 +00:00
for (Block & saved_block : right_blocks)
2020-02-17 17:41:38 +00:00
{
2020-02-18 12:41:23 +00:00
Block restored_block;
2020-02-17 17:41:38 +00:00
for (size_t i = 0; i < positions.size(); ++i)
{
2020-02-18 12:41:23 +00:00
auto & column = saved_block.getByPosition(positions[i]);
restored_block.insert(correctNullability(std::move(column), is_nullable[i]));
}
2020-02-18 12:41:23 +00:00
join->addJoinedBlock(restored_block);
}
switched = true;
}
}