commit 57a35cae33
parent efcfcca545
Author: vdimir
Date: 2022-12-20 12:50:27 +00:00
6 changed files with 149 additions and 81 deletions

View File

@@ -841,6 +841,9 @@ Block concatenateBlocks(const std::vector<Block> & blocks)
     if (blocks.empty())
         return {};
 
+    if (blocks.size() == 1)
+        return blocks.front();
+
     size_t num_rows = 0;
     for (const auto & block : blocks)
         num_rows += block.rows();
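
The added fast path returns the lone block as-is instead of re-counting rows and merging columns. The same idiom in stand-alone C++ (a sketch over std::string chunks, not ClickHouse's Block API), just to illustrate what the pre-counting loop above avoids:

    #include <string>
    #include <vector>

    std::string concatenateChunks(const std::vector<std::string> & chunks)
    {
        if (chunks.empty())
            return {};
        if (chunks.size() == 1)
            return chunks.front();      // fast path: no second allocation, no copy

        size_t total = 0;               // mirrors the num_rows pre-count
        for (const auto & s : chunks)
            total += s.size();

        std::string result;
        result.reserve(total);          // single allocation for the merged result
        for (const auto & s : chunks)
            result += s;
        return result;
    }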

View File

@@ -41,7 +41,7 @@ namespace
 public:
     AccumulatedBlockReader(TemporaryFileStream & reader_,
                            std::mutex & mutex_,
-                           size_t result_block_size_ = DEFAULT_BLOCK_SIZE * 8)
+                           size_t result_block_size_ = 0)
         : reader(reader_)
         , mutex(mutex_)
         , result_block_size(result_block_size_)
@@ -59,7 +59,7 @@ namespace
         Blocks blocks;
         size_t rows_read = 0;
-        while (rows_read < result_block_size)
+        while (true)
         {
             Block block = reader.read();
             rows_read += block.rows();
@@ -69,6 +69,9 @@ namespace
                 return concatenateBlocks(blocks);
             }
             blocks.push_back(std::move(block));
+
+            if (rows_read >= result_block_size)
+                break;
         }
 
         return concatenateBlocks(blocks);
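
The rewritten loop always consumes at least one block and stops once the accumulated row count reaches result_block_size, so the new default of 0 means "return after the first block read". A stand-alone sketch of that control flow (a generic callable reader, not the TemporaryFileStream API):

    #include <cstddef>
    #include <vector>

    template <typename Reader>
    std::vector<int> readAtLeast(Reader & read_chunk, size_t threshold_rows)
    {
        std::vector<int> rows;
        while (true)
        {
            std::vector<int> chunk = read_chunk();  // empty chunk signals end of stream
            if (chunk.empty())
                break;
            rows.insert(rows.end(), chunk.begin(), chunk.end());
            if (rows.size() >= threshold_rows)
                break;                              // threshold met (immediately when 0)
        }
        return rows;
    }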
@@ -124,15 +127,14 @@ public:
         TemporaryFileStream::Stat right;
     };
 
-    explicit FileBucket(size_t bucket_index_,
-                        TemporaryFileStream & left_file_,
-                        TemporaryFileStream & right_file_,
-                        Poco::Logger * log_)
+    explicit FileBucket(size_t bucket_index_, GraceHashJoin & grace_join_)
         : idx{bucket_index_}
-        , left_file{left_file_}
-        , right_file{right_file_}
+        , left_file{grace_join_.tmp_data->createStream(grace_join_.left_sample_block)}
+        , right_file{grace_join_.tmp_data->createStream(grace_join_.prepareRightBlock(grace_join_.right_sample_block))}
         , state{State::WRITING_BLOCKS}
-        , log(log_)
+        , log{grace_join_.log}
     {
     }
@@ -174,14 +176,13 @@ public:
     {
         LOG_TRACE(log, "Joining file bucket {}", idx);
 
-        {
-            std::unique_lock<std::mutex> left_lock(left_file_mutex);
-            std::unique_lock<std::mutex> right_lock(right_file_mutex);
+        std::unique_lock<std::mutex> left_lock(left_file_mutex);
+        std::unique_lock<std::mutex> right_lock(right_file_mutex);
 
-            stats.left = left_file.finishWriting();
-            stats.right = right_file.finishWriting();
-            state = State::JOINING_BLOCKS;
-        }
+        stats.left = left_file.finishWriting();
+        stats.right = right_file.finishWriting();
+        state = State::JOINING_BLOCKS;
 
         return AccumulatedBlockReader(right_file, right_file_mutex);
     }
@@ -238,15 +239,17 @@ private:
 namespace
 {
 template <JoinTableSide table_side>
-void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets)
+void flushBlocksToBuckets(Blocks & blocks, const GraceHashJoin::Buckets & buckets, size_t except_index = 0)
 {
     chassert(blocks.size() == buckets.size());
 
     retryForEach(
         generateRandomPermutation(1, buckets.size()), // skipping 0 block, since we join it in memory w/o spilling on disk
         [&](size_t i)
         {
-            if (!blocks[i].rows())
+            /// Skip empty and current bucket
+            if (!blocks[i].rows() || i == except_index)
                 return true;
 
             bool flushed = false;
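
flushBlocksToBuckets writes buckets in a random order and retries any bucket it could not flush immediately, so concurrent writers spread out instead of queuing on the same file. A rough analogue of that retry-in-random-order idiom (helper names here are illustrative, not the retryForEach/generateRandomPermutation used above):

    #include <algorithm>
    #include <deque>
    #include <numeric>
    #include <random>
    #include <vector>

    std::vector<size_t> randomPermutation(size_t from, size_t upto)
    {
        std::vector<size_t> indices(upto - from);
        std::iota(indices.begin(), indices.end(), from);
        std::shuffle(indices.begin(), indices.end(), std::mt19937{std::random_device{}()});
        return indices;
    }

    template <typename F>
    void retryUntilDone(const std::vector<size_t> & indices, F && f)
    {
        std::deque<size_t> queue(indices.begin(), indices.end());
        while (!queue.empty())
        {
            size_t i = queue.front();
            queue.pop_front();
            if (!f(i))              // callback returns false => retry this index later
                queue.push_back(i);
        }
    }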
@@ -281,6 +284,7 @@ GraceHashJoin::GraceHashJoin(
     , right_key_names(table_join->getOnlyClause().key_names_right)
     , tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
     , hash_join(makeInMemoryJoin())
+    , hash_join_sample_block(hash_join->savedBlockSample())
 {
     if (!GraceHashJoin::isSupported(table_join))
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GraceHashJoin is not supported for this join type");
@@ -303,7 +307,7 @@ void GraceHashJoin::initBuckets()
     if (buckets.empty())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");
 
-    LOG_TRACE(log, "Initialize {} buckets", buckets.size());
+    LOG_TRACE(log, "Initialize {} bucket{}", buckets.size(), buckets.size() > 1 ? "s" : "");
 
     current_bucket = buckets.front().get();
     current_bucket->startJoining();
@@ -323,10 +327,26 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized");
 
     Block materialized = materializeBlock(block);
-    addJoinedBlockImpl(materialized);
+    addJoinedBlockImpl(std::move(materialized));
     return true;
 }
 
+bool GraceHashJoin::hasMemoryOverflow(const Block & block) const
+{
+    /// One row can't be split, avoid loop
+    if (block.rows() < 2)
+        return false;
+
+    bool has_overflow = !table_join->sizeLimits().softCheck(block.rows(), block.allocatedBytes());
+
+    if (has_overflow)
+        LOG_TRACE(log, "GraceHashJoin has memory overflow, block {} / {} bytes, {} / {} rows",
+            ReadableSize(block.allocatedBytes()), ReadableSize(table_join->sizeLimits().max_bytes),
+            block.rows(), table_join->sizeLimits().max_rows);
+
+    return has_overflow;
+}
+
 bool GraceHashJoin::hasMemoryOverflow() const
 {
     /// One row can't be split, avoid loop
@@ -339,7 +359,7 @@ bool GraceHashJoin::hasMemoryOverflow() const
     bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes);
     if (has_overflow)
-        LOG_TRACE(log, "GraceHashJoin has memory overflow {} / {} bytes, {} / {} rows",
+        LOG_TRACE(log, "GraceHashJoin has memory overflow, hash {} / {} bytes, {} / {} rows",
             ReadableSize(total_bytes), ReadableSize(table_join->sizeLimits().max_bytes),
             total_rows, table_join->sizeLimits().max_rows);
     return has_overflow;
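
Both overloads delegate to the same soft limit test. Its semantics are roughly as follows (an illustrative reimplementation; the assumption that zero means "unlimited" matches how max_rows_in_join / max_bytes_in_join behave):

    struct SizeLimitsSketch
    {
        size_t max_rows = 0;    // 0 = no row limit
        size_t max_bytes = 0;   // 0 = no byte limit

        bool softCheck(size_t rows, size_t bytes) const
        {
            if (max_rows && rows > max_rows)
                return false;
            if (max_bytes && bytes > max_bytes)
                return false;
            return true;
        }
    };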
@@ -353,7 +373,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
     if (to_size <= current_size)
         return buckets;
 
-    assert(isPowerOf2(to_size));
+    chassert(isPowerOf2(to_size));
 
     if (to_size > max_num_buckets)
     {
@@ -373,8 +393,7 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
 void GraceHashJoin::addBucket(Buckets & destination)
 {
-    BucketPtr new_bucket = std::make_shared<FileBucket>(
-        destination.size(), tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), log);
+    BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), *this);
     destination.emplace_back(std::move(new_bucket));
 }
@@ -538,17 +557,11 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
         if (hash_join)
         {
-            auto right_blocks = hash_join->releaseJoinedBlocks();
-            Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_blocks, buckets.size());
-            for (size_t i = 0; i < blocks.size(); ++i)
+            auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
+            for (auto & block : right_blocks)
             {
-                if (blocks[i].rows() == 0 || i == bucket_idx)
-                    continue;
-
-                if (i < bucket_idx)
-                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected bucket index {} when current bucket is {}", i, bucket_idx);
-                buckets[i]->addRightBlock(blocks[i]);
+                Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size());
+                flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets, bucket_idx);
             }
         }
@@ -591,42 +604,58 @@ GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
     return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
 }
 
+Block GraceHashJoin::prepareRightBlock(const Block & block)
+{
+    return HashJoin::prepareRightBlock(block, hash_join_sample_block);
+}
+
 void GraceHashJoin::addJoinedBlockImpl(Block block)
 {
     Buckets buckets_snapshot = getCurrentBuckets();
-    Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
     size_t bucket_index = current_bucket->idx;
+    Block current_block;
+
+    {
+        Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
+        flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot, bucket_index);
+        current_block = std::move(blocks[bucket_index]);
+    }
 
     // Add block to the in-memory join
-    if (blocks[bucket_index].rows() > 0)
+    if (current_block.rows() > 0)
     {
         std::lock_guard lock(hash_join_mutex);
 
-        hash_join->addJoinedBlock(std::move(blocks[bucket_index]), /* check_limits = */ false);
-        bool overflow = hasMemoryOverflow();
+        hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
 
-        if (overflow)
-        {
-            auto right_blocks = hash_join->releaseJoinedBlocks();
-            right_blocks.pop_back();
+        if (!hasMemoryOverflow())
+            return;
 
-            for (const auto & right_block : right_blocks)
-                blocks.push_back(right_block);
-        }
+        current_block = {};
+
+        auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
+
+        for (const auto & right_block : right_blocks)
+        {
+            Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, right_block, buckets_snapshot.size());
+            flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot, bucket_index);
+            current_block = blocks[bucket_index];
+        }
 
-        while (overflow)
+        while (hasMemoryOverflow(current_block))
         {
             buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
-            blocks = JoinCommon::scatterBlockByHash(right_key_names, blocks, buckets_snapshot.size());
-            hash_join = makeInMemoryJoin();
-            hash_join->addJoinedBlock(blocks[bucket_index], /* check_limits = */ false);
-            overflow = hasMemoryOverflow();
+            Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, current_block, buckets_snapshot.size());
+            flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot, bucket_index);
+            current_block = blocks[bucket_index];
         }
-        blocks[bucket_index].clear();
-    }
 
-    flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets_snapshot);
+        hash_join = makeInMemoryJoin();
+
+        if (current_block.rows() > 0)
+            hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
+    }
 }
 
 size_t GraceHashJoin::getNumBuckets() const
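
Taken together, the rewritten addJoinedBlockImpl is the textbook grace hash join insertion path: scatter by hash, spill every partition except the current one, keep the current partition in an in-memory join, and double the bucket count whenever the in-memory part overflows. A compact toy model of that control flow (all types and helpers are illustrative; none of this is ClickHouse API):

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <utility>
    #include <vector>

    using Row = uint64_t;
    using Bucket = std::vector<Row>;

    static std::vector<Bucket> scatter(const std::vector<Row> & rows, size_t num_buckets)
    {
        std::vector<Bucket> out(num_buckets);
        for (Row r : rows)
            out[std::hash<Row>{}(r) % num_buckets].push_back(r);
        return out;
    }

    struct ToyGraceJoin
    {
        size_t current_bucket = 0;
        size_t memory_budget_rows;
        size_t max_buckets = 1024;
        std::vector<Bucket> spilled;   // stands in for the on-disk file buckets
        Bucket in_memory;              // stands in for the in-memory hash join

        ToyGraceJoin(size_t num_buckets, size_t budget)
            : memory_budget_rows(budget), spilled(num_buckets) {}

        void addRightRows(const std::vector<Row> & rows)
        {
            absorb(scatter(rows, spilled.size()));

            // Overflow: double the buckets and re-scatter only the in-memory
            // part, mirroring rehashBuckets + scatterBlockByHash + flush.
            while (in_memory.size() > memory_budget_rows
                   && in_memory.size() > 1             // one row can't be split
                   && spilled.size() < max_buckets)    // cap, as max_num_buckets does
            {
                spilled.resize(spilled.size() * 2);
                Bucket to_split = std::exchange(in_memory, {});
                absorb(scatter(to_split, spilled.size()));
            }
        }

    private:
        // Spill every partition except the current one; keep current in memory.
        void absorb(std::vector<Bucket> parts)
        {
            for (size_t i = 0; i < parts.size(); ++i)
            {
                auto & dst = (i == current_bucket) ? in_memory : spilled[i];
                dst.insert(dst.end(), parts[i].begin(), parts[i].end());
            }
        }
    };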

View File

@@ -95,8 +95,10 @@ private:
     /// Add right table block to the @join. Calls @rehash on overflow.
     void addJoinedBlockImpl(Block block);
 
-    /// Check that @join satisifes limits on rows/bytes in @table_join.
+    /// Check that join satisfies limits on rows/bytes in table_join.
     bool hasMemoryOverflow() const;
 
+    /// Check that block satisfies limits on rows/bytes in table_join.
+    bool hasMemoryOverflow(const Block & block) const;
+
     /// Create new bucket at the end of @destination.
     void addBucket(Buckets & destination);
@@ -114,6 +116,8 @@ private:
     size_t getNumBuckets() const;
     Buckets getCurrentBuckets() const;
 
+    Block prepareRightBlock(const Block & block);
+
     Poco::Logger * log;
     ContextPtr context;
     std::shared_ptr<TableJoin> table_join;
@@ -136,6 +140,7 @@ private:
     mutable std::mutex current_bucket_mutex;
 
     InMemoryJoinPtr hash_join;
+    Block hash_join_sample_block;
     mutable std::mutex hash_join_mutex;
 };

View File

@@ -469,6 +469,9 @@ bool HashJoin::alwaysReturnsEmptySet() const
 size_t HashJoin::getTotalRowCount() const
 {
+    if (!data)
+        return 0;
+
     size_t res = 0;
 
     if (data->type == Type::CROSS)
@@ -489,6 +492,9 @@ size_t HashJoin::getTotalRowCount() const
 size_t HashJoin::getTotalByteCount() const
 {
+    if (!data)
+        return 0;
+
 #ifndef NDEBUG
     size_t debug_blocks_allocated_size = 0;
     for (const auto & block : data->blocks)
@@ -670,34 +676,44 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
     }
 }
 
-Block HashJoin::structureRightBlock(const Block & block) const
+Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_)
 {
     Block structured_block;
-    for (const auto & sample_column : savedBlockSample().getColumnsWithTypeAndName())
+    for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName())
     {
         ColumnWithTypeAndName column = block.getByName(sample_column.name);
         if (sample_column.column->isNullable())
             JoinCommon::convertColumnToNullable(column);
-        structured_block.insert(column);
-    }
+
+        /// There's no optimization for right side const columns. Remove constness if any.
+        column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
+        structured_block.insert(std::move(column));
+    }
 
     return structured_block;
 }
 
+Block HashJoin::prepareRightBlock(const Block & block) const
+{
+    return prepareRightBlock(block, savedBlockSample());
+}
+
 bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
 {
+    if (!data)
+        throw Exception("Join data was released", ErrorCodes::LOGICAL_ERROR);
+
+    Block structured_block = prepareRightBlock(source_block);
+
     /// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency.
     /// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code.
-    if (unlikely(source_block.rows() > std::numeric_limits<RowRef::SizeT>::max()))
-        throw Exception("Too many rows in right table block for HashJoin: " + toString(source_block.rows()), ErrorCodes::NOT_IMPLEMENTED);
+    if (unlikely(structured_block.rows() > std::numeric_limits<RowRef::SizeT>::max()))
+        throw Exception("Too many rows in right table block for HashJoin: " + toString(structured_block.rows()), ErrorCodes::NOT_IMPLEMENTED);
 
-    /// There's no optimization for right side const columns. Remove constness if any.
-    Block block = materializeBlock(source_block);
-    size_t rows = block.rows();
+    size_t rows = structured_block.rows();
 
-    ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right));
-    Block structured_block = structureRightBlock(block);
+    ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(structured_block, table_join->getAllNames(JoinTableSide::Right));
 
     size_t total_rows = 0;
     size_t total_bytes = 0;
     {
@@ -733,14 +749,14 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
             save_nullmap |= (*null_map)[i];
     }
 
-    auto join_mask_col = JoinCommon::getColumnAsMask(block, onexprs[onexpr_idx].condColumnNames().second);
+    auto join_mask_col = JoinCommon::getColumnAsMask(structured_block, onexprs[onexpr_idx].condColumnNames().second);
     /// Save blocks that do not hold conditions in ON section
     ColumnUInt8::MutablePtr not_joined_map = nullptr;
     if (!multiple_disjuncts && isRightOrFull(kind) && !join_mask_col.isConstant())
     {
         const auto & join_mask = join_mask_col.getData();
         /// Save rows that do not hold conditions
-        not_joined_map = ColumnUInt8::create(block.rows(), 0);
+        not_joined_map = ColumnUInt8::create(rows, 0);
         for (size_t i = 0, sz = join_mask->size(); i < sz; ++i)
         {
             /// Condition hold, do not save row
@@ -1697,7 +1713,7 @@ void HashJoin::checkTypesOfKeys(const Block & block) const
 void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
 {
-    if (data->released)
+    if (!data)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
 
     for (const auto & onexpr : table_join->getClauses())
@@ -1794,7 +1810,10 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
 public:
     NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
         : parent(parent_), max_block_size(max_block_size_), current_block_start(0)
-    {}
+    {
+        if (parent.data == nullptr)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
+    }
 
     Block getEmptyBlock() override { return parent.savedBlockSample().cloneEmpty(); }
@@ -1991,7 +2010,6 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
         size_t left_columns_count = left_sample_block.columns();
         auto non_joined = std::make_unique<NotJoinedHash<true>>(*this, max_block_size);
         return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);
-
     }
     else
     {
@@ -2020,10 +2038,18 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
     }
 }
 
-BlocksList HashJoin::releaseJoinedBlocks()
+BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
 {
     BlocksList right_blocks = std::move(data->blocks);
-    data->released = true;
+
+    if (!restructure)
+    {
+        data.reset();
+        return right_blocks;
+    }
+
+    data->maps.clear();
+    data->blocks_nullmaps.clear();
 
     BlocksList restored_blocks;
 
     /// names to positions optimization
@@ -2052,6 +2078,7 @@ BlocksList HashJoin::releaseJoinedBlocks()
         restored_blocks.emplace_back(std::move(restored_block));
     }
 
+    data.reset();
     return restored_blocks;
 }
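
The released flag is replaced by a stronger invariant: released data is null. releaseJoinedBlocks() moves the blocks out and resets data, and every later entry point checks for null, as joinBlock() now does. A minimal sketch of that ownership contract (illustrative types, not the HashJoin members):

    #include <list>
    #include <memory>
    #include <stdexcept>

    struct Data { std::list<int> blocks; };

    struct Holder
    {
        std::unique_ptr<Data> data = std::make_unique<Data>();

        std::list<int> release()
        {
            std::list<int> out = std::move(data->blocks);
            data.reset();               // "released" is now simply data == nullptr
            return out;
        }

        void use() const
        {
            if (!data)
                throw std::logic_error("Cannot join after data has been released");
        }
    };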

View File

@@ -338,7 +338,6 @@ public:
         /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
         Arena pool;
 
-        bool released = false;
         size_t blocks_allocated_size = 0;
         size_t blocks_nullmaps_allocated_size = 0;
     };
@@ -355,7 +354,13 @@ public:
     void reuseJoinedData(const HashJoin & join);
 
     RightTableDataPtr getJoinedData() const { return data; }
-    BlocksList releaseJoinedBlocks();
+    BlocksList releaseJoinedBlocks(bool restructure = false);
+
+    /// Modify (structure) right block to save it in block list
+    static Block prepareRightBlock(const Block & block, const Block & saved_block_sample_);
+    Block prepareRightBlock(const Block & block) const;
+
+    const Block & savedBlockSample() const { return data->sample_block; }
 
     bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
     bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
@@ -407,10 +412,6 @@ private:
     void dataMapInit(MapsVariant &);
 
-    const Block & savedBlockSample() const { return data->sample_block; }
-
-    /// Modify (structure) right block to save it in block list
-    Block structureRightBlock(const Block & stored_block) const;
-
     void initRightBlockStructure(Block & saved_block_sample);
 
     template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>

View File

@@ -332,7 +332,11 @@ ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names)
     for (const auto & column_name : names)
     {
         auto & column = block.getByName(column_name);
-        column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst());
+
+        column.column = column.column->convertToFullColumnIfConst();
+        column.column = recursiveRemoveLowCardinality(column.column);
+        column.column = recursiveRemoveSparse(column.column);
+
         column.type = recursiveRemoveLowCardinality(column.type);
         ptrs[column_name] = column.column.get();
     }
@@ -639,9 +643,8 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block
 {
     if (num_shards == 0)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive");
 
-    UNUSED(scatterBlockByHashPow2);
-    // if (likely(isPowerOf2(num_shards)))
-    //     return scatterBlockByHashPow2(key_columns_names, block, num_shards);
+    if (likely(isPowerOf2(num_shards)))
+        return scatterBlockByHashPow2(key_columns_names, block, num_shards);
 
     return scatterBlockByHashGeneric(key_columns_names, block, num_shards);
 }
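
Re-enabling the pow2 specialization makes sense here because GraceHashJoin only ever doubles the bucket count, so num_shards stays a power of two and the shard index can be computed with a bit-mask instead of an integer division. A sketch of the arithmetic the specialization presumably relies on (illustrative; the real function scatters whole columns, not single hashes):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    static bool isPow2(size_t n) { return n != 0 && (n & (n - 1)) == 0; }

    inline size_t shardOf(uint64_t hash, size_t num_shards)
    {
        if (isPow2(num_shards))
            return hash & (num_shards - 1);   // mask: cheaper than division
        return hash % num_shards;             // generic fallback
    }

    int main()
    {
        for (uint64_t h = 0; h < 1000; ++h)
            assert(shardOf(h, 8) == h % 8);   // mask agrees with modulo for pow2
    }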