mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge remote-tracking branch 'origin/master' into pr-right-joins
This commit is contained in:
commit
70a01f45ad
@ -488,6 +488,7 @@
|
||||
* Remove `is_deterministic` field from the `system.functions` table. [#66630](https://github.com/ClickHouse/ClickHouse/pull/66630) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Function `tuple` will now try to construct named tuples in query (controlled by `enable_named_columns_in_function_tuple`). Introduce function `tupleNames` to extract names from tuples. [#54881](https://github.com/ClickHouse/ClickHouse/pull/54881) ([Amos Bird](https://github.com/amosbird)).
|
||||
* Change how deduplication for Materialized Views works. Fixed a lot of cases like: - on destination table: data is split for 2 or more blocks and that blocks is considered as duplicate when that block is inserted in parallel. - on MV destination table: the equal blocks are deduplicated, that happens when MV often produces equal data as a result for different input data due to performing aggregation. - on MV destination table: the equal blocks which comes from different MV are deduplicated. [#61601](https://github.com/ClickHouse/ClickHouse/pull/61601) ([Sema Checherinda](https://github.com/CheSema)).
|
||||
* Functions `bitShiftLeft` and `bitShitfRight` return an error for out of bounds shift positions [#65838](https://github.com/ClickHouse/ClickHouse/pull/65838) ([Pablo Marcos](https://github.com/pamarcos)).
|
||||
|
||||
#### New Feature
|
||||
* Add `ASOF JOIN` support for `full_sorting_join` algorithm. [#55051](https://github.com/ClickHouse/ClickHouse/pull/55051) ([vdimir](https://github.com/vdimir)).
|
||||
@ -599,7 +600,6 @@
|
||||
* Functions `bitTest`, `bitTestAll`, and `bitTestAny` now return an error if the specified bit index is out-of-bounds [#65818](https://github.com/ClickHouse/ClickHouse/pull/65818) ([Pablo Marcos](https://github.com/pamarcos)).
|
||||
* Setting `join_any_take_last_row` is supported in any query with hash join. [#65820](https://github.com/ClickHouse/ClickHouse/pull/65820) ([vdimir](https://github.com/vdimir)).
|
||||
* Better handling of join conditions involving `IS NULL` checks (for example `ON (a = b AND (a IS NOT NULL) AND (b IS NOT NULL) ) OR ( (a IS NULL) AND (b IS NULL) )` is rewritten to `ON a <=> b`), fix incorrect optimization when condition other then `IS NULL` are present. [#65835](https://github.com/ClickHouse/ClickHouse/pull/65835) ([vdimir](https://github.com/vdimir)).
|
||||
* Functions `bitShiftLeft` and `bitShitfRight` return an error for out of bounds shift positions [#65838](https://github.com/ClickHouse/ClickHouse/pull/65838) ([Pablo Marcos](https://github.com/pamarcos)).
|
||||
* Fix growing memory usage in S3Queue. [#65839](https://github.com/ClickHouse/ClickHouse/pull/65839) ([Kseniia Sumarokova](https://github.com/kssenii)).
|
||||
* Fix tie handling in `arrayAUC` to match sklearn. [#65840](https://github.com/ClickHouse/ClickHouse/pull/65840) ([gabrielmcg44](https://github.com/gabrielmcg44)).
|
||||
* Fix possible issues with MySQL server protocol TLS connections. [#65917](https://github.com/ClickHouse/ClickHouse/pull/65917) ([Azat Khuzhin](https://github.com/azat)).
|
||||
|
@ -746,6 +746,12 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(ReadTaskRequestsSentElapsedMicroseconds, "Time spent in callbacks requested from the remote server back to the initiator server to choose the read task (for s3Cluster table function and similar). Measured on the remote server side.", ValueType::Microseconds) \
|
||||
M(MergeTreeReadTaskRequestsSentElapsedMicroseconds, "Time spent in callbacks requested from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the remote server side.", ValueType::Microseconds) \
|
||||
M(MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, "Time spent in sending the announcement from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.", ValueType::Microseconds) \
|
||||
M(MergerMutatorsGetPartsForMergeElapsedMicroseconds, "Time spent to take data parts snapshot to build ranges from them.", ValueType::Microseconds) \
|
||||
M(MergerMutatorPrepareRangesForMergeElapsedMicroseconds, "Time spent to prepare parts ranges which can be merged according to merge predicate.", ValueType::Microseconds) \
|
||||
M(MergerMutatorSelectPartsForMergeElapsedMicroseconds, "Time spent to select parts from ranges which can be merged.", ValueType::Microseconds) \
|
||||
M(MergerMutatorRangesForMergeCount, "Amount of candidate ranges for merge", ValueType::Number) \
|
||||
M(MergerMutatorPartsInRangesForMergeCount, "Amount of candidate parts for merge", ValueType::Number) \
|
||||
M(MergerMutatorSelectRangePartsCount, "Amount of parts in selected range for merge", ValueType::Number) \
|
||||
\
|
||||
M(ConnectionPoolIsFullMicroseconds, "Total time spent waiting for a slot in connection pool.", ValueType::Microseconds) \
|
||||
M(AsyncLoaderWaitMicroseconds, "Total time a query was waiting for async loader jobs.", ValueType::Microseconds) \
|
||||
|
@ -790,6 +790,7 @@ bool CachedOnDiskReadBufferFromFile::writeCache(char * data, size_t size, size_t
|
||||
LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText());
|
||||
return false;
|
||||
}
|
||||
chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
throw;
|
||||
}
|
||||
|
||||
|
@ -83,7 +83,8 @@ void EvictionCandidates::removeQueueEntries(const CachePriorityGuard::Lock & loc
|
||||
queue_iterator->invalidate();
|
||||
|
||||
chassert(candidate->releasable());
|
||||
candidate->file_segment->resetQueueIterator();
|
||||
candidate->file_segment->markDelayedRemovalAndResetQueueIterator();
|
||||
|
||||
/// We need to set removed flag in file segment metadata,
|
||||
/// because in dynamic cache resize we first remove queue entries,
|
||||
/// then evict which also removes file segment metadata,
|
||||
|
@ -172,10 +172,11 @@ void FileSegment::setQueueIterator(Priority::IteratorPtr iterator)
|
||||
queue_iterator = iterator;
|
||||
}
|
||||
|
||||
void FileSegment::resetQueueIterator()
|
||||
void FileSegment::markDelayedRemovalAndResetQueueIterator()
|
||||
{
|
||||
auto lk = lock();
|
||||
queue_iterator.reset();
|
||||
on_delayed_removal = true;
|
||||
queue_iterator = {};
|
||||
}
|
||||
|
||||
size_t FileSegment::getCurrentWriteOffset() const
|
||||
@ -701,6 +702,8 @@ void FileSegment::complete(bool allow_background_download)
|
||||
case State::PARTIALLY_DOWNLOADED:
|
||||
{
|
||||
chassert(current_downloaded_size > 0);
|
||||
chassert(fs::exists(getPath()));
|
||||
chassert(fs::file_size(getPath()) > 0);
|
||||
|
||||
if (is_last_holder)
|
||||
{
|
||||
@ -843,29 +846,60 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
|
||||
}
|
||||
}
|
||||
|
||||
if (download_state == State::DOWNLOADED)
|
||||
switch (download_state.load())
|
||||
{
|
||||
chassert(downloader_id.empty());
|
||||
chassert(downloaded_size == reserved_size);
|
||||
chassert(downloaded_size == range().size());
|
||||
chassert(downloaded_size > 0);
|
||||
chassert(std::filesystem::file_size(getPath()) > 0);
|
||||
check_iterator(queue_iterator);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (download_state == State::DOWNLOADING)
|
||||
{
|
||||
chassert(!downloader_id.empty());
|
||||
}
|
||||
else if (download_state == State::PARTIALLY_DOWNLOADED
|
||||
|| download_state == State::EMPTY)
|
||||
case State::EMPTY:
|
||||
{
|
||||
chassert(downloader_id.empty());
|
||||
chassert(!fs::exists(getPath()));
|
||||
chassert(!queue_iterator);
|
||||
break;
|
||||
}
|
||||
case State::DOWNLOADED:
|
||||
{
|
||||
chassert(downloader_id.empty());
|
||||
|
||||
chassert(reserved_size >= downloaded_size);
|
||||
check_iterator(queue_iterator);
|
||||
chassert(downloaded_size == reserved_size);
|
||||
chassert(downloaded_size == range().size());
|
||||
chassert(downloaded_size > 0);
|
||||
chassert(fs::file_size(getPath()) > 0);
|
||||
|
||||
chassert(queue_iterator || on_delayed_removal);
|
||||
check_iterator(queue_iterator);
|
||||
break;
|
||||
}
|
||||
case State::DOWNLOADING:
|
||||
{
|
||||
chassert(!downloader_id.empty());
|
||||
if (downloaded_size)
|
||||
{
|
||||
chassert(queue_iterator);
|
||||
chassert(fs::file_size(getPath()) > 0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::PARTIALLY_DOWNLOADED:
|
||||
{
|
||||
chassert(downloader_id.empty());
|
||||
|
||||
chassert(reserved_size >= downloaded_size);
|
||||
chassert(downloaded_size > 0);
|
||||
chassert(fs::file_size(getPath()) > 0);
|
||||
|
||||
chassert(queue_iterator);
|
||||
check_iterator(queue_iterator);
|
||||
break;
|
||||
}
|
||||
case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
|
||||
{
|
||||
chassert(reserved_size >= downloaded_size);
|
||||
check_iterator(queue_iterator);
|
||||
break;
|
||||
}
|
||||
case State::DETACHED:
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -993,7 +1027,12 @@ FileSegmentsHolder::FileSegmentsHolder(FileSegments && file_segments_)
|
||||
FileSegmentPtr FileSegmentsHolder::getSingleFileSegment() const
|
||||
{
|
||||
if (file_segments.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected single file segment, got: {} in holder {}", file_segments.size(), toString());
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Expected single file segment, got: {} in holder {}",
|
||||
file_segments.size(), toString());
|
||||
}
|
||||
return file_segments.front();
|
||||
}
|
||||
|
||||
@ -1004,12 +1043,21 @@ void FileSegmentsHolder::reset()
|
||||
ProfileEvents::increment(ProfileEvents::FilesystemCacheUnusedHoldFileSegments, file_segments.size());
|
||||
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
|
||||
{
|
||||
/// One might think it would have been more correct to do `false` here,
|
||||
/// not to allow background download for file segments that we actually did not start reading.
|
||||
/// But actually we would only do that, if those file segments were already read partially by some other thread/query
|
||||
/// but they were not put to the download queue, because current thread was holding them in Holder.
|
||||
/// So as a culprit, we need to allow to happen what would have happened if we did not exist.
|
||||
file_segment_it = completeAndPopFrontImpl(true);
|
||||
try
|
||||
{
|
||||
/// One might think it would have been more correct to do `false` here,
|
||||
/// not to allow background download for file segments that we actually did not start reading.
|
||||
/// But actually we would only do that, if those file segments were already read partially by some other thread/query
|
||||
/// but they were not put to the download queue, because current thread was holding them in Holder.
|
||||
/// So as a culprit, we need to allow to happen what would have happened if we did not exist.
|
||||
file_segment_it = completeAndPopFrontImpl(true);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
chassert(false);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
file_segments.clear();
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ public:
|
||||
|
||||
void setQueueIterator(Priority::IteratorPtr iterator);
|
||||
|
||||
void resetQueueIterator();
|
||||
void markDelayedRemovalAndResetQueueIterator();
|
||||
|
||||
KeyMetadataPtr tryGetKeyMetadata() const;
|
||||
|
||||
@ -249,12 +249,13 @@ private:
|
||||
|
||||
String tryGetPath() const;
|
||||
|
||||
Key file_key;
|
||||
const Key file_key;
|
||||
Range segment_range;
|
||||
const FileSegmentKind segment_kind;
|
||||
/// Size of the segment is not known until it is downloaded and
|
||||
/// can be bigger than max_file_segment_size.
|
||||
const bool is_unbound = false;
|
||||
/// is_unbound == true for temporary data in cache.
|
||||
const bool is_unbound;
|
||||
const bool background_download_enabled;
|
||||
|
||||
std::atomic<State> download_state;
|
||||
@ -279,6 +280,8 @@ private:
|
||||
std::atomic<size_t> hits_count = 0; /// cache hits.
|
||||
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
|
||||
|
||||
bool on_delayed_removal = false;
|
||||
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::CacheFileSegments};
|
||||
};
|
||||
|
||||
|
@ -940,7 +940,16 @@ KeyMetadata::iterator LockedKey::removeFileSegmentImpl(
|
||||
if (file_segment->queue_iterator && invalidate_queue_entry)
|
||||
file_segment->queue_iterator->invalidate();
|
||||
|
||||
file_segment->detach(segment_lock, *this);
|
||||
try
|
||||
{
|
||||
file_segment->detach(segment_lock, *this);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
chassert(false);
|
||||
/// Do not rethrow, we must delete the file below.
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
@ -990,8 +999,8 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
|
||||
* because of no space left in cache, we need to be able to cut file segment's size to downloaded_size.
|
||||
*/
|
||||
|
||||
auto metadata = getByOffset(offset);
|
||||
const auto & file_segment = metadata->file_segment;
|
||||
auto file_segment_metadata = getByOffset(offset);
|
||||
const auto & file_segment = file_segment_metadata->file_segment;
|
||||
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
|
||||
|
||||
const size_t downloaded_size = file_segment->getDownloadedSize();
|
||||
@ -1006,15 +1015,15 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
|
||||
chassert(file_segment->reserved_size >= downloaded_size);
|
||||
int64_t diff = file_segment->reserved_size - downloaded_size;
|
||||
|
||||
metadata->file_segment = std::make_shared<FileSegment>(
|
||||
file_segment_metadata->file_segment = std::make_shared<FileSegment>(
|
||||
getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED,
|
||||
CreateFileSegmentSettings(file_segment->getKind()), false,
|
||||
file_segment->cache, key_metadata, file_segment->queue_iterator);
|
||||
|
||||
if (diff)
|
||||
metadata->getQueueIterator()->decrementSize(diff);
|
||||
file_segment_metadata->getQueueIterator()->decrementSize(diff);
|
||||
|
||||
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
|
||||
chassert(file_segment_metadata->file_segment->assertCorrectnessUnlocked(segment_lock));
|
||||
}
|
||||
|
||||
bool LockedKey::addToDownloadQueue(size_t offset, const FileSegmentGuard::Lock &)
|
||||
|
@ -1981,7 +1981,9 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
Block before_prewhere_sample = source_header;
|
||||
if (sanitizeBlock(before_prewhere_sample))
|
||||
{
|
||||
before_prewhere_sample = prewhere_dag_and_flags->dag.updateHeader(before_prewhere_sample);
|
||||
ExpressionActions(
|
||||
prewhere_dag_and_flags->dag.clone(),
|
||||
ExpressionActionsSettings::fromSettings(context->getSettingsRef())).execute(before_prewhere_sample);
|
||||
auto & column_elem = before_prewhere_sample.getByName(query.prewhere()->getColumnName());
|
||||
/// If the filter column is a constant, record it.
|
||||
if (column_elem.column)
|
||||
@ -2013,7 +2015,9 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
before_where_sample = source_header;
|
||||
if (sanitizeBlock(before_where_sample))
|
||||
{
|
||||
before_where_sample = before_where->dag.updateHeader(before_where_sample);
|
||||
ExpressionActions(
|
||||
before_where->dag.clone(),
|
||||
ExpressionActionsSettings::fromSettings(context->getSettingsRef())).execute(before_where_sample);
|
||||
|
||||
auto & column_elem
|
||||
= before_where_sample.getByName(query.where()->getColumnName());
|
||||
|
@ -1647,10 +1647,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
|
||||
SortingStep::Settings sort_settings(*query_context);
|
||||
|
||||
auto sorting_step = std::make_unique<SortingStep>(
|
||||
plan.getCurrentHeader(),
|
||||
std::move(sort_description),
|
||||
0 /*limit*/,
|
||||
sort_settings);
|
||||
plan.getCurrentHeader(), std::move(sort_description), 0 /*limit*/, sort_settings, true /*is_sorting_for_merge_join*/);
|
||||
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_table_side));
|
||||
plan.addStep(std::move(sorting_step));
|
||||
};
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
#include <Processors/QueryPlan/JoinStep.h>
|
||||
#include <Processors/QueryPlan/SortingStep.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/StorageDummy.h>
|
||||
#include <Storages/StorageMaterializedView.h>
|
||||
@ -169,12 +170,25 @@ const QueryNode * findQueryForParallelReplicas(
|
||||
const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping,
|
||||
const Settings & settings)
|
||||
{
|
||||
const QueryPlan::Node * prev_checked_node = nullptr;
|
||||
struct Frame
|
||||
{
|
||||
const QueryPlan::Node * node = nullptr;
|
||||
/// Below we will check subqueries from `stack` to find outermost subquery that could be executed remotely.
|
||||
/// Currently traversal algorithm considers only steps with 0 or 1 children and JOIN specifically.
|
||||
/// When we found some step that requires finalization on the initiator (e.g. GROUP BY) there are two options:
|
||||
/// 1. If plan looks like a single path (e.g. AggregatingStep -> ExpressionStep -> Reading) we can execute
|
||||
/// current subquery as a whole with replicas.
|
||||
/// 2. If we were inside JOIN we cannot offload the whole subquery to replicas because at least one side
|
||||
/// of the JOIN needs to be finalized on the initiator.
|
||||
/// So this flag is used to track what subquery to return once we hit a step that needs finalization.
|
||||
bool inside_join = false;
|
||||
};
|
||||
|
||||
const QueryNode * res = nullptr;
|
||||
|
||||
while (!stack.empty())
|
||||
{
|
||||
const QueryNode * subquery_node = stack.top();
|
||||
const QueryNode * const subquery_node = stack.top();
|
||||
stack.pop();
|
||||
|
||||
auto it = mapping.find(subquery_node);
|
||||
@ -182,22 +196,21 @@ const QueryNode * findQueryForParallelReplicas(
|
||||
if (it == mapping.end())
|
||||
break;
|
||||
|
||||
const QueryPlan::Node * curr_node = it->second;
|
||||
const QueryPlan::Node * next_node_to_check = curr_node;
|
||||
std::stack<Frame> nodes_to_check;
|
||||
nodes_to_check.push({.node = it->second, .inside_join = false});
|
||||
bool can_distribute_full_node = true;
|
||||
bool currently_inside_join = false;
|
||||
|
||||
while (next_node_to_check && next_node_to_check != prev_checked_node)
|
||||
while (!nodes_to_check.empty())
|
||||
{
|
||||
const auto & [next_node_to_check, inside_join] = nodes_to_check.top();
|
||||
nodes_to_check.pop();
|
||||
const auto & children = next_node_to_check->children;
|
||||
auto * step = next_node_to_check->step.get();
|
||||
|
||||
if (children.empty())
|
||||
{
|
||||
/// Found a source step. This should be possible only in the first iteration.
|
||||
if (prev_checked_node)
|
||||
return nullptr;
|
||||
|
||||
next_node_to_check = nullptr;
|
||||
/// Found a source step.
|
||||
}
|
||||
else if (children.size() == 1)
|
||||
{
|
||||
@ -205,12 +218,19 @@ const QueryNode * findQueryForParallelReplicas(
|
||||
const auto * filter = typeid_cast<FilterStep *>(step);
|
||||
|
||||
const auto * creating_sets = typeid_cast<DelayedCreatingSetsStep *>(step);
|
||||
bool allowed_creating_sets = settings[Setting::parallel_replicas_allow_in_with_subquery] && creating_sets;
|
||||
const bool allowed_creating_sets = settings[Setting::parallel_replicas_allow_in_with_subquery] && creating_sets;
|
||||
|
||||
if (!expression && !filter && !allowed_creating_sets)
|
||||
const auto * sorting = typeid_cast<SortingStep *>(step);
|
||||
/// Sorting for merge join is supposed to be done locally before join itself, so it doesn't need finalization.
|
||||
const bool allowed_sorting = sorting && sorting->isSortingForMergeJoin();
|
||||
|
||||
if (!expression && !filter && !allowed_creating_sets && !allowed_sorting)
|
||||
{
|
||||
can_distribute_full_node = false;
|
||||
currently_inside_join = inside_join;
|
||||
}
|
||||
|
||||
next_node_to_check = children.front();
|
||||
nodes_to_check.push({.node = children.front(), .inside_join = inside_join});
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -220,12 +240,11 @@ const QueryNode * findQueryForParallelReplicas(
|
||||
if (!join)
|
||||
return res;
|
||||
|
||||
next_node_to_check = children.front();
|
||||
for (const auto & child : children)
|
||||
nodes_to_check.push({.node = child, .inside_join = true});
|
||||
}
|
||||
}
|
||||
|
||||
/// Current node contains steps like GROUP BY / DISTINCT
|
||||
/// Will try to execute query up to WithMergableStage
|
||||
if (!can_distribute_full_node)
|
||||
{
|
||||
/// Current query node does not contain subqueries.
|
||||
@ -233,12 +252,11 @@ const QueryNode * findQueryForParallelReplicas(
|
||||
if (!res)
|
||||
return nullptr;
|
||||
|
||||
return subquery_node;
|
||||
return currently_inside_join ? res : subquery_node;
|
||||
}
|
||||
|
||||
/// Query is simple enough to be fully distributed.
|
||||
res = subquery_node;
|
||||
prev_checked_node = curr_node;
|
||||
}
|
||||
|
||||
return res;
|
||||
|
@ -77,13 +77,11 @@ static ITransformingStep::Traits getTraits(size_t limit)
|
||||
}
|
||||
|
||||
SortingStep::SortingStep(
|
||||
const Header & input_header,
|
||||
SortDescription description_,
|
||||
UInt64 limit_,
|
||||
const Settings & settings_)
|
||||
const Header & input_header, SortDescription description_, UInt64 limit_, const Settings & settings_, bool is_sorting_for_merge_join_)
|
||||
: ITransformingStep(input_header, input_header, getTraits(limit_))
|
||||
, type(Type::Full)
|
||||
, result_description(std::move(description_))
|
||||
, is_sorting_for_merge_join(is_sorting_for_merge_join_)
|
||||
, limit(limit_)
|
||||
, sort_settings(settings_)
|
||||
{
|
||||
|
@ -39,7 +39,8 @@ public:
|
||||
const Header & input_header,
|
||||
SortDescription description_,
|
||||
UInt64 limit_,
|
||||
const Settings & settings_);
|
||||
const Settings & settings_,
|
||||
bool is_sorting_for_merge_join_ = false);
|
||||
|
||||
/// Full with partitioning
|
||||
SortingStep(
|
||||
@ -81,6 +82,8 @@ public:
|
||||
|
||||
bool hasPartitions() const { return !partition_by_description.empty(); }
|
||||
|
||||
bool isSortingForMergeJoin() const { return is_sorting_for_merge_join; }
|
||||
|
||||
void convertToFinishSorting(SortDescription prefix_description, bool use_buffering_);
|
||||
|
||||
Type getType() const { return type; }
|
||||
@ -125,6 +128,9 @@ private:
|
||||
|
||||
SortDescription partition_by_description;
|
||||
|
||||
/// See `findQueryForParallelReplicas`
|
||||
bool is_sorting_for_merge_join = false;
|
||||
|
||||
UInt64 limit;
|
||||
bool always_read_till_end = false;
|
||||
bool use_buffering = false;
|
||||
|
@ -48,6 +48,16 @@ namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric BackgroundMergesAndMutationsPoolTask;
|
||||
}
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
||||
extern const Event MergerMutatorsGetPartsForMergeElapsedMicroseconds;
|
||||
extern const Event MergerMutatorPrepareRangesForMergeElapsedMicroseconds;
|
||||
extern const Event MergerMutatorSelectPartsForMergeElapsedMicroseconds;
|
||||
extern const Event MergerMutatorRangesForMergeCount;
|
||||
extern const Event MergerMutatorPartsInRangesForMergeCount;
|
||||
extern const Event MergerMutatorSelectRangePartsCount;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -215,6 +225,7 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
|
||||
{
|
||||
PartitionIdsHint res;
|
||||
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn);
|
||||
|
||||
if (data_parts.empty())
|
||||
return res;
|
||||
|
||||
@ -272,6 +283,8 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
|
||||
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(
|
||||
const MergeTreeTransactionPtr & txn, const PartitionIdsHint * partitions_hint) const
|
||||
{
|
||||
|
||||
Stopwatch get_data_parts_for_merge_timer;
|
||||
auto res = getDataPartsToSelectMergeFrom(txn);
|
||||
if (!partitions_hint)
|
||||
return res;
|
||||
@ -280,6 +293,8 @@ MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectM
|
||||
{
|
||||
return !partitions_hint->contains(part->info.partition_id);
|
||||
});
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::MergerMutatorsGetPartsForMergeElapsedMicroseconds, get_data_parts_for_merge_timer.elapsedMicroseconds());
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -357,6 +372,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
|
||||
const MergeTreeTransactionPtr & txn,
|
||||
PreformattedMessage & out_disable_reason) const
|
||||
{
|
||||
Stopwatch ranges_for_merge_timer;
|
||||
MergeSelectingInfo res;
|
||||
|
||||
res.current_time = std::time(nullptr);
|
||||
@ -457,6 +473,10 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
|
||||
prev_part = ∂
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::MergerMutatorPartsInRangesForMergeCount, res.parts_selected_precondition);
|
||||
ProfileEvents::increment(ProfileEvents::MergerMutatorRangesForMergeCount, res.parts_ranges.size());
|
||||
ProfileEvents::increment(ProfileEvents::MergerMutatorPrepareRangesForMergeElapsedMicroseconds, ranges_for_merge_timer.elapsedMicroseconds());
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -471,6 +491,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
|
||||
PreformattedMessage & out_disable_reason,
|
||||
bool dry_run)
|
||||
{
|
||||
Stopwatch select_parts_from_ranges_timer;
|
||||
const auto data_settings = data.getSettings();
|
||||
IMergeSelector::PartsRange parts_to_merge;
|
||||
|
||||
@ -570,7 +591,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
|
||||
|
||||
if (parts_to_merge.empty())
|
||||
{
|
||||
out_disable_reason = PreformattedMessage::create("Did not find any parts to merge (with usual merge selectors)");
|
||||
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds());
|
||||
out_disable_reason = PreformattedMessage::create("Did not find any parts to merge (with usual merge selectors) in {}ms", select_parts_from_ranges_timer.elapsedMicroseconds() / 1000);
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
}
|
||||
}
|
||||
@ -583,8 +605,11 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
|
||||
parts.push_back(part);
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
|
||||
LOG_DEBUG(log, "Selected {} parts from {} to {} in {}ms", parts.size(), parts.front()->name, parts.back()->name, select_parts_from_ranges_timer.elapsedMicroseconds() / 1000);
|
||||
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectRangePartsCount, parts.size());
|
||||
|
||||
future_part->assign(std::move(parts));
|
||||
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds());
|
||||
return SelectPartsDecision::SELECTED;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,10 @@
|
||||
1 1
|
||||
1 1
|
||||
|
||||
0 0
|
||||
-----
|
||||
1 1
|
||||
1 1
|
||||
|
||||
0 0
|
||||
-----
|
48
tests/queries/0_stateless/03254_parallel_replicas_join_with_totals.sh
Executable file
48
tests/queries/0_stateless/03254_parallel_replicas_join_with_totals.sh
Executable file
@ -0,0 +1,48 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="
|
||||
CREATE TABLE t
|
||||
(
|
||||
item_id UInt64,
|
||||
price_sold Float32,
|
||||
date Date
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY item_id;
|
||||
|
||||
INSERT INTO t VALUES (1, 100, '1970-01-01'), (1, 200, '1970-01-02');
|
||||
"
|
||||
|
||||
for enable_parallel_replicas in {0..1}; do
|
||||
${CLICKHOUSE_CLIENT} --query="
|
||||
--- Old analyzer uses different code path and it produces wrong result in this case.
|
||||
set enable_analyzer=1;
|
||||
set allow_experimental_parallel_reading_from_replicas=${enable_parallel_replicas}, cluster_for_parallel_replicas='parallel_replicas', max_parallel_replicas=100, parallel_replicas_for_non_replicated_merge_tree=1;
|
||||
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
SELECT item_id
|
||||
FROM t
|
||||
) AS l
|
||||
LEFT JOIN
|
||||
(
|
||||
SELECT item_id
|
||||
FROM t
|
||||
GROUP BY item_id
|
||||
WITH TOTALS
|
||||
ORDER BY item_id ASC
|
||||
) AS r ON l.item_id = r.item_id;
|
||||
|
||||
SELECT '-----';
|
||||
"
|
||||
done
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="
|
||||
DROP TABLE t;
|
||||
"
|
@ -0,0 +1,116 @@
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` ALL LEFT JOIN (SELECT `__table4`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table4`) AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` ALL LEFT JOIN (SELECT `__table4`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table4`) AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` GLOBAL ALL LEFT JOIN `_data_x_y_` AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1`
|
||||
4999950000
|
||||
4999950000
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t` AS `__table1`
|
||||
SELECT `__table1`.`item_id` AS `item_id` FROM `default`.`t1` AS `__table1` GROUP BY `__table1`.`item_id`
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
499950000
|
||||
499960000
|
||||
499970000
|
||||
499980000
|
||||
499990000
|
||||
500000000
|
||||
500010000
|
||||
500020000
|
||||
500030000
|
||||
500040000
|
||||
SELECT sum(`__table1`.`item_id`) AS `sum(item_id)` FROM (SELECT `__table2`.`item_id` AS `item_id`, `__table2`.`price_sold` AS `price_sold` FROM `default`.`t` AS `__table2`) AS `__table1` GLOBAL ALL LEFT JOIN `_data_x_y_` AS `__table3` ON `__table1`.`item_id` = `__table3`.`item_id` GROUP BY `__table1`.`price_sold` ORDER BY `__table1`.`price_sold` ASC
|
101
tests/queries/0_stateless/03255_parallel_replicas_join_algo_and_analyzer_4.sh
Executable file
101
tests/queries/0_stateless/03255_parallel_replicas_join_algo_and_analyzer_4.sh
Executable file
@ -0,0 +1,101 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long, no-random-settings, no-random-merge-tree-settings
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="
|
||||
CREATE TABLE t
|
||||
(
|
||||
item_id UInt64,
|
||||
price_sold Float32,
|
||||
date Date
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY item_id;
|
||||
|
||||
CREATE TABLE t1
|
||||
(
|
||||
item_id UInt64,
|
||||
price_sold Float32,
|
||||
date Date
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY item_id;
|
||||
|
||||
INSERT INTO t SELECT number, number % 10, toDate(number) FROM numbers(100000);
|
||||
INSERT INTO t1 SELECT number, number % 10, toDate(number) FROM numbers(100000);
|
||||
"
|
||||
|
||||
query1="
|
||||
SELECT sum(item_id)
|
||||
FROM
|
||||
(
|
||||
SELECT item_id
|
||||
FROM t
|
||||
GROUP BY item_id
|
||||
) AS l
|
||||
LEFT JOIN
|
||||
(
|
||||
SELECT item_id
|
||||
FROM t1
|
||||
) AS r ON l.item_id = r.item_id
|
||||
"
|
||||
|
||||
query2="
|
||||
SELECT sum(item_id)
|
||||
FROM
|
||||
(
|
||||
SELECT item_id
|
||||
FROM t
|
||||
) AS l
|
||||
LEFT JOIN
|
||||
(
|
||||
SELECT item_id
|
||||
FROM t1
|
||||
GROUP BY item_id
|
||||
) AS r ON l.item_id = r.item_id
|
||||
"
|
||||
|
||||
query3="
|
||||
SELECT sum(item_id)
|
||||
FROM
|
||||
(
|
||||
SELECT item_id, price_sold
|
||||
FROM t
|
||||
) AS l
|
||||
LEFT JOIN
|
||||
(
|
||||
SELECT item_id
|
||||
FROM t1
|
||||
) AS r ON l.item_id = r.item_id
|
||||
GROUP BY price_sold
|
||||
ORDER BY price_sold
|
||||
"
|
||||
|
||||
for parallel_replicas_prefer_local_join in 1 0; do
|
||||
for prefer_local_plan in {0..1}; do
|
||||
for query in "${query1}" "${query2}" "${query3}"; do
|
||||
for enable_parallel_replicas in {0..1}; do
|
||||
${CLICKHOUSE_CLIENT} --query="
|
||||
set enable_analyzer=1;
|
||||
set parallel_replicas_prefer_local_join=${parallel_replicas_prefer_local_join};
|
||||
set parallel_replicas_local_plan=${prefer_local_plan};
|
||||
set allow_experimental_parallel_reading_from_replicas=${enable_parallel_replicas}, cluster_for_parallel_replicas='parallel_replicas', max_parallel_replicas=100, parallel_replicas_for_non_replicated_merge_tree=1;
|
||||
|
||||
--SELECT '----- enable_parallel_replicas=$enable_parallel_replicas prefer_local_plan=$prefer_local_plan parallel_replicas_prefer_local_join=$parallel_replicas_prefer_local_join -----';
|
||||
${query};
|
||||
|
||||
SELECT replaceRegexpAll(replaceRegexpAll(explain, '.*Query: (.*) Replicas:.*', '\\1'), '(.*)_data_[\d]+_[\d]+(.*)', '\1_data_x_y_\2')
|
||||
FROM
|
||||
(
|
||||
EXPLAIN actions=1 ${query}
|
||||
)
|
||||
WHERE explain LIKE '%ParallelReplicas%';
|
||||
"
|
||||
done
|
||||
done
|
||||
done
|
||||
done
|
@ -0,0 +1,23 @@
|
||||
WITH
|
||||
multiIf('-1' = '-1', 10080, '-1' = '7', 60, '-1' = '1', 5, 1440) AS interval_start, -- noqa
|
||||
multiIf('-1' = '-1', CEIL((today() - toDate('2017-06-22')) / 7)::UInt16, '-1' = '7', 168, '-1' = '1', 288, 90) AS days_run, -- noqa:L045
|
||||
block_time as (SELECT arrayJoin(
|
||||
arrayMap(
|
||||
i -> toDateTime(toStartOfInterval(now(), INTERVAL interval_start MINUTE) - interval_start * 60 * i, 'UTC'),
|
||||
range(days_run)
|
||||
)
|
||||
)),
|
||||
|
||||
sales AS (
|
||||
SELECT
|
||||
toDateTime(toStartOfInterval(now(), INTERVAL interval_start MINUTE), 'UTC') AS block_time
|
||||
FROM
|
||||
numbers(1)
|
||||
GROUP BY
|
||||
block_time
|
||||
ORDER BY
|
||||
block_time)
|
||||
|
||||
SELECT
|
||||
block_time
|
||||
FROM sales where block_time >= (SELECT MIN(block_time) FROM sales) format Null;
|
Loading…
Reference in New Issue
Block a user