Merge pull request #51852 from ClickHouse/fix-misleading-naming-in-joins

Change misleading name in joins: addJoinedBlock -> addBlockToJoin
This commit is contained in:
Igor Nikonov 2023-07-06 17:27:26 +02:00 committed by GitHub
commit 83af43b8ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 64 additions and 67 deletions

View File

@ -9,7 +9,6 @@ Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Event time with microseconds resolution.
- `name` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — Metric value.
@ -20,18 +19,18 @@ SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**See Also**

View File

@ -8,7 +8,6 @@ slug: /ru/operations/system-tables/asynchronous_metric_log
Столбцы:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — дата события.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — время события.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — время события в микросекундах.
- `name` ([String](../../sql-reference/data-types/string.md)) — название метрики.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — значение метрики.

View File

@ -8,7 +8,6 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
列:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — 事件日期。
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — 事件时间。
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — 事件时间(微秒)。
- `name` ([String](../../sql-reference/data-types/string.md)) — 指标名。
- `value` ([Float64](../../sql-reference/data-types/float.md)) — 指标值。
@ -17,18 +16,18 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**另请参阅**

View File

@ -49,7 +49,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<Tabl
}
}
bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool check_limits)
bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block, bool check_limits)
{
Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, right_block);
@ -77,7 +77,7 @@ bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool check_li
if (!lock.owns_lock())
continue;
bool limit_exceeded = !hash_join->data->addJoinedBlock(dispatched_block, check_limits);
bool limit_exceeded = !hash_join->data->addBlockToJoin(dispatched_block, check_limits);
dispatched_block = {};
blocks_left--;

View File

@ -16,13 +16,13 @@ namespace DB
{
/**
* Can run addJoinedBlock() parallelly to speedup the join process. On test, it almose linear speedup by
* Can run addBlockToJoin() parallelly to speedup the join process. On test, it almose linear speedup by
* the degree of parallelism.
*
* The default HashJoin is not thread safe for inserting right table's rows and run it in a single thread. When
* the right table is large, the join process is too slow.
*
* We create multiple HashJoin instances here. In addJoinedBlock(), one input block is split into multiple blocks
* We create multiple HashJoin instances here. In addBlockToJoin(), one input block is split into multiple blocks
* corresponding to the HashJoin instances by hashing every row on the join keys. And make a guarantee that every HashJoin
* instance is written by only one thread.
*
@ -37,7 +37,7 @@ public:
~ConcurrentHashJoin() override = default;
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
void setTotals(const Block & block) override;

View File

@ -103,7 +103,7 @@ DirectKeyValueJoin::DirectKeyValueJoin(
right_sample_block_with_storage_column_names = right_sample_block_with_storage_column_names_;
}
bool DirectKeyValueJoin::addJoinedBlock(const Block &, bool)
bool DirectKeyValueJoin::addBlockToJoin(const Block &, bool)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
}

View File

@ -32,10 +32,10 @@ public:
virtual const TableJoin & getTableJoin() const override { return *table_join; }
virtual bool addJoinedBlock(const Block &, bool) override;
virtual bool addBlockToJoin(const Block &, bool) override;
virtual void checkTypesOfKeys(const Block &) const override;
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addBlockToJoin).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> &) override;

View File

@ -30,9 +30,9 @@ public:
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & /* block */, bool /* check_limits */) override
bool addBlockToJoin(const Block & /* block */, bool /* check_limits */) override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addJoinedBlock should not be called");
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addBlockToJoin should not be called");
}
static bool isSupported(const std::shared_ptr<TableJoin> & table_join)

View File

@ -310,13 +310,13 @@ bool GraceHashJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
GraceHashJoin::~GraceHashJoin() = default;
bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
bool GraceHashJoin::addBlockToJoin(const Block & block, bool /*check_limits*/)
{
if (current_bucket == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "GraceHashJoin is not initialized");
Block materialized = materializeBlock(block);
addJoinedBlockImpl(std::move(materialized));
addBlockToJoinImpl(std::move(materialized));
return true;
}
@ -596,7 +596,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
while (Block block = right_reader.read())
{
num_rows += block.rows();
addJoinedBlockImpl(std::move(block));
addBlockToJoinImpl(std::move(block));
}
LOG_TRACE(log, "Loaded bucket {} with {}(/{}) rows",
@ -621,7 +621,7 @@ Block GraceHashJoin::prepareRightBlock(const Block & block)
return HashJoin::prepareRightBlock(block, hash_join_sample_block);
}
void GraceHashJoin::addJoinedBlockImpl(Block block)
void GraceHashJoin::addBlockToJoinImpl(Block block)
{
block = prepareRightBlock(block);
Buckets buckets_snapshot = getCurrentBuckets();
@ -646,7 +646,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (!hash_join)
hash_join = makeInMemoryJoin();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
hash_join->addBlockToJoin(current_block, /* check_limits = */ false);
if (!hasMemoryOverflow(hash_join))
return;
@ -677,7 +677,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
hash_join = makeInMemoryJoin();
if (current_block.rows() > 0)
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
hash_join->addBlockToJoin(current_block, /* check_limits = */ false);
}
}

View File

@ -23,11 +23,11 @@ class HashJoin;
*
* The joining algorithm consists of three stages:
*
* 1) During the first stage we accumulate blocks of the right table via @addJoinedBlock.
* 1) During the first stage we accumulate blocks of the right table via @addBlockToJoin.
* Each input block is split into multiple buckets based on the hash of the row join keys.
* The first bucket is added to the in-memory HashJoin, and the remaining buckets are written to disk for further processing.
* When the size of HashJoin exceeds the limits, we double the number of buckets.
* There can be multiple threads calling addJoinedBlock, just like @ConcurrentHashJoin.
* There can be multiple threads calling addBlockToJoin, just like @ConcurrentHashJoin.
*
* 2) At the second stage we process left table blocks via @joinBlock.
* Again, each input block is split into multiple buckets by hash.
@ -65,7 +65,7 @@ public:
void initialize(const Block & sample_block) override;
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
@ -94,7 +94,7 @@ private:
InMemoryJoinPtr makeInMemoryJoin();
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);
void addBlockToJoinImpl(Block block);
/// Check that join satisfies limits on rows/bytes in table_join.
bool hasMemoryOverflow(size_t total_rows, size_t total_bytes) const;

View File

@ -79,8 +79,8 @@ namespace JoinStuff
{
assert(flags[nullptr].size() <= size);
need_flags = true;
// For one disjunct clause case, we don't need to reinit each time we call addJoinedBlock.
// and there is no value inserted in this JoinUsedFlags before addJoinedBlock finish.
// For one disjunct clause case, we don't need to reinit each time we call addBlockToJoin.
// and there is no value inserted in this JoinUsedFlags before addBlockToJoin finish.
// So we reinit only when the hash table is rehashed to a larger size.
if (flags.empty() || flags[nullptr].size() < size) [[unlikely]]
{
@ -714,7 +714,7 @@ Block HashJoin::prepareRightBlock(const Block & block) const
return prepareRightBlock(block, savedBlockSample());
}
bool HashJoin::addJoinedBlock(const Block & source_block_, bool check_limits)
bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
{
if (!data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Join data was released");
@ -766,7 +766,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block_, bool check_limits)
size_t total_bytes = 0;
{
if (storage_join_lock)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addJoinedBlock called when HashJoin locked to prevent updates");
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addBlockToJoin called when HashJoin locked to prevent updates");
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));

View File

@ -155,11 +155,11 @@ public:
/** Add block of data from right hand of JOIN to the map.
* Returns false, if some limit was exceeded and you should not insert more data.
*/
bool addJoinedBlock(const Block & source_block_, bool check_limits) override;
bool addBlockToJoin(const Block & source_block_, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
/** Join data from the map (that was previously built by calls to addJoinedBlock) to the block with data from "left" table.
/** Join data from the map (that was previously built by calls to addBlockToJoin) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
void joinBlock(Block & block, ExtraBlockPtr & not_processed) override;
@ -406,7 +406,7 @@ private:
Poco::Logger * log;
/// Should be set via setLock to protect hash table from modification from StorageJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
/// If set HashJoin instance is not available for modification (addBlockToJoin)
TableLockHolder storage_join_lock = nullptr;
void dataMapInit(MapsVariant &);

View File

@ -52,7 +52,7 @@ public:
/// Add block of data from right hand of JOIN.
/// @returns false, if some limit was exceeded and you should not insert more data.
virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0; /// NOLINT
virtual bool addBlockToJoin(const Block & block, bool check_limits = true) = 0; /// NOLINT
/* Some initialization may be required before joinBlock() call.
* It's better to done in in constructor, but left block exact structure is not known at that moment.
@ -62,7 +62,7 @@ public:
virtual void checkTypesOfKeys(const Block & block) const = 0;
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addBlockToJoin).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) = 0;
@ -79,7 +79,7 @@ public:
/// Returns true if no data to join with.
virtual bool alwaysReturnsEmptySet() const = 0;
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
/// StorageJoin/Dictionary is already filled. No need to call addBlockToJoin.
/// Different query plan is used for such joins.
virtual bool isFilled() const { return pipelineType() == JoinPipelineType::FilledRight; }
virtual JoinPipelineType pipelineType() const { return JoinPipelineType::FillRightFirst; }

View File

@ -19,16 +19,16 @@ JoinSwitcher::JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block &
limits.max_bytes = table_join->defaultMaxBytes();
}
bool JoinSwitcher::addJoinedBlock(const Block & block, bool)
bool JoinSwitcher::addBlockToJoin(const Block & block, bool)
{
std::lock_guard lock(switch_mutex);
if (switched)
return join->addJoinedBlock(block);
return join->addBlockToJoin(block);
/// HashJoin with external limits check
join->addJoinedBlock(block, false);
join->addBlockToJoin(block, false);
size_t rows = join->getTotalRowCount();
size_t bytes = join->getTotalByteCount();
@ -48,7 +48,7 @@ bool JoinSwitcher::switchJoin()
bool success = true;
for (const Block & saved_block : right_blocks)
success = success && join->addJoinedBlock(saved_block);
success = success && join->addBlockToJoin(saved_block);
switched = true;
return success;

View File

@ -23,7 +23,7 @@ public:
/// Add block of data from right hand of JOIN into current join object.
/// If join-in-memory memory limit exceeded switches to join-on-disk and continue with it.
/// @returns false, if join-on-disk disk limit exceeded
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override
{

View File

@ -669,7 +669,7 @@ Block MergeJoin::modifyRightBlock(const Block & src_block) const
return block;
}
bool MergeJoin::addJoinedBlock(const Block & src_block, bool)
bool MergeJoin::addBlockToJoin(const Block & src_block, bool)
{
Block block = modifyRightBlock(src_block);

View File

@ -23,7 +23,7 @@ public:
MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block);
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block &, ExtraBlockPtr & not_processed) override;

View File

@ -305,7 +305,7 @@ void FillingRightJoinSideTransform::work()
if (for_totals)
join->setTotals(block);
else
stop_reading = !join->addJoinedBlock(block);
stop_reading = !join->addBlockToJoin(block);
set_totals = for_totals;
}

View File

@ -146,7 +146,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
Block block;
while (executor.pull(block))
{
new_data->addJoinedBlock(block, true);
new_data->addBlockToJoin(block, true);
if (persistent)
backup_stream.write(block);
}
@ -257,7 +257,7 @@ void StorageJoin::insertBlock(const Block & block, ContextPtr context)
if (!holder)
throw Exception(ErrorCodes::DEADLOCK_AVOIDED, "StorageJoin: cannot insert data because current query tries to read from this storage");
join->addJoinedBlock(block_to_insert, true);
join->addBlockToJoin(block_to_insert, true);
}
size_t StorageJoin::getSize(ContextPtr context) const