wip grace hash

vdimir 2022-10-04 08:20:13 +00:00
parent 8ed7fb3027
commit f3781be762
23 changed files with 395 additions and 180 deletions

View File

@@ -161,7 +161,7 @@ bool ConcurrentHashJoin::alwaysReturnsEmptySet() const
return true;
}
std::shared_ptr<NotJoinedBlocks> ConcurrentHashJoin::getNonJoinedBlocks(
std::unique_ptr<NotJoinedBlocks> ConcurrentHashJoin::getNonJoinedBlocks(
const Block & /*left_sample_block*/, const Block & /*result_sample_block*/, UInt64 /*max_block_size*/) const
{
if (!JoinCommon::hasNonJoinedBlocks(*table_join))

View File

@@ -47,7 +47,7 @@ public:
size_t getTotalByteCount() const override;
bool alwaysReturnsEmptySet() const override;
bool supportParallelJoin() const override { return true; }
std::shared_ptr<NotJoinedBlocks>
std::unique_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
private:

View File

@@ -42,7 +42,7 @@ public:
virtual bool isFilled() const override { return true; }
virtual std::shared_ptr<NotJoinedBlocks>
virtual std::unique_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block &, const Block &, UInt64) const override
{
return nullptr;

View File

@@ -1017,6 +1017,12 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(
const auto & settings = context->getSettings();
Block left_sample_block(left_sample_columns);
for (auto & column : left_sample_block)
{
if (!column.column)
column.column = column.type->createColumn();
}
Block right_sample_block = joined_plan->getCurrentDataStream().header;
std::vector<String> tried_algorithms;
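The loop added above makes the left sample block self-consistent before algorithm selection: a header may carry a type with a null column pointer. A minimal standalone sketch of the same guard, with hypothetical stand-ins for the ClickHouse column/type interfaces:

    #include <memory>
    #include <vector>

    struct IColumn { virtual ~IColumn() = default; };

    struct IDataType
    {
        virtual ~IDataType() = default;
        // hypothetical analog of column.type->createColumn()
        virtual std::unique_ptr<IColumn> createColumn() const = 0;
    };

    struct ColumnWithTypeAndName
    {
        std::unique_ptr<IColumn> column; // may be null in a header-only block
        std::shared_ptr<const IDataType> type;
    };

    // Give every column an empty materialized column of its type, so code that
    // inspects column data later never dereferences a null pointer.
    void materializeHeader(std::vector<ColumnWithTypeAndName> & header)
    {
        for (auto & col : header)
            if (!col.column)
                col.column = col.type->createColumn();
    }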

View File

@@ -100,7 +100,7 @@ public:
bool alwaysReturnsEmptySet() const override { return false; }
std::shared_ptr<NotJoinedBlocks>
std::unique_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & /* left_sample_block */, const Block & /* result_sample_block */, UInt64 /* max_block_size */) const override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::getNonJoinedBlocks should not be called");

View File

@@ -17,6 +17,8 @@
#include <base/FnTraits.h>
#include <fmt/format.h>
#include <Formats/formatBlock.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForJoin;
@@ -35,6 +37,37 @@ namespace ErrorCodes
namespace
{
void debugBlock(const Block & block, size_t line, std::string msg = "", bool st = false)
{
size_t count = 0;
String colname;
if (block.has("key"))
colname = "key";
if (block.has("t2.key"))
colname = "t2.key";
if (colname.empty())
return;
auto col = block.getByName(colname).column;
for (size_t i = 0; i < col->size(); ++i)
{
if (col->get64(i) == 31)
count++;
}
if (count > 0)
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} AAA {}: {} | {}", __FILE__, line, msg, count, block.dumpStructure());
if (st)
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {} : {}", __FILE__, __LINE__, line, StackTrace().toString());
}
}
}
class BlocksAccumulator
{
public:
@@ -93,6 +126,8 @@ namespace
for (; !res; res = accumulator.peek())
{
Block tmp = reader.read();
debugBlock(tmp, __LINE__);
if (!tmp)
{
eof = true;
@@ -100,6 +135,7 @@ namespace
}
accumulator.addBlock(std::move(tmp));
}
debugBlock(res, __LINE__);
return res;
}
@@ -145,35 +181,66 @@ class GraceHashJoin::FileBucket : boost::noncopyable
};
public:
explicit FileBucket(size_t bucket_index_, TemporaryFileStream & left_file_, TemporaryFileStream & right_file_, const FileBucket * parent_)
struct Stats
{
TemporaryFileStream::Stat left;
TemporaryFileStream::Stat right;
};
explicit FileBucket(size_t bucket_index_,
TemporaryFileStream & left_file_,
TemporaryFileStream & right_file_,
const FileBucket * parent_,
Poco::Logger * log_)
: bucket_index{bucket_index_}
, left_file{left_file_}
, right_file{right_file_}
, parent{parent_}
, state{State::WRITING_BLOCKS}
, log(log_)
{
}
void addLeftBlock(const Block & block)
{
return addBlockImpl(block, left_file_mutex, left_file);
std::unique_lock<std::mutex> lock(left_file_mutex);
addBlockImpl(block, left_file, lock);
}
void addRightBlock(const Block & block) { return addBlockImpl(block, right_file_mutex, right_file); }
bool tryAddLeftBlock(const Block & block) { return tryAddBlockImpl(block, left_file_mutex, left_file); }
bool tryAddRightBlock(const Block & block) { return tryAddBlockImpl(block, right_file_mutex, right_file); }
void addRightBlock(const Block & block)
{
std::unique_lock<std::mutex> lock(right_file_mutex);
addBlockImpl(block, right_file, lock);
}
bool tryAddLeftBlock(const Block & block)
{
std::unique_lock<std::mutex> lock(left_file_mutex, std::try_to_lock);
return addBlockImpl(block, left_file, lock);
}
bool tryAddRightBlock(const Block & block)
{
std::unique_lock<std::mutex> lock(right_file_mutex, std::try_to_lock);
return addBlockImpl(block, right_file, lock);
}
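The add/tryAdd pairs above now funnel into a single addBlockImpl that receives a std::unique_lock by reference: the blocking variants pass an owned lock, the try variants pass a try_to_lock one, and the shared body only has to check owns_lock(). A minimal sketch of that locking pattern, with hypothetical names:

    #include <mutex>

    struct Sink
    {
        std::mutex mutex;

        // Shared implementation: works whether or not the caller acquired the lock.
        bool addImpl(int value, std::unique_lock<std::mutex> & lock)
        {
            if (!lock.owns_lock())
                return false; // the try variant lost the race
            (void)value; // stand-in for writer.write(block) under the lock
            return true;
        }

        void add(int value)
        {
            std::unique_lock<std::mutex> lock(mutex); // blocks until acquired
            addImpl(value, lock);
        }

        bool tryAdd(int value)
        {
            std::unique_lock<std::mutex> lock(mutex, std::try_to_lock);
            return addImpl(value, lock);
        }
    };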
void startJoining()
{
LOG_TRACE(log, "Joining file bucket {}", bucket_index);
ensureState(State::JOINING_BLOCKS);
left_file.finishWriting();
right_file.finishWriting();
stats.left = left_file.finishWriting();
stats.right = right_file.finishWriting();
}
size_t index() const { return bucket_index; }
bool finished() const { return state.load() == State::FINISHED; }
bool empty() const { return is_empty.load(); }
Stats getStat() const { return stats; }
bool tryLockForJoining()
{
if (parent && !parent->finished())
@@ -183,7 +250,14 @@ public:
return state.compare_exchange_strong(expected, State::JOINING_BLOCKS);
}
void finish() { transition(State::JOINING_BLOCKS, State::FINISHED); }
void finish()
{
LOG_TRACE(log, "XXXX Finish joining file bucket {}, size: {} | {}",
bucket_index, stats.left.num_rows, stats.right.num_rows);
state.exchange(State::FINISHED);
// transition(State::JOINING_BLOCKS, State::FINISHED);
}
MergingBlockReader openLeftTableReader() const { return MergingBlockReader(left_file); }
@@ -191,29 +265,28 @@ public:
std::mutex & joinMutex() { return join_mutex; }
~FileBucket()
{
LOG_TRACE(log, "XXXX Destroying file bucket {} - {}({}): rows: {} | {}",
bucket_index, fmt::ptr(this), fmt::ptr(parent), stats.left.num_rows, stats.right.num_rows);
}
private:
bool tryAddBlockImpl(const Block & block, std::mutex & mutex, TemporaryFileStream & writer)
bool addBlockImpl(const Block & block, TemporaryFileStream & writer, std::unique_lock<std::mutex> & lock)
{
if (block.rows())
is_empty = false;
ensureState(State::WRITING_BLOCKS);
std::unique_lock lock{mutex, std::try_to_lock};
if (!lock.owns_lock())
return false;
debugBlock(block, __LINE__, fmt::format("adding to {}", bucket_index), true);
writer.write(block);
return true;
}
void addBlockImpl(const Block & block, std::mutex & mutex, TemporaryFileStream & writer)
{
if (block.rows())
is_empty = false;
ensureState(State::WRITING_BLOCKS);
std::unique_lock lock{mutex};
writer.write(block);
}
void transition(State expected, State desired)
{
State prev = state.exchange(desired);
@@ -240,6 +313,10 @@ private:
const FileBucket * parent;
std::atomic<State> state;
Stats stats;
Poco::Logger * log;
};
class GraceHashJoin::InMemoryJoin
@@ -278,6 +355,8 @@ GraceHashJoin::GraceHashJoin(
, any_take_last_row{any_take_last_row_}
, max_num_buckets{context->getSettingsRef().grace_hash_join_max_buckets}
, max_block_size{context->getSettingsRef().max_block_size}
, left_key_names(table_join->getOnlyClause().key_names_left)
, right_key_names(table_join->getOnlyClause().key_names_right)
, first_bucket{makeInMemoryJoin()}
, tmp_data(std::make_unique<TemporaryDataOnDisk>(tmp_data_, CurrentMetrics::TemporaryFilesForJoin))
{
@@ -310,7 +389,7 @@ bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
return true;
}
void GraceHashJoin::rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnapshot & snapshot, size_t bucket)
void GraceHashJoin::rehashInMemoryJoin(InMemoryJoinPtr & join, const Buckets & buckets_snapshot, size_t bucket)
{
std::lock_guard<std::mutex> lock{join->mutex};
@@ -319,12 +398,12 @@ void GraceHashJoin::rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnap
for (const Block & block : right_blocks)
{
Blocks blocks = scatterBlock<true>(block, snapshot->size());
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
join->join->addJoinedBlock(blocks[bucket], /* check_limits = */ false);
for (size_t i = 1; i < snapshot->size(); ++i)
for (size_t i = 1; i < buckets_snapshot.size(); ++i)
{
if (i != bucket && blocks[i].rows())
snapshot->at(i)->addRightBlock(blocks[i]);
buckets_snapshot[i]->addRightBlock(blocks[i]);
}
}
}
@@ -338,9 +417,9 @@ bool GraceHashJoin::fitsInMemory(InMemoryJoin * join) const
return table_join->sizeLimits().softCheck(join->join->getTotalRowCount(), join->join->getTotalByteCount());
}
void GraceHashJoin::rehashBuckets()
GraceHashJoin::Buckets GraceHashJoin::rehashBuckets()
{
std::scoped_lock lock{rehash_mutex};
std::unique_lock lock(rehash_mutex);
size_t current_size = buckets.size();
size_t next_size = current_size * 2;
@@ -358,19 +437,26 @@ void GraceHashJoin::rehashBuckets()
{
addBucket(buckets, buckets[i % current_size].get());
}
return buckets;
}
void GraceHashJoin::startReadingDelayedBlocks()
{
// Drop in-memory hash join for the first bucket to reduce memory footprint.
first_bucket.reset();
std::unique_lock lock(rehash_mutex);
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} AAA startReadingDelayedBlocks {} > {} | {}", __FILE__, __LINE__,
buckets[0]->empty() ? "empty" : "not empty",
buckets[0]->getStat().left.num_rows, buckets[0]->getStat().right.num_rows);
if (!buckets[0]->finished())
buckets[0]->finish();
}
void GraceHashJoin::addBucket(Buckets & destination, const FileBucket * parent)
{
size_t index = destination.size();
destination.emplace_back(std::make_unique<FileBucket>(
index, tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), parent));
auto new_bucket = std::make_unique<FileBucket>(index, tmp_data->createStream(left_sample_block), tmp_data->createStream(right_sample_block), parent, log);
destination.emplace_back(std::move(new_bucket));
}
void GraceHashJoin::checkTypesOfKeys(const Block & block) const
@@ -397,14 +483,13 @@ void GraceHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /*not
materializeBlockInplace(block);
Buckets current_buckets = getCurrentBuckets();
size_t num_buckets = current_buckets.size();
auto blocks = scatterBlock<false>(block, num_buckets);
Buckets buckets_snapshot = getCurrentBuckets();
size_t num_buckets = buckets_snapshot.size();
Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
ExtraBlockPtr not_processed;
block = std::move(blocks[0]);
debugBlock(block, __LINE__);
first_bucket->join->joinBlock(block, not_processed);
if (not_processed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unhandled not processed block in GraceHashJoin");
@@ -412,11 +497,11 @@ void GraceHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /*not
// We need to skip the first bucket that is already joined in memory, so we start with 1.
retryForEach(
generateRandomPermutation(1, num_buckets),
[&blocks, &current_buckets](size_t idx)
[&blocks, &buckets_snapshot](size_t idx)
{
if (blocks[idx].rows() == 0)
return true;
return current_buckets.at(idx)->tryAddLeftBlock(blocks[idx]);
return buckets_snapshot[idx]->tryAddLeftBlock(blocks[idx]);
});
}
@@ -424,7 +509,7 @@ void GraceHashJoin::setTotals(const Block & block)
{
if (block)
{
std::scoped_lock guard{totals_mutex};
std::lock_guard guard(totals_mutex);
totals = block;
}
}
@@ -451,7 +536,7 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
if (!isInnerOrRight(table_join->kind()))
return false;
std::unique_lock lock(rehash_mutex);
std::shared_lock lock(rehash_mutex);
bool file_buckets_are_empty = std::all_of(buckets.begin(), buckets.end(), [](const auto & bucket) { return bucket->empty(); });
bool first_bucket_is_empty = first_bucket && first_bucket->join && first_bucket->join->alwaysReturnsEmptySet();
@@ -459,7 +544,7 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
return first_bucket_is_empty && file_buckets_are_empty;
}
std::shared_ptr<NotJoinedBlocks> GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
std::unique_ptr<NotJoinedBlocks> GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
{
/// We do not support returning non-joined blocks here.
/// They will be reported by getDelayedBlocks instead.
@@ -477,6 +562,9 @@ public:
Block next() override
{
Block result = parent->joinNextBlockInBucket(*this);
if (result)
debugBlock(result, __LINE__);
if (result)
return result;
@@ -497,7 +585,7 @@ public:
MergingBlockReader left_reader;
InMemoryJoinPtr join;
bool process_not_joined = true;
std::shared_ptr<NotJoinedBlocks> not_joined_blocks;
std::unique_ptr<NotJoinedBlocks> not_joined_blocks;
};
std::unique_ptr<IDelayedJoinedBlocksStream> GraceHashJoin::getDelayedBlocks(IDelayedJoinedBlocksStream * prev_cursor)
@@ -512,10 +600,26 @@ std::unique_ptr<IDelayedJoinedBlocksStream> GraceHashJoin::getDelayedBlocks(IDel
startReadingDelayedBlocks();
}
auto snapshot = buckets.get();
for (size_t i = 1; i < snapshot->size(); ++i)
auto snapshot = getCurrentBuckets();
for (size_t buckets_left = snapshot.size() - 1, i = 0; buckets_left > 0; ++i)
{
FileBucket * bucket = snapshot->at(i).get();
auto & bucket = snapshot[i % (snapshot.size() - 1) + 1];
if (bucket == nullptr)
continue;
// LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {}({} | {}) / {} -> {} {}", __FILE__, __LINE__,
// i, i % (snapshot.size() - 1) + 1,
// bucket->index(), snapshot.size(), bucket->finished() ? "finished" : "not finished",
// bucket->empty() ? "empty" : "not empty");
if (bucket->finished())
{
--buckets_left;
bucket.reset();
continue;
}
if (!bucket->tryLockForJoining())
{
continue;
@@ -528,8 +632,8 @@ std::unique_ptr<IDelayedJoinedBlocksStream> GraceHashJoin::getDelayedBlocks(IDel
else
{
InMemoryJoinPtr join = makeInMemoryJoin();
fillInMemoryJoin(join, bucket);
return std::make_unique<DelayedBlocks>(this, bucket, std::move(join));
fillInMemoryJoin(join, bucket.get());
return std::make_unique<DelayedBlocks>(this, bucket.get(), std::move(join));
}
}
@@ -541,16 +645,23 @@ std::unique_ptr<IDelayedJoinedBlocksStream> GraceHashJoin::getDelayedBlocks(IDel
Block GraceHashJoin::joinNextBlockInBucket(DelayedBlocks & iterator)
{
Block block;
size_t cur_index = iterator.bucket->index();
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} joinNextBlockInBucket {}", __FILE__, __LINE__, cur_index);
do
{
block = iterator.left_reader.read();
if (!block) // EOF
return block;
auto current_buckets = getCurrentBuckets();
size_t num_buckets = current_buckets.size();
Blocks blocks = scatterBlock<false>(block, num_buckets);
auto buckets_snapshot = getCurrentBuckets();
size_t num_buckets = buckets_snapshot.size();
debugBlock(block, __LINE__);
Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
for (size_t i = 0; i < blocks.size(); ++i)
{
debugBlock(blocks[i], __LINE__, fmt::format("virtually moved {} -> {}", cur_index, i));
}
block.clear();
// We need to filter out blocks that were written to the current bucket B0,
@@ -567,12 +678,16 @@ Block GraceHashJoin::joinNextBlockInBucket(DelayedBlocks & iterator)
block = std::move(blocks[i]);
else // Rows that were moved after rehashing
current_buckets[i]->addLeftBlock(blocks[i]);
buckets_snapshot[i]->addLeftBlock(blocks[i]);
}
} while (block.rows() == 0);
debugBlock(block, __LINE__);
ExtraBlockPtr not_processed;
iterator.join->join->joinBlock(block, not_processed);
iterator.join->join->debugKeys();
debugBlock(block, __LINE__);
if (not_processed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");
@@ -591,23 +706,30 @@ void GraceHashJoin::fillInMemoryJoin(InMemoryJoinPtr & join, FileBucket * bucket
while (auto block = reader.read())
{
debugBlock(block, __LINE__);
addJoinedBlockImpl(join, bucket->index(), block);
}
}
void GraceHashJoin::addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_index, const Block & block)
{
BucketsSnapshot snapshot = buckets.get();
Blocks blocks = scatterBlock<true>(block, snapshot->size());
Buckets buckets_snapshot = getCurrentBuckets();
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets_snapshot.size());
debugBlock(block, __LINE__);
// Add block to the in-memory join
{
auto bucket = snapshot->at(bucket_index);
std::scoped_lock guard{bucket->joinMutex()};
auto bucket = buckets_snapshot[bucket_index];
std::lock_guard guard(bucket->joinMutex());
debugBlock(blocks[bucket_index], __LINE__);
join->join->addJoinedBlock(blocks[bucket_index], /*check_limits=*/false);
// We need to rebuild the block without the bucket_index part in case of overflow.
bool overflow = !fitsInMemory(join.get());
Block to_write;
if (overflow)
{
@@ -615,12 +737,40 @@ void GraceHashJoin::addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_ind
to_write = concatenateBlocks(blocks);
}
size_t overflow_cnt = 0;
while (overflow)
{
rehashBuckets();
rehashInMemoryJoin(join, bucket_index);
blocks = scatterBlock<true>(to_write, snapshot->size());
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} size of join {}: {} {} - {} {}", __FILE__, __LINE__,
bucket_index,
join->join->getTotalRowCount(),
join->join->getTotalByteCount(),
overflow ? "overflow" : "fits", overflow_cnt);
buckets_snapshot = rehashBuckets();
rehashInMemoryJoin(join, buckets_snapshot, bucket_index);
blocks = JoinCommon::scatterBlockByHash(right_key_names, to_write, buckets_snapshot.size());
{
WriteBufferFromOwnString out;
auto output_format = context->getOutputFormat("PrettyCompactMonoBlock", out, block);
formatBlock(output_format, block);
auto block_string = out.str();
Strings sizes;
for (size_t i = 0; i < blocks.size(); ++i)
{
auto & b = blocks[i];
if (b.rows())
sizes.emplace_back(fmt::format("[{}] - {}", i, b.rows()));
}
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} overflow({}) block\n{} -> [{}]", __FILE__, __LINE__, overflow_cnt, block_string, fmt::join(sizes, ", "));
}
overflow = !fitsInMemory(join.get());
overflow_cnt++;
}
}
@@ -629,15 +779,14 @@ void GraceHashJoin::addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_ind
return;
// Write the rest of the blocks to the disk buckets
assert(blocks.size() == snapshot->size());
auto indices = generateRandomPermutation(1, snapshot->size());
assert(blocks.size() == buckets_snapshot.size());
retryForEach(
indices,
[&](size_t bucket)
generateRandomPermutation(1, buckets_snapshot.size()),
[&](size_t i)
{
if (bucket == bucket_index || !blocks[bucket].rows())
if (i == bucket_index || !blocks[i].rows())
return true;
return snapshot->at(bucket)->tryAddRightBlock(blocks[bucket]);
return buckets_snapshot[i]->tryAddRightBlock(blocks[i]);
});
}
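The overflow loop above keeps doubling the bucket count until the in-memory join for the current bucket fits the size limits again. A standalone toy model (hypothetical numbers, assuming a reasonably uniform hash) of why that loop terminates: each doubling roughly halves the load of any one bucket.

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    int main()
    {
        const size_t limit = 8; // stand-in for the rows/bytes limit checked by fitsInMemory()
        std::vector<uint64_t> keys(100);
        for (size_t i = 0; i < keys.size(); ++i)
            keys[i] = i * 2654435761u; // Knuth multiplicative hash as a toy key hash

        size_t num_buckets = 1;
        auto bucket0_load = [&]
        {
            size_t count = 0;
            for (uint64_t k : keys)
                if (k % num_buckets == 0)
                    ++count;
            return count;
        };

        while (bucket0_load() > limit) // "overflow": bucket 0 does not fit in memory
            num_buckets *= 2;          // rehashBuckets(): double the count and re-scatter
        assert(bucket0_load() <= limit);
    }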
@@ -647,27 +796,10 @@ size_t GraceHashJoin::getNumBuckets() const
return buckets.size();
}
Buckets GraceHashJoin::getCurrentBuckets() const
GraceHashJoin::Buckets GraceHashJoin::getCurrentBuckets() const
{
std::lock_guard lock{rehash_mutex};
std::shared_lock lock(rehash_mutex);
return buckets;
}
template <bool right>
Blocks GraceHashJoin::scatterBlock(const Block & block, size_t shards) const
{
if (!block)
{
return {};
}
const Names & key_names = [](const TableJoin::JoinOnClause & clause) -> auto &
{
return right ? clause.key_names_right : clause.key_names_left;
}
(table_join->getOnlyClause());
return JoinCommon::scatterBlockByHash(key_names, block, shards);
}
}

View File

@@ -46,7 +46,8 @@ class GraceHashJoin final : public IJoin
class DelayedBlocks;
class InMemoryJoin;
using Buckets = std::vector<std::shared_ptr<FileBucket>>;
using BucketPtr = std::shared_ptr<FileBucket>;
using Buckets = std::vector<BucketPtr>;
using InMemoryJoinPtr = std::unique_ptr<InMemoryJoin>;
public:
@@ -73,7 +74,7 @@ public:
bool supportParallelJoin() const override { return true; }
std::shared_ptr<NotJoinedBlocks>
std::unique_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
/// Open iterator over joined blocks.
@@ -94,7 +95,7 @@ private:
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_index, const Block & block);
/// Rebuild @join after rehash: scatter the blocks in join and write parts that belong to the other shards to disk.
void rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnapshot & snapshot, size_t bucket);
void rehashInMemoryJoin(InMemoryJoinPtr & join, const Buckets & buckets_snapshot, size_t bucket);
/// Check that @join satisfies limits on rows/bytes in @table_join.
bool fitsInMemory(InMemoryJoin * join) const;
@@ -108,7 +109,8 @@ private:
///
/// NB: after @rehashBuckets there may be rows written to buckets they do not belong to.
/// It is fine; these rows will be written to the corresponding buckets during the third stage.
void rehashBuckets();
Buckets rehashBuckets();
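The NB above rests on a property of doubling, assuming rows are scattered by hash modulo the bucket count as in scatterBlockByHashGeneric: a row moves from bucket h % n to h % 2n, which is either the same index or index + n, exactly the child bucket that rehashBuckets creates from bucket i % current_size. A standalone check of that invariant:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    int main()
    {
        const size_t n = 4; // bucket count before rehash
        for (uint64_t h = 0; h < 100000; ++h)
        {
            size_t before = h % n;
            size_t after = h % (2 * n);
            // After doubling, a row lands either in its old bucket or in the
            // new bucket whose parent is the old one (index old + n).
            assert(after == before || after == before + n);
        }
    }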
/// Perform some bookkeeping after all calls to @joinBlock.
void startReadingDelayedBlocks();
@@ -126,12 +128,15 @@ private:
size_t max_num_buckets;
size_t max_block_size;
Names left_key_names;
Names right_key_names;
InMemoryJoinPtr first_bucket;
TemporaryDataOnDiskPtr tmp_data;
Buckets buckets;
std::shared_mutex rehash_mutex;
mutable std::shared_mutex rehash_mutex;
std::atomic<bool> started_reading_delayed_blocks{false};

View File

@@ -3,6 +3,7 @@
#include <unordered_map>
#include <vector>
#include <Common/StackTrace.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnConst.h>
@@ -676,8 +677,30 @@ Block HashJoin::structureRightBlock(const Block & block) const
return structured_block;
}
static void debugBlock(const Block & block, size_t line, const void * inst)
{
size_t count = 0;
if (!block.has("t2.key"))
{
return;
}
auto col = block.getByName("t2.key").column;
for (size_t i = 0; i < col->size(); ++i)
{
if (col->get64(i) == 54)
count++;
}
if (count > 1)
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} [{}] AAA: {} | {} | {}", __FILE__, line, inst, count, block.dumpStructure(), StackTrace().toString() );
}
}
bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
{
debugBlock(source_block, __LINE__, fmt::ptr(this));
/// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency.
/// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code.
if (unlikely(source_block.rows() > std::numeric_limits<RowRef::SizeT>::max()))
@@ -1679,6 +1702,71 @@ void HashJoin::checkTypesOfKeys(const Block & block) const
}
}
template <typename Mapped, typename Msg>
static void debugRowRef(const Mapped & mapped, const Msg & msg)
{
UNUSED(mapped);
UNUSED(msg);
if constexpr (std::is_same_v<Mapped, RowRefList>)
{
std::vector<String> ss;
std::set<const Block *> blocks;
auto it = mapped.begin();
while (it.ok())
{
ss.push_back(fmt::format("{}:{}", it->row_num, fmt::ptr(it->block)));
blocks.insert(it->block);
++it;
}
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {}", __FILE__, __LINE__, fmt::join(ss, ","));
for (const auto & block : blocks)
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {} - {}", __FILE__, __LINE__,
fmt::ptr(block), block->dumpStructure());
}
}
else
{
LOG_DEBUG(&Poco::Logger::get("XXXX"), "{}:{} {}", __FILE__, __LINE__, typeid(Mapped).name());
}
}
void HashJoin::debugKeys() const
{
if (!data)
return;
for (const auto & map : data->maps)
{
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_)
{
auto cb = [this](const auto & rr) { debugRowRef(rr, fmt::ptr(this)); };
if (map_.key8)
map_.key8->forEachMapped(cb);
if (map_.key16)
map_.key16->forEachMapped(cb);
if (map_.key32)
map_.key32->forEachMapped(cb);
if (map_.key64)
map_.key64->forEachMapped(cb);
if (map_.key_string)
map_.key_string->forEachMapped(cb);
if (map_.key_fixed_string)
map_.key_fixed_string->forEachMapped(cb);
if (map_.keys128)
map_.keys128->forEachMapped(cb);
if (map_.keys256)
map_.keys256->forEachMapped(cb);
if (map_.hashed)
map_.hashed->forEachMapped(cb);
});
}
}
void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
for (const auto & onexpr : table_join->getClauses())
@@ -1948,7 +2036,7 @@ private:
}
};
std::shared_ptr<NotJoinedBlocks> HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
std::unique_ptr<NotJoinedBlocks> HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
const Block & result_sample_block,
UInt64 max_block_size) const
{
@@ -1962,7 +2050,7 @@ std::shared_ptr<NotJoinedBlocks> HashJoin::getNonJoinedBlocks(const Block & left
/// ... calculate `left_columns_count` ...
size_t left_columns_count = left_sample_block.columns();
auto non_joined = std::make_unique<NotJoinedHash<true>>(*this, max_block_size);
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
else
@@ -1970,7 +2058,7 @@ std::shared_ptr<NotJoinedBlocks> HashJoin::getNonJoinedBlocks(const Block & left
size_t left_columns_count = left_sample_block.columns();
assert(left_columns_count == result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns());
auto non_joined = std::make_unique<NotJoinedHash<false>>(*this, max_block_size);
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
}

View File

@@ -186,7 +186,7 @@ public:
* Use only after all calls to joinBlock were done.
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
*/
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(
std::unique_ptr<NotJoinedBlocks> getNonJoinedBlocks(
const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
/// Number of keys in all built JOIN maps.
@@ -354,6 +354,8 @@ public:
bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
void debugKeys() const;
private:
template<bool> friend class NotJoinedHash;

View File

@@ -11,14 +11,15 @@
namespace DB
{
class Block;
struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
class TableJoin;
class NotJoinedBlocks;
class IDelayedJoinedBlocksStream;
class IBlocksStream;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
enum class JoinPipelineType
{
@@ -79,17 +80,33 @@ public:
// That can run FillingRightJoinSideTransform in parallel
virtual bool supportParallelJoin() const { return false; }
virtual std::shared_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0;
/// Peek next stream of delayed joined blocks.
virtual std::unique_ptr<IDelayedJoinedBlocksStream> getDelayedBlocks(IDelayedJoinedBlocksStream * /*prev_cursor*/) { return nullptr; }
virtual std::unique_ptr<IBlocksStream> getDelayedBlocks(
const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size)
{
if (non_joined_returned.exchange(true))
return nullptr;
return getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size);
}
/// TODO(vdimir@): make private
virtual std::unique_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0;
private:
Block totals;
std::atomic_bool non_joined_returned = false;
};
class IBlocksStream
{
public:
/// Returns empty block on EOF
virtual Block next() = 0;
using JoinPtr = std::shared_ptr<IJoin>;
virtual ~IBlocksStream() = default;
};
}
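A caller is expected to take the stream at most once (the atomic exchange above guarantees a single non-null result) and drain it until an empty block. A minimal sketch of the consuming side, with a stub Block standing in for DB::Block:

    #include <memory>

    struct Block
    {
        bool has_data = false;
        explicit operator bool() const { return has_data; }
    };

    struct IBlocksStream
    {
        virtual Block next() = 0; // returns an empty block on EOF
        virtual ~IBlocksStream() = default;
    };

    void drainNonJoined(std::unique_ptr<IBlocksStream> stream)
    {
        if (!stream) // no non-joined rows, or the stream was already handed out
            return;
        while (Block block = stream->next())
        {
            // push the non-joined block downstream
            (void)block;
        }
    }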

View File

@@ -63,7 +63,7 @@ public:
return join->alwaysReturnsEmptySet();
}
std::shared_ptr<NotJoinedBlocks>
std::unique_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override
{
return join->getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size);

View File

@@ -597,7 +597,7 @@ static Blocks scatterBlockByHashImpl(const Strings & key_columns_names, const Bl
size_t num_cols = block.columns();
/// Use non-standard initial value so as not to degrade hash map performance inside shard that uses the same CRC32 algorithm.
WeakHash32 hash(num_rows, 0x3d2738a3u);
WeakHash32 hash(num_rows);
for (const auto & key_name : key_columns_names)
{
ColumnPtr key_col = materializeColumn(block, key_name);
@@ -639,8 +639,9 @@ Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block
{
if (num_shards == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive");
if (likely(isPowerOf2(num_shards)))
return scatterBlockByHashPow2(key_columns_names, block, num_shards);
UNUSED(scatterBlockByHashPow2);
// if (likely(isPowerOf2(num_shards)))
// return scatterBlockByHashPow2(key_columns_names, block, num_shards);
return scatterBlockByHashGeneric(key_columns_names, block, num_shards);
}
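scatterBlockByHash splits a block into shards by a weak hash of the key columns; the power-of-two fast path (bitmask instead of modulo) is disabled above while the generic path is exercised. A standalone sketch of the generic scatter, reduced to precomputed key hashes in place of Block and WeakHash32:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Rows are reduced to their key hash here; the real implementation builds a
    // per-row selector and scatters whole columns, as IColumn::scatter does.
    std::vector<std::vector<uint64_t>> scatterByHash(const std::vector<uint64_t> & key_hashes, size_t num_shards)
    {
        std::vector<std::vector<uint64_t>> shards(num_shards);
        for (uint64_t h : key_hashes)
            shards[h % num_shards].push_back(h); // pow2 path would use h & (num_shards - 1)
        return shards;
    }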
@@ -812,7 +813,7 @@ void NotJoinedBlocks::copySameKeys(Block & block) const
}
}
Block NotJoinedBlocks::read()
Block NotJoinedBlocks::next()
{
Block result_block = result_sample_block.cloneEmpty();
{

View File

@@ -116,7 +116,7 @@ ColumnPtr filterWithBlanks(ColumnPtr src_column, const IColumn::Filter & filter,
}
/// Creates result from right table data in RIGHT and FULL JOIN when keys are not present in left table.
class NotJoinedBlocks final
class NotJoinedBlocks final : public IBlocksStream
{
public:
using LeftToRightKeyRemap = std::unordered_map<String, String>;
@@ -138,7 +138,7 @@ public:
size_t left_columns_count,
const LeftToRightKeyRemap & left_to_right_key_remap);
Block read();
Block next() override;
private:
void extractColumnChanges(size_t right_pos, size_t result_pos);
@@ -170,15 +170,4 @@ private:
void setRightIndex(size_t right_pos, size_t result_position);
};
/// Iterator over delayed joined blocks.
/// Used by GraceHashJoin which must accumulate all blocks from the left table before actual processing.
class IDelayedJoinedBlocksStream
{
public:
virtual ~IDelayedJoinedBlocksStream() = default;
/// Returns empty block on EOF.
virtual Block next() = 0;
};
}

View File

@@ -1114,7 +1114,7 @@ private:
};
std::shared_ptr<NotJoinedBlocks> MergeJoin::getNonJoinedBlocks(
std::unique_ptr<NotJoinedBlocks> MergeJoin::getNonJoinedBlocks(
const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const
{
if (table_join->strictness() == JoinStrictness::All && (is_right || is_full))
@@ -1122,7 +1122,7 @@ std::shared_ptr<NotJoinedBlocks> MergeJoin::getNonJoinedBlocks(
size_t left_columns_count = left_sample_block.columns();
assert(left_columns_count == result_sample_block.columns() - right_columns_to_add.columns());
auto non_joined = std::make_unique<NotJoinedMerge>(*this, max_block_size);
return std::make_shared<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, table_join->leftToRightKeyRemap());
}
return nullptr;
}

View File

@@ -35,7 +35,7 @@ public:
/// Has to be called only after setTotals()/mergeRightBlocks()
bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); }
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
std::unique_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
static bool isSupported(const std::shared_ptr<TableJoin> & table_join);

View File

@@ -7,6 +7,7 @@
#include <Formats/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Core/ProtocolDefines.h>
#include <fmt/format.h>
#include <Common/logger_useful.h>
@@ -94,6 +95,7 @@ struct TemporaryFileStream::OutputWriter
if (finalized)
throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);
out_writer.write(block);
num_rows += block.rows();
}
@@ -127,6 +129,8 @@ struct TemporaryFileStream::OutputWriter
CompressedWriteBuffer out_compressed_buf;
NativeWriter out_writer;
std::atomic_size_t num_rows = 0;
bool finalized = false;
};
@@ -229,6 +233,7 @@ void TemporaryFileStream::updateAllocAndCheck()
parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
stat.compressed_size = new_compressed_size;
stat.uncompressed_size = new_uncompressed_size;
stat.num_rows = out_writer->num_rows;
}
bool TemporaryFileStream::isFinalized() const

View File

@@ -113,6 +113,7 @@ public:
/// Non-atomic because we don't allow `read` or `write` on a single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
size_t num_rows = 0;
};
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
@@ -151,3 +152,19 @@ private:
};
}
template<>
struct fmt::formatter<DB::TemporaryFileStream::Stat>
{
template<typename ParseContext>
constexpr auto parse(ParseContext & context)
{
return context.begin();
}
template<typename FormatContext>
auto format(const DB::TemporaryFileStream::Stat & stat, FormatContext & context)
{
return fmt::format_to(context.out(), "{}/{} - {}", stat.compressed_size, stat.uncompressed_size, stat.num_rows);
}
};
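A self-contained analog of the specialization above, showing how such a formatter lets Stat values be passed straight to fmt-based logging (hypothetical Stat stand-in, same format string):

    #include <cstddef>
    #include <fmt/format.h>

    struct Stat { size_t compressed_size = 0; size_t uncompressed_size = 0; size_t num_rows = 0; };

    template <>
    struct fmt::formatter<Stat>
    {
        template <typename ParseContext>
        constexpr auto parse(ParseContext & ctx) { return ctx.begin(); }

        template <typename FormatContext>
        auto format(const Stat & stat, FormatContext & ctx)
        {
            return fmt::format_to(ctx.out(), "{}/{} - {}", stat.compressed_size, stat.uncompressed_size, stat.num_rows);
        }
    };

    int main()
    {
        fmt::print("{}\n", Stat{10, 20, 3}); // prints "10/20 - 3"
    }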

View File

@@ -1,7 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <Processors/DelayedPortsProcessor.h>
namespace DB

View File

@@ -136,7 +136,7 @@ void JoiningTransform::work()
return;
}
non_joined_blocks = join->getNonJoinedBlocks(
non_joined_blocks = join->getDelayedBlocks(
inputs.front().getHeader(), outputs.front().getHeader(), max_block_size);
if (!non_joined_blocks)
{

View File

@@ -81,7 +81,7 @@ private:
ExtraBlockPtr not_processed;
FinishCounterPtr finish_counter;
std::shared_ptr<NotJoinedBlocks> non_joined_blocks;
std::unique_ptr<IBlocksStream> non_joined_blocks;
size_t max_block_size;
Block readExecute(Chunk & chunk);

View File

@@ -484,7 +484,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
// Process DelayedJoinedBlocksTransform after all JoiningTransforms.
auto joined_header = JoiningTransform::transformHeader(left_header, join);
auto delayed_processor = std::make_shared<DelayedPortsProcessor>(joined_header, num_streams + num_streams, delayed_ports_numbers);
auto delayed_processor = std::make_shared<DelayedPortsProcessor>(joined_header, 2 * num_streams, delayed_ports_numbers);
if (collected_processors)
collected_processors->emplace_back(delayed_processor);
left->pipe.processors.emplace_back(delayed_processor);

View File

@@ -1,52 +1,3 @@
-- full_sorting_merge --
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ALL INNER
500353531835 500353531835 1000342 1000342 1000342
ALL LEFT
50195752660639 500353531835 10369589 10369589 1000342
ALL RIGHT
500353531835 684008812186 1367170 1000342 1367170
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000
ANY INNER
199622811843 199622811843 399458 399458 399458
ANY LEFT
50010619420459 315220291655 10000000 10000000 630753
ANY RIGHT
316611844056 500267124407 1000000 633172 1000000
-- grace_hash --
ALL INNER
500353531835 500353531835 1000342 1000342 1000342

View File

@@ -2,6 +2,7 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (key UInt32, s String) ENGINE = MergeTree ORDER BY key;
CREATE TABLE t2 (key UInt32, s String) ENGINE = MergeTree ORDER BY key;
@@ -25,10 +26,10 @@ INSERT INTO t2
{% macro is_implemented(join_algorithm) -%}
{% if join_algorithm == 'grace_hash' %} -- { serverError NOT_IMPLEMENTED }
SELECT 'skipped';
{% endif %}
{% endif -%}
{% endmacro -%}
{% for join_algorithm in ['full_sorting_merge', 'grace_hash'] -%}
{% for join_algorithm in ['grace_hash'] -%}
SET max_bytes_in_join = '{% if join_algorithm == 'grace_hash' %}1M{% else %}0{% endif %}';
@@ -36,11 +37,13 @@ SELECT '-- {{ join_algorithm }} --';
SET join_algorithm = '{{ join_algorithm }}';
{% for kind in ['ALL', 'ANY'] -%}
{% for block_size in [32001, 65505, 65536, range(32001, 65536) | random] %}
{% for block_size in [0, 32001, 65505, 65536] %}
{% if block_size -%}
SET max_block_size = {{ block_size }};
{%- endif %}
{% if not (kind == 'ANY' and join_algorithm == 'grace_hash') %}
{% if not (kind == 'ANY' and join_algorithm == 'grace_hash') -%}
SELECT '{{ kind }} INNER';
SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key != 0) FROM t1
@@ -60,7 +63,7 @@ SELECT sum(t1.key), sum(t2.key), count(), countIf(t1.key != 0), countIf(t2.key !
ON t1.key == t2.key
; {{ is_implemented(join_algorithm) }}
{% endif %}
{% endif -%}
{% endfor -%}
{% endfor -%}