Merge pull request #44238 from ClickHouse/vdimir/grace_join_mem

Vladimir C 2023-01-24 13:02:11 +01:00 committed by GitHub
commit 0403f801a8
12 changed files with 269 additions and 144 deletions

View File

@ -41,7 +41,7 @@ namespace
public:
AccumulatedBlockReader(TemporaryFileStream & reader_,
std::mutex & mutex_,
- size_t result_block_size_ = DEFAULT_BLOCK_SIZE * 8)
+ size_t result_block_size_ = 0)
: reader(reader_)
, mutex(mutex_)
, result_block_size(result_block_size_)
@ -59,18 +59,22 @@ namespace
Blocks blocks;
size_t rows_read = 0;
- while (rows_read < result_block_size)
+ do
{
Block block = reader.read();
rows_read += block.rows();
if (!block)
{
eof = true;
+ if (blocks.size() == 1)
+     return blocks.front();
return concatenateBlocks(blocks);
}
blocks.push_back(std::move(block));
- }
+ } while (rows_read < result_block_size);
+ if (blocks.size() == 1)
+     return blocks.front();
return concatenateBlocks(blocks);
}
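
Note on the two changes above: a default result_block_size_ of 0 makes each read() call return exactly one stored block (the do/while body runs once), while a positive value keeps accumulating blocks until the row budget is met. A standalone sketch of that policy, using std containers rather than ClickHouse's Block/TemporaryFileStream:

    #include <cstddef>
    #include <functional>
    #include <vector>

    using Chunk = std::vector<int>;  // stand-in for a Block of rows

    Chunk readAccumulated(const std::function<Chunk()> & read_one, size_t min_rows)
    {
        std::vector<Chunk> chunks;
        size_t rows_read = 0;
        do
        {
            Chunk c = read_one();  // an empty chunk signals end of stream
            rows_read += c.size();
            if (c.empty())
                break;
            chunks.push_back(std::move(c));
        } while (rows_read < min_rows);  // min_rows == 0: body runs exactly once

        Chunk result;  // concatenateBlocks analogue
        for (auto & c : chunks)
            result.insert(result.end(), c.begin(), c.end());
        return result;
    }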
@ -118,21 +122,12 @@ class GraceHashJoin::FileBucket : boost::noncopyable
public:
using BucketLock = std::unique_lock<std::mutex>;
- struct Stats
- {
-     TemporaryFileStream::Stat left;
-     TemporaryFileStream::Stat right;
- };
- explicit FileBucket(size_t bucket_index_,
-     TemporaryFileStream & left_file_,
-     TemporaryFileStream & right_file_,
-     Poco::Logger * log_)
+ explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, Poco::Logger * log_)
: idx{bucket_index_}
, left_file{left_file_}
, right_file{right_file_}
, state{State::WRITING_BLOCKS}
- , log(log_)
+ , log{log_}
{
}
@ -168,21 +163,18 @@ public:
bool empty() const { return is_empty.load(); }
- Stats getStat() const { return stats; }
AccumulatedBlockReader startJoining()
{
LOG_TRACE(log, "Joining file bucket {}", idx);
{
std::unique_lock<std::mutex> left_lock(left_file_mutex);
std::unique_lock<std::mutex> right_lock(right_file_mutex);
- stats.left = left_file.finishWriting();
- stats.right = right_file.finishWriting();
+ left_file.finishWriting();
+ right_file.finishWriting();
state = State::JOINING_BLOCKS;
}
return AccumulatedBlockReader(right_file, right_file_mutex);
}
@ -231,22 +223,23 @@ private:
std::atomic_bool is_empty = true;
std::atomic<State> state;
- Stats stats;
Poco::Logger * log;
};
namespace
{
template <JoinTableSide table_side>
- void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets)
+ void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets, size_t except_index = 0)
{
chassert(blocks.size() == buckets.size());
retryForEach(
generateRandomPermutation(1, buckets.size()), // skipping 0 block, since we join it in memory w/o spilling on disk
[&](size_t i)
{
- if (!blocks[i].rows())
+ /// Skip empty and current bucket
+ if (!blocks[i].rows() || i == except_index)
return true;
bool flushed = false;
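
The hunk above threads except_index through flushBlocksToBuckets so the share of the in-memory bucket is never spilled. The surrounding helpers (generateRandomPermutation, retryForEach) visit buckets in random order and retry the ones whose lock was busy; a standalone sketch of that loop, with try_flush standing in for the try-lock-and-write step:

    #include <algorithm>
    #include <cstddef>
    #include <numeric>
    #include <random>
    #include <vector>

    // Visit bucket indices [1, n) in random order (bucket 0 is joined in
    // memory), skipping except_index and retrying indices whose try_flush
    // failed, e.g. because another thread held that bucket's lock.
    template <typename TryFlush>
    void flushWithRetries(size_t n, size_t except_index, TryFlush try_flush)
    {
        std::vector<size_t> order(n > 0 ? n - 1 : 0);
        std::iota(order.begin(), order.end(), 1);
        std::shuffle(order.begin(), order.end(), std::mt19937{std::random_device{}()});

        while (!order.empty())
        {
            std::vector<size_t> busy;
            for (size_t i : order)
            {
                if (i == except_index)
                    continue;           // this bucket's share stays in memory
                if (!try_flush(i))
                    busy.push_back(i);  // locked by another writer, retry later
            }
            order.swap(busy);
        }
    }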
@ -281,6 +274,7 @@ GraceHashJoin::GraceHashJoin(
, right_key_names(table_join->getOnlyClause().key_names_right)
, tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
, hash_join(makeInMemoryJoin())
+ , hash_join_sample_block(hash_join->savedBlockSample())
{
if (!GraceHashJoin::isSupported(table_join))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GraceHashJoin is not supported for this join type");
@ -288,6 +282,9 @@ GraceHashJoin::GraceHashJoin(
void GraceHashJoin::initBuckets()
{
+ if (!buckets.empty())
+     return;
const auto & settings = context->getSettingsRef();
size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp<size_t>(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets));
@ -300,7 +297,7 @@ void GraceHashJoin::initBuckets()
if (buckets.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");
LOG_TRACE(log, "Initialize {} buckets", buckets.size());
LOG_TRACE(log, "Initialize {} bucket{}", buckets.size(), buckets.size() > 1 ? "s" : "");
current_bucket = buckets.front().get();
current_bucket->startJoining();
@ -320,18 +317,44 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized");
Block materialized = materializeBlock(block);
- addJoinedBlockImpl(materialized);
+ addJoinedBlockImpl(std::move(materialized));
return true;
}
- bool GraceHashJoin::fitsInMemory() const
+ bool GraceHashJoin::hasMemoryOverflow(size_t total_rows, size_t total_bytes) const
{
/// One row can't be split, avoid loop
- size_t total_row_count = hash_join->getTotalRowCount();
- if (total_row_count < 2)
-     return true;
+ if (total_rows < 2)
+     return false;
- return table_join->sizeLimits().softCheck(total_row_count, hash_join->getTotalByteCount());
+ bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes);
+ if (has_overflow)
+     LOG_TRACE(log, "Memory overflow, size exceeded {} / {} bytes, {} / {} rows",
+         ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes),
+         total_rows, table_join->sizeLimits().max_rows);
+ return has_overflow;
}
+ bool GraceHashJoin::hasMemoryOverflow(const BlocksList & blocks) const
+ {
+     size_t total_rows = 0;
+     size_t total_bytes = 0;
+     for (const auto & block : blocks)
+     {
+         total_rows += block.rows();
+         total_bytes += block.allocatedBytes();
+     }
+     return hasMemoryOverflow(total_rows, total_bytes);
+ }
+ bool GraceHashJoin::hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const
+ {
+     size_t total_rows = hash_join_->getTotalRowCount();
+     size_t total_bytes = hash_join_->getTotalByteCount();
+     return hasMemoryOverflow(total_rows, total_bytes);
+ }
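
The rename from fitsInMemory to hasMemoryOverflow flips the polarity and adds a trace of the offending sizes. A hedged sketch of the check, assuming softCheck(rows, bytes) returns true while both limits hold and that a limit of 0 means unlimited:

    #include <cstddef>

    struct SizeLimitsSketch
    {
        size_t max_rows = 0;   // 0 means unlimited
        size_t max_bytes = 0;

        bool softCheck(size_t rows, size_t bytes) const
        {
            return (!max_rows || rows <= max_rows) && (!max_bytes || bytes <= max_bytes);
        }
    };

    bool hasMemoryOverflow(const SizeLimitsSketch & limits, size_t total_rows, size_t total_bytes)
    {
        if (total_rows < 2)
            return false;  // one row can't be split further, avoid an endless rehash loop
        return !limits.softCheck(total_rows, total_bytes);
    }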
GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
@ -342,7 +365,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
if (to_size <= current_size)
return buckets;
- assert(isPowerOf2(to_size));
+ chassert(isPowerOf2(to_size));
if (to_size > max_num_buckets)
{
@ -363,14 +386,16 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
void GraceHashJoin::addBucket(Buckets & destination)
{
- BucketPtr new_bucket = std::make_shared<FileBucket>(
-     destination.size(), tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), log);
+ auto & left_file = tmp_data->createStream(left_sample_block);
+ auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
+ BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
destination.emplace_back(std::move(new_bucket));
}
void GraceHashJoin::checkTypesOfKeys(const Block & block) const
{
- assert(hash_join);
+ chassert(hash_join);
return hash_join->checkTypesOfKeys(block);
}
@ -423,7 +448,7 @@ size_t GraceHashJoin::getTotalRowCount() const
size_t GraceHashJoin::getTotalByteCount() const
{
std::lock_guard lock(hash_join_mutex);
- assert(hash_join);
+ chassert(hash_join);
return hash_join->getTotalByteCount();
}
@ -437,9 +462,14 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
std::shared_lock lock(rehash_mutex);
return std::all_of(buckets.begin(), buckets.end(), [](const auto & bucket) { return bucket->empty(); });
}();
- bool hash_join_is_empty = hash_join && hash_join->alwaysReturnsEmptySet();
- return hash_join_is_empty && file_buckets_are_empty;
+ if (!file_buckets_are_empty)
+     return false;
+ chassert(hash_join);
+ bool hash_join_is_empty = hash_join->alwaysReturnsEmptySet();
+ return hash_join_is_empty;
}
IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
@ -528,17 +558,11 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
if (hash_join)
{
- auto right_blocks = hash_join->releaseJoinedBlocks();
- Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_blocks, buckets.size());
- for (size_t i = 0; i < blocks.size(); ++i)
+ auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
+ for (auto & block : right_blocks)
{
- if (blocks[i].rows() == 0 || i == bucket_idx)
-     continue;
- if (i < bucket_idx)
-     throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected bucket index {} when current bucket is {}", i, bucket_idx);
- buckets[i]->addRightBlock(blocks[i]);
+ Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size());
+ flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets, bucket_idx);
}
}
@ -570,7 +594,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
return std::make_unique<DelayedBlocks>(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names);
}
LOG_TRACE(log, "Finished loading all buckets");
LOG_TRACE(log, "Finished loading all {} buckets", buckets.size());
current_bucket = nullptr;
return nullptr;
@ -581,42 +605,64 @@ GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
}
+ Block GraceHashJoin::prepareRightBlock(const Block & block)
+ {
+     return HashJoin::prepareRightBlock(block, hash_join_sample_block);
+ }
void GraceHashJoin::addJoinedBlockImpl(Block block)
{
Buckets buckets_snapshot = getCurrentBuckets();
- Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
size_t bucket_index = current_bucket->idx;
+ Block current_block;
+ {
+     Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
+     flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot, bucket_index);
+     current_block = std::move(blocks[bucket_index]);
+ }
// Add block to the in-memory join
- if (blocks[bucket_index].rows() > 0)
+ if (current_block.rows() > 0)
{
std::lock_guard lock(hash_join_mutex);
- hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false);
- bool overflow = !fitsInMemory();
- if (overflow)
- {
-     auto right_blocks = hash_join->releaseJoinedBlocks();
-     right_blocks.pop_back();
-     for (const auto & right_block : right_blocks)
-         blocks.push_back(right_block);
- }
- while (overflow)
- {
-     buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
-     blocks = JoinCommon::scatterBlockByHash(right_key_names, blocks, buckets_snapshot.size());
-     if (!hash_join)
-         hash_join = makeInMemoryJoin();
-     hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false);
-     overflow = !fitsInMemory();
- }
- blocks[bucket_index].clear();
- }
- flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot);
+ hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
+ if (!hasMemoryOverflow(hash_join))
+     return;
+ current_block = {};
+ auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
+ hash_join = nullptr;
+ buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
+ {
+     Blocks current_blocks;
+     current_blocks.reserve(right_blocks.size());
+     for (const auto & right_block : right_blocks)
+     {
+         Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size());
+         flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot, bucket_index);
+         current_blocks.emplace_back(std::move(blocks[bucket_index]));
+     }
+     if (current_blocks.size() == 1)
+         current_block = std::move(current_blocks.front());
+     else
+         current_block = concatenateBlocks(current_blocks);
+ }
+ hash_join = makeInMemoryJoin();
+ if (current_block.rows() > 0)
+     hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
}
}
size_t GraceHashJoin::getNumBuckets() const
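
The rewritten addJoinedBlockImpl leans on an invariant of power-of-two bucket counts: when rehashBuckets doubles the count, a row hashed to bucket i can only stay in i or move to i + old_count, never to a lower (already joined) bucket — which is why the old i < bucket_idx check could be dropped and flushBlocksToBuckets can simply skip except_index. A small self-contained check of that property:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    // With a power-of-two bucket count, bucket selection is a mask of the hash.
    size_t bucketOf(uint64_t hash, size_t num_buckets)
    {
        return hash & (num_buckets - 1);
    }

    int main()
    {
        const size_t old_n = 8, new_n = 16;  // a rehash doubles the bucket count
        for (uint64_t hash = 0; hash < 4096; ++hash)
        {
            size_t before = bucketOf(hash, old_n);
            size_t after = bucketOf(hash, new_n);
            // a row never moves to a smaller bucket index after doubling
            assert(after == before || after == before + old_n);
        }
    }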

View File

@ -95,8 +95,10 @@ private:
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);
- /// Check that @join satisifes limits on rows/bytes in @table_join.
- bool fitsInMemory() const;
+ /// Check that join satisfies limits on rows/bytes in table_join.
+ bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const;
+ bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const;
+ bool hasMemoryOverflow(const BlocksList & blocks) const;
/// Create new bucket at the end of @destination.
void addBucket(Buckets & destination);
@ -114,6 +116,9 @@ private:
size_t getNumBuckets() const;
Buckets getCurrentBuckets() const;
+ /// Structure block to store in the HashJoin according to sample_block.
+ Block prepareRightBlock(const Block & block);
Poco::Logger * log;
ContextPtr context;
std::shared_ptr<TableJoin> table_join;
@ -136,6 +141,7 @@ private:
mutable std::mutex current_bucket_mutex;
InMemoryJoinPtr hash_join;
+ Block hash_join_sample_block;
mutable std::mutex hash_join_mutex;
};

View File

@ -221,8 +221,8 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, right_sample_block(right_sample_block_)
, log(&Poco::Logger::get("HashJoin"))
{
LOG_DEBUG(log, "Datatype: {}, kind: {}, strictness: {}, right header: {}", data->type, kind, strictness, right_sample_block.dumpStructure());
LOG_DEBUG(log, "Keys: {}", TableJoin::formatClauses(table_join->getClauses(), true));
LOG_DEBUG(log, "({}) Datatype: {}, kind: {}, strictness: {}, right header: {}", fmt::ptr(this), data->type, kind, strictness, right_sample_block.dumpStructure());
LOG_DEBUG(log, "({}) Keys: {}", fmt::ptr(this), TableJoin::formatClauses(table_join->getClauses(), true));
if (isCrossOrComma(kind))
{
@ -469,6 +469,9 @@ bool HashJoin::alwaysReturnsEmptySet() const
size_t HashJoin::getTotalRowCount() const
{
+ if (!data)
+     return 0;
size_t res = 0;
if (data->type == Type::CROSS)
@ -484,28 +487,45 @@ size_t HashJoin::getTotalRowCount() const
}
}
return res;
}
size_t HashJoin::getTotalByteCount() const
{
+ if (!data)
+     return 0;
+ #ifndef NDEBUG
+     size_t debug_blocks_allocated_size = 0;
+     for (const auto & block : data->blocks)
+         debug_blocks_allocated_size += block.allocatedBytes();
+     if (data->blocks_allocated_size != debug_blocks_allocated_size)
+         throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})",
+             data->blocks_allocated_size, debug_blocks_allocated_size);
+     size_t debug_blocks_nullmaps_allocated_size = 0;
+     for (const auto & nullmap : data->blocks_nullmaps)
+         debug_blocks_nullmaps_allocated_size += nullmap.second->allocatedBytes();
+     if (data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size)
+         throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
+             data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
+ #endif
size_t res = 0;
if (data->type == Type::CROSS)
{
for (const auto & block : data->blocks)
res += block.bytes();
}
+ else
+     res += data->blocks_allocated_size;
+ res += data->blocks_nullmaps_allocated_size;
+ res += data->pool.size();
if (data->type != Type::CROSS)
{
for (const auto & map : data->maps)
{
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); });
}
- res += data->pool.size();
}
return res;
}
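
getTotalByteCount previously walked every stored block; the new blocks_allocated_size and blocks_nullmaps_allocated_size counters are maintained at insertion time so the query is O(1), with the debug-only loop above re-verifying the counters. A minimal sketch of the pattern with std types:

    #include <cassert>
    #include <cstddef>
    #include <list>
    #include <vector>

    struct BlockStoreSketch
    {
        std::list<std::vector<char>> blocks;
        size_t blocks_allocated_size = 0;  // updated on every insertion

        void add(std::vector<char> block)
        {
            blocks_allocated_size += block.capacity();  // allocatedBytes() analogue
            blocks.push_back(std::move(block));
        }

        size_t totalBytes() const
        {
    #ifndef NDEBUG
            // Debug builds re-count from scratch to catch accounting drift.
            size_t recount = 0;
            for (const auto & b : blocks)
                recount += b.capacity();
            assert(recount == blocks_allocated_size);
    #endif
            return blocks_allocated_size;
        }
    };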
@ -656,41 +676,57 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
}
}
- Block HashJoin::structureRightBlock(const Block & block) const
+ Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_)
{
Block structured_block;
- for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName())
+ for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName())
{
ColumnWithTypeAndName column = block.getByName(sample_column.name);
if (sample_column.column->isNullable())
JoinCommon::convertColumnToNullable(column);
- structured_block.insert(column);
+ if (column.column->lowCardinality() && !sample_column.column->lowCardinality())
+ {
+     column.column = column.column->convertToFullColumnIfLowCardinality();
+     column.type = removeLowCardinality(column.type);
+ }
+ /// There's no optimization for right side const columns. Remove constness if any.
+ column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
+ structured_block.insert(std::move(column));
}
return structured_block;
}
+ Block HashJoin::prepareRightBlock(const Block & block) const
+ {
+     return prepareRightBlock(block, savedBlockSample());
+ }
bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
{
if (!data)
throw Exception("Join data was released", ErrorCodes::LOGICAL_ERROR);
/// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency.
/// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code.
if (unlikely(source_block.rows() > std::numeric_limits<RowRef::SizeT>::max()))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block.rows());
- /// There's no optimization for right side const columns. Remove constness if any.
- Block block = materializeBlock(source_block);
- size_t rows = block.rows();
+ size_t rows = source_block.rows();
- ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right));
+ ColumnPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(source_block, table_join->getAllNames(JoinTableSide::Right));
- Block structured_block = structureRightBlock(block);
+ Block block_to_save = prepareRightBlock(source_block);
size_t total_rows = 0;
size_t total_bytes = 0;
{
if (storage_join_lock)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addJoinedBlock called when HashJoin locked to prevent updates");
- data->blocks.emplace_back(std::move(structured_block));
+ data->blocks_allocated_size += block_to_save.allocatedBytes();
+ data->blocks.emplace_back(std::move(block_to_save));
Block * stored_block = &data->blocks.back();
if (rows)
@ -702,7 +738,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
{
ColumnRawPtrs key_columns;
for (const auto & name : onexprs[onexpr_idx].key_names_right)
- key_columns.push_back(all_key_columns[name]);
+ key_columns.push_back(all_key_columns[name].get());
/// We will insert to the map only keys, where all components are not NULL.
ConstNullMapPtr null_map{};
@ -717,14 +753,14 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
save_nullmap |= (*null_map)[i];
}
- auto join_mask_col = JoinCommon::getColumnAsMask(block, onexprs[onexpr_idx].condColumnNames().second);
+ auto join_mask_col = JoinCommon::getColumnAsMask(source_block, onexprs[onexpr_idx].condColumnNames().second);
/// Save blocks that do not hold conditions in ON section
ColumnUInt8::MutablePtr not_joined_map = nullptr;
- if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant())
+ if (!multiple_disjuncts && isRightOrFull(kind) && join_mask_col.hasData())
{
const auto & join_mask = join_mask_col.getData();
/// Save rows that do not hold conditions
- not_joined_map = ColumnUInt8::create(block.rows(), 0);
+ not_joined_map = ColumnUInt8::create(rows, 0);
for (size_t i = 0, sz = join_mask->size(); i < sz; ++i)
{
/// Condition hold, do not save row
@ -758,10 +794,16 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
}
if (!multiple_disjuncts && save_nullmap)
+ {
+     data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, null_map_holder);
+ }
if (!multiple_disjuncts && not_joined_map)
+ {
+     data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map));
+ }
if (!check_limits)
return true;
@ -794,7 +836,6 @@ struct JoinOnKeyColumns
Sizes key_sizes;
explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
: key_names(key_names_)
, materialized_keys_holder(JoinCommon::materializeColumns(block, key_names)) /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
@ -1672,7 +1713,7 @@ void HashJoin::checkTypesOfKeys(const Block & block) const
void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
- if (data->released)
+ if (!data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
for (const auto & onexpr : table_join->getClauses())
@ -1711,6 +1752,16 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
}
}
+ HashJoin::~HashJoin()
+ {
+     if (!data)
+     {
+         LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this));
+         return;
+     }
+     LOG_TRACE(log, "({}) Join data is being destroyed, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
+ }
template <typename Mapped>
struct AdderNonJoined
{
@ -1749,7 +1800,6 @@ struct AdderNonJoined
}
};
/// Stream from not joined earlier rows of the right table.
/// Based on:
/// - map offsetInternal saved in used_flags for single disjuncts
@ -1760,7 +1810,10 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
public:
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
: parent(parent_), max_block_size(max_block_size_), current_block_start(0)
- {}
+ {
+     if (parent.data == nullptr)
+         throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
+ }
Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); }
@ -1957,7 +2010,6 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
size_t left_columns_count = left_sample_block.columns();
auto non_joined = std::make_unique<NotJoinedHash<true>>(*this, max_block_size);
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);
}
else
{
@ -1986,10 +2038,20 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
}
}
- BlocksList HashJoin::releaseJoinedBlocks()
+ BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
{
LOG_TRACE(log, "({}) Join data is being released, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
BlocksList right_blocks = std::move(data->blocks);
- data->released = true;
+ if (!restructure)
+ {
+     data.reset();
+     return right_blocks;
+ }
+ data->maps.clear();
+ data->blocks_nullmaps.clear();
BlocksList restored_blocks;
/// names to positions optimization
@ -2018,6 +2080,7 @@ BlocksList HashJoin::releaseJoinedBlocks()
restored_blocks.emplace_back(std::move(restored_block));
}
+ data.reset();
return restored_blocks;
}
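
The restructure flag separates the two callers' needs: GraceHashJoin re-scatters the saved blocks exactly as stored, so it passes false and the data is handed over as-is (data is reset immediately); JoinSwitcher passes true because MergeJoin expects blocks in the original right-table structure. A standalone sketch of that release-or-restore shape, with simplified stand-in types rather than ClickHouse's Block:

    #include <list>
    #include <string>
    #include <utility>
    #include <vector>

    using Column = std::pair<std::string, std::vector<int>>;  // (name, data)
    using Block = std::vector<Column>;

    std::list<Block> releaseBlocks(std::list<Block> && saved,
                                   const std::vector<std::string> & original_order,
                                   bool restructure)
    {
        std::list<Block> out = std::move(saved);
        if (!restructure)
            return out;  // fast path: hand the stored blocks over unchanged

        std::list<Block> restored;
        for (auto & block : out)
        {
            Block reordered;
            for (const auto & name : original_order)
                for (auto & col : block)
                    if (col.first == name)
                    {
                        reordered.push_back(std::move(col));
                        break;
                    }
            restored.push_back(std::move(reordered));
        }
        return restored;  // blocks match the structure the caller expects
    }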

View File

@ -149,6 +149,8 @@ class HashJoin : public IJoin
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
+ ~HashJoin() override;
const TableJoin & getTableJoin() const override { return *table_join; }
/** Add block of data from right hand of JOIN to the map.
@ -336,7 +338,8 @@ public:
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
Arena pool;
- bool released = false;
+ size_t blocks_allocated_size = 0;
+ size_t blocks_nullmaps_allocated_size = 0;
};
using RightTableDataPtr = std::shared_ptr<RightTableData>;
@ -351,7 +354,13 @@ public:
void reuseJoinedData(const HashJoin & join);
RightTableDataPtr getJoinedData() const { return data; }
- BlocksList releaseJoinedBlocks();
+ BlocksList releaseJoinedBlocks(bool restructure = false);
+ /// Modify right block (update structure according to sample block) to save it in block list
+ static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_);
+ Block prepareRightBlock(const Block & block) const;
+ const Block & savedBlockSample() const { return data->sample_block; }
bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
@ -403,10 +412,6 @@ private:
void dataMapInit(MapsVariant &);
- const Block & savedBlockSample() const { return data->sample_block; }
- /// Modify (structure) right block to save it in block list
- Block structureRightBlock(const Block & stored_block) const;
void initRightBlockStructure(Block & saved_block_sample);
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>

View File

@ -41,7 +41,7 @@ bool JoinSwitcher::addJoinedBlock(const Block & block, bool)
bool JoinSwitcher::switchJoin()
{
HashJoin * hash_join = assert_cast<HashJoin *>(join.get());
- BlocksList right_blocks = hash_join->releaseJoinedBlocks();
+ BlocksList right_blocks = hash_join->releaseJoinedBlocks(true);
/// Destroy old join & create new one.
join = std::make_shared<MergeJoin>(table_join, right_sample_block);

View File

@ -324,17 +324,20 @@ ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names)
return ptrs;
}
- ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names)
+ ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names)
{
- ColumnRawPtrMap ptrs;
+ ColumnPtrMap ptrs;
ptrs.reserve(names.size());
for (const auto & column_name : names)
{
- auto & column = block.getByName(column_name);
- column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst());
- column.type = recursiveRemoveLowCardinality(column.type);
- ptrs[column_name] = column.column.get();
+ ColumnPtr column = block.getByName(column_name).column;
+ column = column->convertToFullColumnIfConst();
+ column = recursiveRemoveLowCardinality(column);
+ column = recursiveRemoveSparse(column);
+ ptrs[column_name] = column;
}
return ptrs;
@ -529,24 +532,24 @@ bool typesEqualUpToNullability(DataTypePtr left_type, DataTypePtr right_type)
JoinMask getColumnAsMask(const Block & block, const String & column_name)
{
if (column_name.empty())
- return JoinMask(true);
+ return JoinMask(true, block.rows());
const auto & src_col = block.getByName(column_name);
DataTypePtr col_type = recursiveRemoveLowCardinality(src_col.type);
if (isNothing(col_type))
- return JoinMask(false);
+ return JoinMask(false, block.rows());
if (const auto * const_cond = checkAndGetColumn<ColumnConst>(*src_col.column))
{
- return JoinMask(const_cond->getBool(0));
+ return JoinMask(const_cond->getBool(0), block.rows());
}
ColumnPtr join_condition_col = recursiveRemoveLowCardinality(src_col.column->convertToFullColumnIfConst());
if (const auto * nullable_col = typeid_cast<const ColumnNullable *>(join_condition_col.get()))
{
if (isNothing(assert_cast<const DataTypeNullable &>(*col_type).getNestedType()))
- return JoinMask(false);
+ return JoinMask(false, block.rows());
/// Return nested column with NULL set to false
const auto & nest_col = assert_cast<const ColumnUInt8 &>(nullable_col->getNestedColumn());
@ -639,9 +642,8 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block
{
if (num_shards == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive");
- UNUSED(scatterBlockByHashPow2);
- // if (likely(isPowerOf2(num_shards)))
- //     return scatterBlockByHashPow2(key_columns_names, block, num_shards);
+ if (likely(isPowerOf2(num_shards)))
+     return scatterBlockByHashPow2(key_columns_names, block, num_shards);
return scatterBlockByHashGeneric(key_columns_names, block, num_shards);
}
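
Re-enabling scatterBlockByHashPow2 matters because a shard is picked per row: for power-of-two shard counts the modulo reduces to a bit mask. A small sketch of the equivalence the fast path relies on (assumed shard selection, not the ClickHouse implementation):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    size_t shardGeneric(uint64_t hash, size_t num_shards) { return hash % num_shards; }
    size_t shardPow2(uint64_t hash, size_t num_shards)    { return hash & (num_shards - 1); }

    int main()
    {
        // For power-of-two shard counts the cheap mask agrees with the modulo.
        for (uint64_t h = 0; h < 10000; ++h)
            for (size_t n : {1, 2, 4, 16, 64})
                assert(shardPow2(h, n) == shardGeneric(h, n));
    }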

View File

@ -14,30 +14,34 @@ class TableJoin;
class IColumn;
using ColumnRawPtrs = std::vector<const IColumn *>;
+ using ColumnPtrMap = std::unordered_map<String, ColumnPtr>;
using ColumnRawPtrMap = std::unordered_map<String, const IColumn *>;
using UInt8ColumnDataPtr = const ColumnUInt8::Container *;
namespace JoinCommon
{
- /// Store boolean column handling constant value without materializing
- /// Behaves similar to std::variant<bool, ColumnPtr>, but provides more convenient specialized interface
+ /// Helper interface to work with mask from JOIN ON section
class JoinMask
{
public:
- explicit JoinMask(bool value)
+ explicit JoinMask()
: column(nullptr)
- , const_value(value)
{}
+ explicit JoinMask(bool value, size_t size)
+     : column(ColumnUInt8::create(size, value))
+ {}
explicit JoinMask(ColumnPtr col)
: column(col)
- , const_value(false)
{}
- bool isConstant() { return !column; }
+ bool hasData()
+ {
+     return column != nullptr;
+ }
/// Return data if mask is not constant
UInt8ColumnDataPtr getData()
{
if (column)
@ -47,15 +51,11 @@ public:
inline bool isRowFiltered(size_t row) const
{
- if (column)
-     return !assert_cast<const ColumnUInt8 &>(*column).getData()[row];
- return !const_value;
+ return !assert_cast<const ColumnUInt8 &>(*column).getData()[row];
}
private:
ColumnPtr column;
- /// Used if column is null
- bool const_value;
};
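
With this change a constant JOIN ON condition is materialized as a real ColumnUInt8 of the block's row count, so the separate const_value branch disappears and consumers only need hasData() and isRowFiltered(). A minimal stand-in for the new contract (std types; the real class stores a ColumnPtr):

    #include <cstddef>
    #include <cstdint>
    #include <memory>
    #include <vector>

    class JoinMaskSketch
    {
    public:
        JoinMaskSketch() = default;              // "no mask" state: hasData() is false
        JoinMaskSketch(bool value, size_t size)  // constant mask, materialized up front
            : column(std::make_shared<std::vector<uint8_t>>(size, value ? 1 : 0))
        {}

        bool hasData() const { return column != nullptr; }

        // A row is filtered out when its mask value is 0.
        bool isRowFiltered(size_t row) const { return !(*column)[row]; }

    private:
        std::shared_ptr<std::vector<uint8_t>> column;  // ColumnPtr stand-in
    };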
@ -71,7 +71,7 @@ ColumnPtr emptyNotNullableClone(const ColumnPtr & column);
ColumnPtr materializeColumn(const Block & block, const String & name);
Columns materializeColumns(const Block & block, const Names & names);
ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names);
- ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names);
+ ColumnPtrMap materializeColumnsInplaceMap(const Block & block, const Names & names);
ColumnRawPtrs getRawPointers(const Columns & columns);
void convertToFullColumnsInplace(Block & block);
void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type = true);

View File

@ -55,7 +55,7 @@ ColumnWithTypeAndName condtitionColumnToJoinable(const Block & block, const Stri
if (!src_column_name.empty())
{
auto join_mask = JoinCommon::getColumnAsMask(block, src_column_name);
- if (!join_mask.isConstant())
+ if (join_mask.hasData())
{
for (size_t i = 0; i < res_size; ++i)
null_map->getData()[i] = join_mask.isRowFiltered(i);

View File

@ -198,7 +198,7 @@ public:
: size_limits(limits)
, default_max_bytes(0)
, join_use_nulls(use_nulls)
- , join_algorithm(JoinAlgorithm::HASH)
+ , join_algorithm(JoinAlgorithm::DEFAULT)
{
clauses.emplace_back().key_names_right = key_names_right;
table_join.kind = kind;

View File

@ -30,6 +30,8 @@ SELECT t1.key, t1.key2 FROM t1 INNER ALL JOIN t2 ON t1.id == t2.id AND t2.key ==
SELECT '--';
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2;
SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 0; -- { serverError INVALID_JOIN_ON_EXPRESSION }
+ SELECT t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND 1; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT '--';
SELECT '333' = t1.key FROM t1 INNER ANY JOIN t2 ON t1.id == t2.id AND t2.key == t2.key2 AND t1.key == t1.key2 AND t2.id > 2;

View File

@ -1,4 +1,4 @@
- {% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
+ {% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}
--- {{ join_algorithm }} ---
2014-03-17 1406958 265108
2014-03-19 1405797 261624

View File

@ -1,6 +1,7 @@
- {% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge'] -%}
+ {% for join_algorithm in ['hash', 'parallel_hash', 'full_sorting_merge', 'grace_hash'] -%}
SET max_bytes_in_join = '{% if join_algorithm == 'grace_hash' %}20K{% else %}0{% endif %}';
SET max_rows_in_join = '{% if join_algorithm == 'grace_hash' %}10K{% else %}0{% endif %}';
+ SET grace_hash_join_initial_buckets = 4;
SELECT '--- {{ join_algorithm }} ---';