Merge branch 'master' into fix-projection-merge

jsc0218 2024-08-14 21:38:44 -04:00 committed by GitHub
commit c840a12761
52 changed files with 580 additions and 152 deletions

View File

@ -101,6 +101,7 @@ jobs:
--volume=".:/wd" --workdir="/wd" \
clickhouse/style-test \
./tests/ci/changelog.py -v --debug-helpers \
--gh-user-or-token ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }} \
--jobs=5 \
--output="./docs/changelogs/${{ env.RELEASE_TAG }}.md" ${{ env.RELEASE_TAG }}
git add ./docs/changelogs/${{ env.RELEASE_TAG }}.md

View File

@ -0,0 +1,29 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.3.7.30-lts (c8a28cf4331) FIXME as compared to v24.3.6.48-lts (b2d33c3c45d)
#### Improvement
* Backported in [#68103](https://github.com/ClickHouse/ClickHouse/issues/68103): Distinguish booleans and integers while parsing values for custom settings: ``` SET custom_a = true; SET custom_b = 1; ```. [#62206](https://github.com/ClickHouse/ClickHouse/pull/62206) ([Vitaly Baranov](https://github.com/vitlibar)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#67931](https://github.com/ClickHouse/ClickHouse/issues/67931): Fixing the `Not-ready Set` error after the `PREWHERE` optimization for StorageMerge. [#65057](https://github.com/ClickHouse/ClickHouse/pull/65057) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#68062](https://github.com/ClickHouse/ClickHouse/issues/68062): Fix boolean literals in query sent to external database (for engines like `PostgreSQL`). [#66282](https://github.com/ClickHouse/ClickHouse/pull/66282) ([vdimir](https://github.com/vdimir)).
* Backported in [#67812](https://github.com/ClickHouse/ClickHouse/issues/67812): Only relevant to the experimental Variant data type. Fix crash with Variant + AggregateFunction type. [#67122](https://github.com/ClickHouse/ClickHouse/pull/67122) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#67848](https://github.com/ClickHouse/ClickHouse/issues/67848): Fixes [#66026](https://github.com/ClickHouse/ClickHouse/issues/66026). Avoid unresolved table function arguments traversal in `ReplaceTableNodeToDummyVisitor`. [#67522](https://github.com/ClickHouse/ClickHouse/pull/67522) ([Dmitry Novik](https://github.com/novikd)).
* Backported in [#68271](https://github.com/ClickHouse/ClickHouse/issues/68271): Fix inserting into stream like engines (Kafka, RabbitMQ, NATS) through HTTP interface. [#67554](https://github.com/ClickHouse/ClickHouse/pull/67554) ([János Benjamin Antal](https://github.com/antaljanosbenjamin)).
* Backported in [#67806](https://github.com/ClickHouse/ClickHouse/issues/67806): Fix reloading SQL UDFs with UNION. Previously, restarting the server could make UDF invalid. [#67665](https://github.com/ClickHouse/ClickHouse/pull/67665) ([Antonio Andelic](https://github.com/antonio2368)).
* Backported in [#67834](https://github.com/ClickHouse/ClickHouse/issues/67834): Fix potential stack overflow in `JSONMergePatch` function. Renamed this function from `jsonMergePatch` to `JSONMergePatch` because the previous name was wrong. The previous name is still kept for compatibility. Improved diagnostic of errors in the function. This closes [#67304](https://github.com/ClickHouse/ClickHouse/issues/67304). [#67756](https://github.com/ClickHouse/ClickHouse/pull/67756) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Backported in [#68206](https://github.com/ClickHouse/ClickHouse/issues/68206): Fix wrong `count()` result when there is non-deterministic function in predicate. [#67922](https://github.com/ClickHouse/ClickHouse/pull/67922) ([János Benjamin Antal](https://github.com/antaljanosbenjamin)).
* Backported in [#68089](https://github.com/ClickHouse/ClickHouse/issues/68089): Fixed the calculation of the maximum thread soft limit in containerized environments where the usable CPU count is limited. [#67963](https://github.com/ClickHouse/ClickHouse/pull/67963) ([Robert Schulze](https://github.com/rschu1ze)).
* Backported in [#68120](https://github.com/ClickHouse/ClickHouse/issues/68120): Fixed skipping of untouched parts in mutations with new analyzer. Previously with enabled analyzer data in part could be rewritten by mutation even if mutation doesn't affect this part according to predicate. [#68052](https://github.com/ClickHouse/ClickHouse/pull/68052) ([Anton Popov](https://github.com/CurtizJ)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Update version after release. [#67676](https://github.com/ClickHouse/ClickHouse/pull/67676) ([robot-clickhouse](https://github.com/robot-clickhouse)).
* Backported in [#68074](https://github.com/ClickHouse/ClickHouse/issues/68074): Add an explicit error for `ALTER MODIFY SQL SECURITY` on non-view tables. [#67953](https://github.com/ClickHouse/ClickHouse/pull/67953) ([pufit](https://github.com/pufit)).
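
A quick illustration of the `JSONMergePatch` rename noted above — the old lowercase name remains a working alias (hedged sketch; exact output formatting may differ):

SELECT JSONMergePatch('{"a": 1, "b": 2}', '{"b": 3, "c": 4}') AS merged;  -- {"a":1,"b":3,"c":4}
SELECT jsonMergePatch('{"a": 1}', '{"b": 2}') AS legacy_name;             -- kept for compatibility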

View File

@ -307,7 +307,7 @@
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
\
M(S3DiskNoKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -209,8 +209,35 @@
\
M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \
M(MergedColumns, "Number of columns merged during the horizontal stage of merges.") \
M(GatheredColumns, "Number of columns gathered during the vertical stage of merges.") \
M(MergedUncompressedBytes, "Uncompressed bytes (for columns as they are stored in memory) that were read for background merges. This is the number before merge.") \
M(MergesTimeMilliseconds, "Total time spent for background merges.")\
M(MergeTotalMilliseconds, "Total time spent for background merges") \
M(MergeExecuteMilliseconds, "Total busy time spent for execution of background merges") \
M(MergeHorizontalStageTotalMilliseconds, "Total time spent for horizontal stage of background merges") \
M(MergeHorizontalStageExecuteMilliseconds, "Total busy time spent for execution of horizontal stage of background merges") \
M(MergeVerticalStageTotalMilliseconds, "Total time spent for vertical stage of background merges") \
M(MergeVerticalStageExecuteMilliseconds, "Total busy time spent for execution of vertical stage of background merges") \
M(MergeProjectionStageTotalMilliseconds, "Total time spent for projection stage of background merges") \
M(MergeProjectionStageExecuteMilliseconds, "Total busy time spent for execution of projection stage of background merges") \
\
M(MergingSortedMilliseconds, "Total time spent while merging sorted columns") \
M(AggregatingSortedMilliseconds, "Total time spent while aggregating sorted columns") \
M(CollapsingSortedMilliseconds, "Total time spent while collapsing sorted columns") \
M(ReplacingSortedMilliseconds, "Total time spent while replacing sorted columns") \
M(SummingSortedMilliseconds, "Total time spent while summing sorted columns") \
M(VersionedCollapsingSortedMilliseconds, "Total time spent while version collapsing sorted columns") \
M(GatheringColumnMilliseconds, "Total time spent while gathering columns for vertical merge") \
\
M(MutationTotalParts, "Total number of parts for which mutations were attempted") \
M(MutationUntouchedParts, "Number of parts for which a mutation was attempted but was skipped entirely according to the predicate") \
M(MutatedRows, "Rows read for mutations. This is the number of rows before mutation") \
M(MutatedUncompressedBytes, "Uncompressed bytes (for columns as they are stored in memory) that were read for mutations. This is the number before mutation.") \
M(MutationTotalMilliseconds, "Total time spent for mutations.") \
M(MutationExecuteMilliseconds, "Total busy time spent for execution of mutations.") \
M(MutationAllPartColumns, "Number of times a task to mutate all columns in a part was created") \
M(MutationSomePartColumns, "Number of times a task to mutate some columns in a part was created") \
M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections in mutations.") \
\
M(MergeTreeDataWriterRows, "Number of rows INSERTed to MergeTree tables.") \
M(MergeTreeDataWriterUncompressedBytes, "Uncompressed bytes (for columns as they are stored in memory) INSERTed to MergeTree tables.") \
@ -225,7 +252,6 @@
M(MergeTreeDataWriterProjectionsCalculationMicroseconds, "Time spent calculating projections") \
M(MergeTreeDataProjectionWriterSortingBlocksMicroseconds, "Time spent sorting blocks (for projection it might be a key different from table's sorting key)") \
M(MergeTreeDataProjectionWriterMergingBlocksMicroseconds, "Time spent merging blocks") \
M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections") \
\
M(InsertedWideParts, "Number of parts inserted in Wide format.") \
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \

View File

@ -46,7 +46,7 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric S3DiskNoKeyErrors;
extern const Metric DiskS3NoSuchKeyErrors;
}
namespace DB
@ -701,7 +701,7 @@ RequestResult Client::processRequestResult(RequestResult && outcome) const
return std::forward<RequestResult>(outcome);
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
CurrentMetrics::add(CurrentMetrics::S3DiskNoKeyErrors);
CurrentMetrics::add(CurrentMetrics::DiskS3NoSuchKeyErrors);
String enriched_message = fmt::format(
"{} {}",

View File

@ -354,8 +354,8 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
{
if (unlikely(current_offset >= max_joined_block_rows))
{
added_columns.offsets_to_replicate->resize_assume_reserved(i);
added_columns.filter.resize_assume_reserved(i);
added_columns.offsets_to_replicate->resize(i);
added_columns.filter.resize(i);
break;
}
}

View File

@ -18,11 +18,25 @@ struct JoinFeatures
static constexpr bool inner = KIND == JoinKind::Inner;
static constexpr bool full = KIND == JoinKind::Full;
/** Whether we may need to duplicate rows from the left table.
* For example, when we have a row (key1, attr1) in the left table
* and rows (key1, attr2), (key1, attr3) in the right table,
* we need to duplicate the row (key1, attr1) for each of the joined rows from the right table, so the result will be
* (key1, attr1, key1, attr2)
* (key1, attr1, key1, attr3)
*/
static constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right);
/// Whether we need to filter rows from the left table that do not have matches in the right table.
static constexpr bool need_filter = !need_replication && (inner || right || (is_semi_join && left) || (is_anti_join && left));
/// Whether we need to add default values for columns from the left table.
static constexpr bool add_missing = (left || full) && !is_semi_join;
/// Whether we need to store flags for rows from the right table
/// that indicate if they have matches in the left table.
static constexpr bool need_flags = MapGetter<KIND, STRICTNESS, std::is_same_v<std::decay_t<Map>, HashJoin::MapsAll>>::flagged;
static constexpr bool is_maps_all = std::is_same_v<std::decay_t<Map>, HashJoin::MapsAll>;
};
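
A minimal SQL sketch of the replication case described in the comment above, assuming hypothetical tables l(key1, attr1) with row (1, 'a1') and r(key1, attr2) with rows (1, 'a2'), (1, 'a3'):

SELECT l.key1, l.attr1, r.attr2
FROM l ALL INNER JOIN r ON l.key1 = r.key1;
-- the single left row is replicated once per match:
-- (1, 'a1', 'a2')
-- (1, 'a1', 'a3')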

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event AggregatingSortedMilliseconds;
}
namespace DB
{
@ -29,6 +34,11 @@ public:
}
String getName() const override { return "AggregatingSortedTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::AggregatingSortedMilliseconds, "Aggregated sorted", getLogger("AggregatingSortedTransform"));
}
};
}

View File

@ -30,6 +30,8 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
MergedStats getMergedStats() const override { return merged_data.getMergedStats(); }
/// Stores information for aggregation of SimpleAggregateFunction columns
struct SimpleAggregateDescription
{

View File

@ -126,6 +126,9 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge()
{
total_merged_rows += accumulated_rows;
total_merged_bytes += accumulated_bytes;
accumulated_rows = 0;
accumulated_bytes = 0;

View File

@ -50,6 +50,8 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
MergedStats getMergedStats() const override { return {.bytes = accumulated_bytes, .rows = accumulated_rows, .blocks = chunk_num}; }
private:
Chunk prepareToMerge();
void addToAggregation();
@ -92,6 +94,9 @@ private:
UInt64 chunk_num = 0;
size_t accumulated_rows = 0;
size_t accumulated_bytes = 0;
size_t total_merged_rows = 0;
size_t total_merged_bytes = 0;
};
}

View File

@ -33,6 +33,8 @@ public:
const char * getName() const override { return "GraphiteRollupSortedAlgorithm"; }
Status merge() override;
MergedStats getMergedStats() const override { return merged_data->getMergedStats(); }
struct ColumnsDefinition
{
size_t path_column_num;

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/Chunk.h>
#include <variant>
#include <Common/ProfileEvents.h>
namespace DB
{
@ -65,6 +65,15 @@ public:
IMergingAlgorithm() = default;
virtual ~IMergingAlgorithm() = default;
struct MergedStats
{
UInt64 bytes = 0;
UInt64 rows = 0;
UInt64 blocks = 0;
};
virtual MergedStats getMergedStats() const = 0;
};
// TODO: use when compiling with a clang version that supports it

View File

@ -16,6 +16,8 @@ public:
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
MergedStats getMergedStats() const override { return merged_data->getMergedStats(); }
private:
Block header;
SortDescription description;

View File

@ -183,6 +183,8 @@ public:
UInt64 totalAllocatedBytes() const { return total_allocated_bytes; }
UInt64 maxBlockSize() const { return max_block_size; }
IMergingAlgorithm::MergedStats getMergedStats() const { return {.bytes = total_allocated_bytes, .rows = total_merged_rows, .blocks = total_chunks}; }
virtual ~MergedData() = default;
protected:

View File

@ -31,7 +31,7 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
const MergedData & getMergedData() const { return merged_data; }
MergedStats getMergedStats() const override { return merged_data.getMergedStats(); }
private:
Block header;

View File

@ -30,6 +30,8 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
MergedStats getMergedStats() const override { return merged_data.getMergedStats(); }
struct AggregateDescription;
struct MapDescription;

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event CollapsingSortedMilliseconds;
}
namespace DB
{
@ -36,6 +41,11 @@ public:
}
String getName() const override { return "CollapsingSortedTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::CollapsingSortedMilliseconds, "Collapsed sorted", getLogger("CollapsingSortedTransform"));
}
};
}

View File

@ -2,7 +2,10 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/IProcessor.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
namespace DB
{
@ -110,6 +113,8 @@ public:
void work() override
{
Stopwatch watch{CLOCK_MONOTONIC_COARSE};
if (!state.init_chunks.empty())
algorithm.initialize(std::move(state.init_chunks));
@ -147,6 +152,8 @@ public:
// std::cerr << "Finished" << std::endl;
state.is_finished = true;
}
merging_elapsed_ns += watch.elapsedNanoseconds();
}
protected:
@ -156,7 +163,33 @@ protected:
Algorithm algorithm;
/// Profile info.
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
UInt64 merging_elapsed_ns = 0;
void logMergedStats(ProfileEvents::Event elapsed_ms_event, std::string_view transform_message, LoggerPtr log) const
{
auto stats = algorithm.getMergedStats();
UInt64 elapsed_ms = merging_elapsed_ns / 1000000LL;
ProfileEvents::increment(elapsed_ms_event, elapsed_ms);
/// Don't print info for small parts (< 1M rows)
if (stats.rows < 1000000)
return;
double seconds = static_cast<double>(merging_elapsed_ns) / 1000000000ULL;
if (seconds == 0.0)
{
LOG_DEBUG(log, "{}, {} blocks, {} rows, {} bytes in 0 sec.",
transform_message, stats.blocks, stats.rows, stats.bytes);
}
else
{
LOG_DEBUG(log, "{}, {} blocks, {} rows, {} bytes in {} sec., {} rows/sec., {}/sec.",
transform_message, stats.blocks, stats.rows, stats.bytes,
seconds, stats.rows / seconds, ReadableSize(stats.bytes / seconds));
}
}
private:
using IMergingTransformBase::state;

View File

@ -1,9 +1,12 @@
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBuffer.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
namespace ProfileEvents
{
extern const Event MergingSortedMilliseconds;
}
namespace DB
{
@ -18,7 +21,6 @@ MergingSortedTransform::MergingSortedTransform(
UInt64 limit_,
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes,
bool have_all_inputs_)
: IMergingTransform(
@ -37,7 +39,6 @@ MergingSortedTransform::MergingSortedTransform(
limit_,
out_row_sources_buf_,
use_average_block_sizes)
, quiet(quiet_)
{
}
@ -48,22 +49,7 @@ void MergingSortedTransform::onNewInput()
void MergingSortedTransform::onFinish()
{
if (quiet)
return;
const auto & merged_data = algorithm.getMergedData();
auto log = getLogger("MergingSortedTransform");
double seconds = total_stopwatch.elapsedSeconds();
if (seconds == 0.0)
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in 0 sec.", merged_data.totalChunks(), merged_data.totalMergedRows());
else
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
merged_data.totalChunks(), merged_data.totalMergedRows(), seconds,
merged_data.totalMergedRows() / seconds,
ReadableSize(merged_data.totalAllocatedBytes() / seconds));
logMergedStats(ProfileEvents::MergingSortedMilliseconds, "Merged sorted", getLogger("MergingSortedTransform"));
}
}

View File

@ -21,7 +21,6 @@ public:
UInt64 limit_ = 0,
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false,
bool have_all_inputs_ = true);
@ -30,9 +29,6 @@ public:
protected:
void onNewInput() override;
void onFinish() override;
private:
bool quiet = false;
};
}

View File

@ -3,6 +3,10 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event ReplacingSortedMilliseconds;
}
namespace DB
{
@ -38,6 +42,11 @@ public:
}
String getName() const override { return "ReplacingSorted"; }
void onFinish() override
{
logMergedStats(ProfileEvents::ReplacingSortedMilliseconds, "Replaced sorted", getLogger("ReplacingSortedTransform"));
}
};
}

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event SummingSortedMilliseconds;
}
namespace DB
{
@ -33,6 +38,11 @@ public:
}
String getName() const override { return "SummingSortedTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::SummingSortedMilliseconds, "Summed sorted", getLogger("SummingSortedTransform"));
}
};
}

View File

@ -3,6 +3,10 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
namespace ProfileEvents
{
extern const Event VersionedCollapsingSortedMilliseconds;
}
namespace DB
{
@ -33,6 +37,11 @@ public:
}
String getName() const override { return "VersionedCollapsingTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::VersionedCollapsingSortedMilliseconds, "Versioned collapsed sorted", getLogger("VersionedCollapsingTransform"));
}
};
}

View File

@ -1,11 +1,15 @@
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/formatReadable.h>
#include <Columns/ColumnSparse.h>
#include <IO/WriteHelpers.h>
#include <iomanip>
namespace ProfileEvents
{
extern const Event GatheringColumnMilliseconds;
}
namespace DB
{
@ -33,6 +37,13 @@ ColumnGathererStream::ColumnGathererStream(
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather");
}
void ColumnGathererStream::updateStats(const IColumn & column)
{
merged_rows += column.size();
merged_bytes += column.allocatedBytes();
++merged_blocks;
}
void ColumnGathererStream::initialize(Inputs inputs)
{
Columns source_columns;
@ -82,7 +93,9 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
{
res.addColumn(source_to_fully_copy->column);
}
merged_rows += source_to_fully_copy->size;
updateStats(*source_to_fully_copy->column);
source_to_fully_copy->pos = source_to_fully_copy->size;
source_to_fully_copy = nullptr;
return Status(std::move(res));
@ -96,8 +109,7 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
{
next_required_source = 0;
Chunk res;
merged_rows += sources.front().column->size();
merged_bytes += sources.front().column->allocatedBytes();
updateStats(*sources.front().column);
res.addColumn(std::move(sources.front().column));
sources.front().pos = sources.front().size = 0;
return Status(std::move(res));
@ -123,8 +135,8 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
if (source_to_fully_copy && result_column->empty())
{
Chunk res;
merged_rows += source_to_fully_copy->column->size();
merged_bytes += source_to_fully_copy->column->allocatedBytes();
updateStats(*source_to_fully_copy->column);
if (result_column->hasDynamicStructure())
{
auto col = result_column->cloneEmpty();
@ -140,13 +152,13 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
return Status(std::move(res));
}
auto col = result_column->cloneEmpty();
result_column.swap(col);
auto return_column = result_column->cloneEmpty();
result_column.swap(return_column);
Chunk res;
merged_rows += col->size();
merged_bytes += col->allocatedBytes();
res.addColumn(std::move(col));
updateStats(*return_column);
res.addColumn(std::move(return_column));
return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy);
}
@ -185,31 +197,10 @@ ColumnGathererTransform::ColumnGathererTransform(
toString(header.columns()));
}
void ColumnGathererTransform::work()
{
Stopwatch stopwatch;
IMergingTransform<ColumnGathererStream>::work();
elapsed_ns += stopwatch.elapsedNanoseconds();
}
void ColumnGathererTransform::onFinish()
{
auto merged_rows = algorithm.getMergedRows();
auto merged_bytes = algorithm.getMergedRows();
/// Don't print info for small parts (< 10M rows)
if (merged_rows < 10000000)
return;
double seconds = static_cast<double>(elapsed_ns) / 1000000000ULL;
const auto & column_name = getOutputPort().getHeader().getByPosition(0).name;
if (seconds == 0.0)
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
column_name, static_cast<double>(merged_bytes) / merged_rows);
else
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
column_name, static_cast<double>(merged_bytes) / merged_rows, seconds,
merged_rows / seconds, ReadableSize(merged_bytes / seconds));
logMergedStats(ProfileEvents::GatheringColumnMilliseconds, fmt::format("Gathered column {}", column_name), log);
}
}

View File

@ -72,10 +72,11 @@ public:
template <typename Column>
void gather(Column & column_res);
UInt64 getMergedRows() const { return merged_rows; }
UInt64 getMergedBytes() const { return merged_bytes; }
MergedStats getMergedStats() const override { return {.bytes = merged_bytes, .rows = merged_rows, .blocks = merged_blocks}; }
private:
void updateStats(const IColumn & column);
/// Cache required fields
struct Source
{
@ -105,6 +106,7 @@ private:
ssize_t next_required_source = -1;
UInt64 merged_rows = 0;
UInt64 merged_bytes = 0;
UInt64 merged_blocks = 0;
};
class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
@ -120,12 +122,8 @@ public:
String getName() const override { return "ColumnGathererTransform"; }
void work() override;
protected:
void onFinish() override;
UInt64 elapsed_ns = 0;
LoggerPtr log;
};

View File

@ -511,6 +511,16 @@ void MergeJoinAlgorithm::logElapsed(double seconds)
stat.max_blocks_loaded);
}
IMergingAlgorithm::MergedStats MergeJoinAlgorithm::getMergedStats() const
{
return
{
.bytes = stat.num_bytes[0] + stat.num_bytes[1],
.rows = stat.num_rows[0] + stat.num_rows[1],
.blocks = stat.num_blocks[0] + stat.num_blocks[1],
};
}
static void prepareChunk(Chunk & chunk)
{
if (!chunk)
@ -547,6 +557,7 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
{
stat.num_blocks[source_num] += 1;
stat.num_rows[source_num] += input.chunk.getNumRows();
stat.num_bytes[source_num] += input.chunk.allocatedBytes();
}
prepareChunk(input.chunk);
@ -1271,7 +1282,7 @@ MergeJoinTransform::MergeJoinTransform(
void MergeJoinTransform::onFinish()
{
algorithm.logElapsed(total_stopwatch.elapsedSeconds());
algorithm.logElapsed(static_cast<double>(merging_elapsed_ns) / 1000000000ULL);
}
}

View File

@ -245,6 +245,8 @@ public:
void setAsofInequality(ASOFJoinInequality asof_inequality_);
void logElapsed(double seconds);
MergedStats getMergedStats() const override;
private:
std::optional<Status> handleAnyJoinState();
Status anyJoin();
@ -280,6 +282,7 @@ private:
{
size_t num_blocks[2] = {0, 0};
size_t num_rows[2] = {0, 0};
size_t num_bytes[2] = {0, 0};
size_t max_blocks_loaded = 0;
};

View File

@ -185,7 +185,6 @@ void MergeSortingTransform::consume(Chunk chunk)
if (!external_merging_sorted)
{
bool quiet = false;
bool have_all_inputs = false;
bool use_average_block_sizes = false;
@ -199,7 +198,6 @@ void MergeSortingTransform::consume(Chunk chunk)
limit,
/*always_read_till_end_=*/ false,
nullptr,
quiet,
use_average_block_sizes,
have_all_inputs);

View File

@ -58,6 +58,16 @@ static void prepareChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows);
}
IMergingAlgorithm::MergedStats PasteJoinAlgorithm::getMergedStats() const
{
return
{
.bytes = stat.num_bytes[0] + stat.num_bytes[1],
.rows = stat.num_rows[0] + stat.num_rows[1],
.blocks = stat.num_blocks[0] + stat.num_blocks[1],
};
}
void PasteJoinAlgorithm::initialize(Inputs inputs)
{
if (inputs.size() != 2)

View File

@ -35,8 +35,7 @@ public:
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
Status merge() override;
void logElapsed(double seconds);
MergedStats getMergedStats() const override;
private:
Chunk createBlockWithDefaults(size_t source_num);
@ -55,6 +54,7 @@ private:
{
size_t num_blocks[2] = {0, 0};
size_t num_rows[2] = {0, 0};
size_t num_bytes[2] = {0, 0};
size_t max_blocks_loaded = 0;
};

View File

@ -208,6 +208,12 @@ Block executePipeline(QueryPipeline && pipeline)
template <typename T>
void assertColumnVectorEq(const typename ColumnVector<T>::Container & expected, const Block & block, const std::string & name)
{
if (expected.empty())
{
ASSERT_TRUE(block.columns() == 0);
return;
}
const auto * actual = typeid_cast<const ColumnVector<T> *>(block.getByName(name).column.get());
ASSERT_TRUE(actual) << "unexpected column type: " << block.getByName(name).column->dumpStructure() << " expected: " << typeid(ColumnVector<T>).name();
@ -230,6 +236,12 @@ void assertColumnVectorEq(const typename ColumnVector<T>::Container & expected,
template <typename T>
void assertColumnEq(const IColumn & expected, const Block & block, const std::string & name)
{
if (expected.empty())
{
ASSERT_TRUE(block.columns() == 0);
return;
}
const ColumnPtr & actual = block.getByName(name).column;
ASSERT_TRUE(checkColumn<T>(*actual));
ASSERT_TRUE(checkColumn<T>(expected));

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true);
pipe.addTransform(std::move(transform));
@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true);
pipe.addTransform(std::move(transform));

View File

@ -8,10 +8,10 @@
namespace ProfileEvents
{
extern const Event MergesTimeMilliseconds;
extern const Event MergedUncompressedBytes;
extern const Event MergedRows;
extern const Event Merge;
extern const Event MutatedRows;
extern const Event MutatedUncompressedBytes;
}
namespace DB
@ -63,18 +63,17 @@ public:
void updateWatch()
{
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed;
}
void operator() (const Progress & value)
void operator()(const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
if (stage.is_first)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::Merge);
}
if (merge_list_element_ptr->is_mutation)
updateProfileEvents(value, ProfileEvents::MutatedRows, ProfileEvents::MutatedUncompressedBytes);
else
updateProfileEvents(value, ProfileEvents::MergedRows, ProfileEvents::MergedUncompressedBytes);
updateWatch();
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
@ -90,6 +89,14 @@ public:
std::memory_order_relaxed);
}
}
private:
void updateProfileEvents(const Progress & value, ProfileEvents::Event rows_event, ProfileEvents::Event bytes_event) const
{
ProfileEvents::increment(bytes_event, value.read_bytes);
if (stage.is_first)
ProfileEvents::increment(rows_event, value.read_rows);
}
};
}

View File

@ -8,6 +8,7 @@
#include <Common/logger_useful.h>
#include <Common/ActionBlocker.h>
#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Compression/CompressedWriteBuffer.h>
@ -41,6 +42,18 @@
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace ProfileEvents
{
extern const Event Merge;
extern const Event MergedColumns;
extern const Event GatheredColumns;
extern const Event MergeTotalMilliseconds;
extern const Event MergeExecuteMilliseconds;
extern const Event MergeHorizontalStageExecuteMilliseconds;
extern const Event MergeVerticalStageExecuteMilliseconds;
extern const Event MergeProjectionStageExecuteMilliseconds;
}
namespace DB
{
@ -179,6 +192,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
{
ProfileEvents::increment(ProfileEvents::Merge);
String local_tmp_prefix;
if (global_ctx->need_prefix)
{
@ -458,6 +473,13 @@ void MergeTask::addGatheringColumn(GlobalRuntimeContextPtr global_ctx, const Str
MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::getContextForNextStage()
{
/// Do not increment for projection stage because time is already accounted in main task.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeHorizontalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>();
new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf);
@ -475,8 +497,14 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g
MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNextStage()
{
auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>();
/// Do not increment for projection stage because time is already accounted in main task.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeVerticalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>();
new_ctx->need_sync = std::move(ctx->need_sync);
ctx.reset();
@ -486,9 +514,14 @@ MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNe
bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
{
assert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true;
chassert(subtasks_iterator != subtasks.end());
Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks
++subtasks_iterator;
@ -695,7 +728,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
{
/// No need to execute this part if it is horizontal merge.
/// No need to execute this part if it is horizontal merge.
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
return false;
@ -945,6 +978,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
/// Print overall profiling info. NOTE: it may duplicate previous messages
{
ProfileEvents::increment(ProfileEvents::MergedColumns, global_ctx->merging_columns.size());
ProfileEvents::increment(ProfileEvents::GatheredColumns, global_ctx->gathering_columns.size());
double elapsed_seconds = global_ctx->merge_list_element_ptr->watch.elapsedSeconds();
LOG_DEBUG(ctx->log,
"Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.",
@ -1042,12 +1078,29 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
return false;
}
MergeTask::StageRuntimeContextPtr MergeTask::MergeProjectionsStage::getContextForNextStage()
{
/// Do not increment for projection stage because time is already accounted in main task.
/// The projection stage has its own empty projection stage which may add a drift of several milliseconds.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeProjectionStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
return nullptr;
}
bool MergeTask::VerticalMergeStage::execute()
{
assert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true;
chassert(subtasks_iterator != subtasks.end());
Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks
++subtasks_iterator;
@ -1056,9 +1109,14 @@ bool MergeTask::VerticalMergeStage::execute()
bool MergeTask::MergeProjectionsStage::execute()
{
assert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true;
chassert(subtasks_iterator != subtasks.end());
Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks
++subtasks_iterator;
@ -1105,12 +1163,26 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForAllColumns() const
bool MergeTask::execute()
{
assert(stages_iterator != stages.end());
if ((*stages_iterator)->execute())
chassert(stages_iterator != stages.end());
const auto & current_stage = *stages_iterator;
if (current_stage->execute())
return true;
/// Stage is finished, need initialize context for the next stage
auto next_stage_context = (*stages_iterator)->getContextForNextStage();
/// Stage is finished, need to initialize context for the next stage and update profile events.
UInt64 current_elapsed_ms = global_ctx->merge_list_element_ptr->watch.elapsedMilliseconds();
UInt64 stage_elapsed_ms = current_elapsed_ms - global_ctx->prev_elapsed_ms;
global_ctx->prev_elapsed_ms = current_elapsed_ms;
auto next_stage_context = current_stage->getContextForNextStage();
/// Do not increment for projection stage because time is already accounted in main task.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(current_stage->getTotalTimeProfileEvent(), stage_elapsed_ms);
ProfileEvents::increment(ProfileEvents::MergeTotalMilliseconds, stage_elapsed_ms);
}
/// Move to the next stage in an array of stages
++stages_iterator;
@ -1235,7 +1307,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/* limit_= */0,
/* always_read_till_end_= */false,
ctx->rows_sources_write_buf.get(),
true,
ctx->blocks_are_granules_size);
break;
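
Given the total/execute split introduced in this file, the wall-clock stage totals should always bound the busy execute counters; a hedged sanity query over system.part_log, in the spirit of the tests added below:

SELECT
    ProfileEvents['MergeTotalMilliseconds'] >= ProfileEvents['MergeExecuteMilliseconds'],
    ProfileEvents['MergeHorizontalStageTotalMilliseconds'] >= ProfileEvents['MergeHorizontalStageExecuteMilliseconds']
FROM system.part_log
WHERE database = currentDatabase() AND event_type = 'MergeParts';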

View File

@ -3,6 +3,7 @@
#include <list>
#include <memory>
#include <Common/ProfileEvents.h>
#include <Common/filesystemHelpers.h>
#include <Compression/CompressedReadBuffer.h>
@ -27,6 +28,12 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
namespace ProfileEvents
{
extern const Event MergeHorizontalStageTotalMilliseconds;
extern const Event MergeVerticalStageTotalMilliseconds;
extern const Event MergeProjectionStageTotalMilliseconds;
}
namespace DB
{
@ -137,6 +144,7 @@ private:
{
virtual void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) = 0;
virtual StageRuntimeContextPtr getContextForNextStage() = 0;
virtual ProfileEvents::Event getTotalTimeProfileEvent() const = 0;
virtual bool execute() = 0;
virtual ~IStage() = default;
};
@ -203,6 +211,7 @@ private:
bool need_prefix;
scope_guard temporary_directory_lock;
UInt64 prev_elapsed_ms{0};
};
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;
@ -249,6 +258,7 @@ private:
/// Dependencies for next stages
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
bool need_sync{false};
UInt64 elapsed_execute_ns{0};
};
using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr<ExecuteAndFinalizeHorizontalPartRuntimeContext>;
@ -290,6 +300,7 @@ private:
}
StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeHorizontalStageTotalMilliseconds; }
ExecuteAndFinalizeHorizontalPartRuntimeContextPtr ctx;
GlobalRuntimeContextPtr global_ctx;
@ -329,6 +340,7 @@ private:
QueryPipeline column_parts_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<CompressedReadBufferFromFile> rows_sources_read_buf{nullptr};
UInt64 elapsed_execute_ns{0};
};
using VerticalMergeRuntimeContextPtr = std::shared_ptr<VerticalMergeRuntimeContext>;
@ -343,6 +355,7 @@ private:
global_ctx = static_pointer_cast<GlobalRuntimeContext>(global);
}
StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeVerticalStageTotalMilliseconds; }
bool prepareVerticalMergeForAllColumns() const;
bool executeVerticalMergeForAllColumns() const;
@ -383,6 +396,7 @@ private:
MergeTasks::iterator projections_iterator;
LoggerPtr log{getLogger("MergeTask::MergeProjectionsStage")};
UInt64 elapsed_execute_ns{0};
};
using MergeProjectionsRuntimeContextPtr = std::shared_ptr<MergeProjectionsRuntimeContext>;
@ -390,12 +404,15 @@ private:
struct MergeProjectionsStage : public IStage
{
bool execute() override;
void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override
{
ctx = static_pointer_cast<MergeProjectionsRuntimeContext>(local);
global_ctx = static_pointer_cast<GlobalRuntimeContext>(global);
}
StageRuntimeContextPtr getContextForNextStage() override { return nullptr; }
StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeProjectionStageTotalMilliseconds; }
bool mergeMinMaxIndexAndPrepareProjections() const;
bool executeProjections() const;

View File

@ -254,6 +254,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
LOG_ERROR(log, "{}. Data after mutation is not byte-identical to data on another replicas. "
"We will download merged part from replica to force byte-identical result.", getCurrentExceptionMessage(false));
mutate_task->updateProfileEvents();
write_part_log(ExecutionStatus::fromCurrentException("", true));
if (storage.getSettings()->detach_not_byte_identical_parts)
@ -281,6 +282,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
*/
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMutations);
mutate_task->updateProfileEvents();
write_part_log({});
return true;

View File

@ -102,6 +102,7 @@ bool MutatePlainMergeTreeTask::executeStep()
transaction.commit();
storage.updateMutationEntriesErrors(future_part, true, "");
mutate_task->updateProfileEvents();
write_part_log({});
state = State::NEED_FINISH;
@ -114,6 +115,7 @@ bool MutatePlainMergeTreeTask::executeStep()
PreformattedMessage exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
LOG_ERROR(getLogger("MutatePlainMergeTreeTask"), exception_message);
storage.updateMutationEntriesErrors(future_part, false, exception_message.text);
mutate_task->updateProfileEvents();
write_part_log(ExecutionStatus::fromCurrentException("", true));
tryLogCurrentException(__PRETTY_FUNCTION__);
return false;

View File

@ -39,7 +39,13 @@
namespace ProfileEvents
{
extern const Event MutateTaskProjectionsCalculationMicroseconds;
extern const Event MutationTotalParts;
extern const Event MutationUntouchedParts;
extern const Event MutationTotalMilliseconds;
extern const Event MutationExecuteMilliseconds;
extern const Event MutationAllPartColumns;
extern const Event MutationSomePartColumns;
extern const Event MutateTaskProjectionsCalculationMicroseconds;
}
namespace CurrentMetrics
@ -1047,6 +1053,7 @@ struct MutationContext
/// Whether we need to count lightweight delete rows in this mutation
bool count_lightweight_deleted_rows;
UInt64 execute_elapsed_ns = 0;
};
using MutationContextPtr = std::shared_ptr<MutationContext>;
@ -1899,6 +1906,9 @@ MutateTask::MutateTask(
bool MutateTask::execute()
{
Stopwatch watch;
SCOPE_EXIT({ ctx->execute_elapsed_ns += watch.elapsedNanoseconds(); });
switch (state)
{
case State::NEED_PREPARE:
@ -1932,6 +1942,15 @@ bool MutateTask::execute()
return false;
}
void MutateTask::updateProfileEvents() const
{
UInt64 total_elapsed_ms = (*ctx->mutate_entry)->watch.elapsedMilliseconds();
UInt64 execute_elapsed_ms = ctx->execute_elapsed_ns / 1000000UL;
ProfileEvents::increment(ProfileEvents::MutationTotalMilliseconds, total_elapsed_ms);
ProfileEvents::increment(ProfileEvents::MutationExecuteMilliseconds, execute_elapsed_ms);
}
static bool canSkipConversionToNullable(const MergeTreeDataPartPtr & part, const MutationCommand & command)
{
if (command.type != MutationCommand::READ_COLUMN)
@ -1994,6 +2013,7 @@ static bool canSkipMutationCommandForPart(const MergeTreeDataPartPtr & part, con
bool MutateTask::prepare()
{
ProfileEvents::increment(ProfileEvents::MutationTotalParts);
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
if (ctx->future_part->parts.size() != 1)
@ -2056,6 +2076,7 @@ bool MutateTask::prepare()
ctx->temporary_directory_lock = std::move(lock);
}
ProfileEvents::increment(ProfileEvents::MutationUntouchedParts);
promise.set_value(std::move(part));
return false;
}
@ -2182,6 +2203,7 @@ bool MutateTask::prepare()
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
task = std::make_unique<MutateAllPartColumnsTask>(ctx);
ProfileEvents::increment(ProfileEvents::MutationAllPartColumns);
}
else /// TODO: check that we modify only non-key columns in this case.
{
@ -2241,6 +2263,7 @@ bool MutateTask::prepare()
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
task = std::make_unique<MutateSomePartColumnsTask>(ctx);
ProfileEvents::increment(ProfileEvents::MutationSomePartColumns);
}
return true;

View File

@ -39,6 +39,7 @@ public:
bool need_prefix_);
bool execute();
void updateProfileEvents() const;
std::future<MergeTreeData::MutableDataPartPtr> getFuture()
{

View File

@ -19,7 +19,6 @@ from env_helper import TEMP_PATH
from git_helper import git_runner, is_shallow
from github_helper import GitHub, PullRequest, PullRequests, Repository
from s3_helper import S3Helper
from get_robot_token import get_best_robot_token
from ci_utils import Shell
from version_helper import (
FILE_WITH_VERSION_PATH,
@ -172,7 +171,6 @@ def parse_args() -> argparse.Namespace:
parser.add_argument(
"--gh-user-or-token",
help="user name or GH token to authenticate",
default=get_best_robot_token(),
)
parser.add_argument(
"--gh-password",

View File

@ -484,7 +484,7 @@ class ReleaseInfo:
)
else:
if not dry_run:
assert not self.changelog_pr
assert not self.version_bump_pr
self.prs_merged = res

View File

@ -708,7 +708,7 @@ def test_no_key_found_disk(cluster, broken_s3):
"""
SELECT value
FROM system.metrics
WHERE metric = 'S3DiskNoKeyErrors'
WHERE metric = 'DiskS3NoSuchKeyErrors'
"""
).strip()
)

View File

@ -464,7 +464,7 @@ def test_restart_broken(started_cluster):
"""
SELECT value
FROM system.metrics
WHERE metric = 'S3DiskNoKeyErrors'
WHERE metric = 'DiskS3NoSuchKeyErrors'
"""
).strip()
)

View File

@ -121,21 +121,15 @@ def node_update_config(mode, setting, value=None):
node.restart_clickhouse()
def assert_took(took, should_took):
def assert_took(took, should_take):
# we need to decrease the lower limit because the server limits could
# be enforced by throttling some server background IO instead of query IO
# and we have no control over it
#
# and the same for upper limit, it can be slightly larger, due to for
# instance network latencies or CPU starvation
if should_took > 0:
assert took >= should_took * 0.85 and took <= should_took * 1.8
else:
assert took >= should_took * 0.85
assert took >= should_take * 0.85
@pytest.mark.parametrize(
"policy,backup_name,mode,setting,value,should_took",
"policy,backup_name,mode,setting,value,should_take",
[
#
# Local -> Local
@ -149,7 +143,7 @@ def assert_took(took, should_took):
0,
id="no_local_throttling",
),
# reading 1e6*8 bytes with 1M default bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M default bandwidth should take (8-1)/1=7 seconds
pytest.param(
"default",
next_backup_name("local"),
@ -159,7 +153,7 @@ def assert_took(took, should_took):
7,
id="user_local_throttling",
),
# reading 1e6*8 bytes with 2M default bandwith should take (8-2)/2=3 seconds
# reading 1e6*8 bytes with 2M default bandwidth should take (8-2)/2=3 seconds
pytest.param(
"default",
next_backup_name("local"),
@ -181,7 +175,7 @@ def assert_took(took, should_took):
0,
id="no_remote_to_local_throttling",
),
# reading 1e6*8 bytes with 1M default bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M default bandwidth should take (8-1)/1=7 seconds
pytest.param(
"s3",
next_backup_name("local"),
@ -191,7 +185,7 @@ def assert_took(took, should_took):
7,
id="user_remote_to_local_throttling",
),
# reading 1e6*8 bytes with 2M default bandwith should take (8-2)/2=3 seconds
# reading 1e6*8 bytes with 2M default bandwidth should take (8-2)/2=3 seconds
pytest.param(
"s3",
next_backup_name("local"),
@ -252,7 +246,7 @@ def assert_took(took, should_took):
0,
id="no_local_to_remote_throttling",
),
# reading 1e6*8 bytes with 1M default bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M default bandwidth should take (8-1)/1=7 seconds
pytest.param(
"default",
next_backup_name("remote"),
@ -262,7 +256,7 @@ def assert_took(took, should_took):
7,
id="user_local_to_remote_throttling",
),
# reading 1e6*8 bytes with 2M default bandwith should take (8-2)/2=3 seconds
# reading 1e6*8 bytes with 2M default bandwidth should take (8-2)/2=3 seconds
pytest.param(
"default",
next_backup_name("remote"),
@ -274,7 +268,7 @@ def assert_took(took, should_took):
),
],
)
def test_backup_throttling(policy, backup_name, mode, setting, value, should_took):
def test_backup_throttling(policy, backup_name, mode, setting, value, should_take):
node_update_config(mode, setting, value)
node.query(
f"""
@ -284,7 +278,7 @@ def test_backup_throttling(policy, backup_name, mode, setting, value, should_too
"""
)
_, took = elapsed(node.query, f"backup table data to {backup_name}")
assert_took(took, should_took)
assert_took(took, should_take)
def test_backup_throttling_override():
@ -305,18 +299,18 @@ def test_backup_throttling_override():
"max_backup_bandwidth": "500K",
},
)
# reading 1e6*8 bytes with 500Ki default bandwith should take (8-0.5)/0.5=15 seconds
# reading 1e6*8 bytes with 500Ki default bandwidth should take (8-0.5)/0.5=15 seconds
assert_took(took, 15)
@pytest.mark.parametrize(
"policy,mode,setting,value,should_took",
"policy,mode,setting,value,should_take",
[
#
# Local
#
pytest.param("default", None, None, None, 0, id="no_local_throttling"),
# reading 1e6*8 bytes with 1M default bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M default bandwidth should take (8-1)/1=7 seconds
pytest.param(
"default",
"user",
@ -325,7 +319,7 @@ def test_backup_throttling_override():
7,
id="user_local_throttling",
),
# reading 1e6*8 bytes with 2M default bandwith should take (8-2)/2=3 seconds
# reading 1e6*8 bytes with 2M default bandwidth should take (8-2)/2=3 seconds
pytest.param(
"default",
"server",
@ -338,7 +332,7 @@ def test_backup_throttling_override():
# Remote
#
pytest.param("s3", None, None, None, 0, id="no_remote_throttling"),
# reading 1e6*8 bytes with 1M default bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M default bandwidth should take (8-1)/1=7 seconds
pytest.param(
"s3",
"user",
@ -347,7 +341,7 @@ def test_backup_throttling_override():
7,
id="user_remote_throttling",
),
# reading 1e6*8 bytes with 2M default bandwith should take (8-2)/2=3 seconds
# reading 1e6*8 bytes with 2M default bandwidth should take (8-2)/2=3 seconds
pytest.param(
"s3",
"server",
@ -358,7 +352,7 @@ def test_backup_throttling_override():
),
],
)
def test_read_throttling(policy, mode, setting, value, should_took):
def test_read_throttling(policy, mode, setting, value, should_take):
node_update_config(mode, setting, value)
node.query(
f"""
@ -368,17 +362,17 @@ def test_read_throttling(policy, mode, setting, value, should_took):
"""
)
_, took = elapsed(node.query, f"select * from data")
assert_took(took, should_took)
assert_took(took, should_take)
@pytest.mark.parametrize(
"policy,mode,setting,value,should_took",
"policy,mode,setting,value,should_take",
[
#
# Local
#
pytest.param("default", None, None, None, 0, id="no_local_throttling"),
# reading 1e6*8 bytes with 1M default bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M default bandwidth should take (8-1)/1=7 seconds
pytest.param(
"default",
"user",
@ -387,7 +381,7 @@ def test_read_throttling(policy, mode, setting, value, should_took):
7,
id="local_user_throttling",
),
# reading 1e6*8 bytes with 2M default bandwith should take (8-2)/2=3 seconds
# reading 1e6*8 bytes with 2M default bandwidth should take (8-2)/2=3 seconds
pytest.param(
"default",
"server",
@ -400,7 +394,7 @@ def test_read_throttling(policy, mode, setting, value, should_took):
# Remote
#
pytest.param("s3", None, None, None, 0, id="no_remote_throttling"),
# writing 1e6*8 bytes with 1M default bandwith should take (8-1)/1=7 seconds
# writing 1e6*8 bytes with 1M default bandwidth should take (8-1)/1=7 seconds
pytest.param(
"s3",
"user",
@ -409,7 +403,7 @@ def test_read_throttling(policy, mode, setting, value, should_took):
7,
id="user_remote_throttling",
),
# writing 1e6*8 bytes with 2M default bandwith should take (8-2)/2=3 seconds
# writing 1e6*8 bytes with 2M default bandwidth should take (8-2)/2=3 seconds
pytest.param(
"s3",
"server",
@ -420,7 +414,7 @@ def test_read_throttling(policy, mode, setting, value, should_took):
),
],
)
def test_write_throttling(policy, mode, setting, value, should_took):
def test_write_throttling(policy, mode, setting, value, should_take):
node_update_config(mode, setting, value)
node.query(
f"""
@ -429,7 +423,7 @@ def test_write_throttling(policy, mode, setting, value, should_took):
"""
)
_, took = elapsed(node.query, f"insert into data select * from numbers(1e6)")
assert_took(took, should_took)
assert_took(took, should_take)
def test_max_mutations_bandwidth_for_server():
@ -444,7 +438,7 @@ def test_max_mutations_bandwidth_for_server():
node.query,
"alter table data update key = -key where 1 settings mutations_sync = 1",
)
# reading 1e6*8 bytes with 1M/s bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M/s bandwidth should take (8-1)/1=7 seconds
assert_took(took, 7)
@ -457,5 +451,5 @@ def test_max_merges_bandwidth_for_server():
)
node.query("insert into data select * from numbers(1e6)")
_, took = elapsed(node.query, "optimize table data final")
# reading 1e6*8 bytes with 1M/s bandwith should take (8-1)/1=7 seconds
# reading 1e6*8 bytes with 1M/s bandwidth should take (8-1)/1=7 seconds
assert_took(took, 7)
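
The expected durations in the comments above follow from simple throttling arithmetic: with a limiter whose bucket starts full, roughly the first `limit` bytes pass unthrottled and the remainder drains at the limit rate, hence (8-1)/1=7 seconds for an 8 MB read at 1 MB/s. A hedged SQL sketch of the same calculation (the token-bucket model is an assumption inferred from the comments):

SELECT
    (8 - 1) / 1 AS expected_seconds_at_1MB_per_s,
    (8 - 2) / 2 AS expected_seconds_at_2MB_per_s;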

View File

@ -39,7 +39,7 @@ SYSTEM FLUSH LOGS;
SELECT
if(count() == 2, 'Ok', 'Error: ' || toString(count())),
if(SUM(ProfileEvents['MergedRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergedRows']))),
if(SUM(ProfileEvents['MutatedRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MutatedRows']))),
if(SUM(ProfileEvents['FileOpen']) > 1, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['FileOpen'])))
FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE

View File

@ -5,7 +5,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} "
${CLICKHOUSE_CLIENT} -q "
DROP TABLE IF EXISTS test;
CREATE TABLE test (a String, b String, c String) ENGINE = MergeTree ORDER BY (a, b, c) SETTINGS index_granularity = 11;
@ -37,8 +37,9 @@ WHERE a >= (round(pow(sipHash64(1, try), 1 / (3 + sipHash64(2, try) % 8))) AS a1
AND b <= (b1 + round(pow(sipHash64(7, try), 1 / (3 + sipHash64(8, try) % 8))))::String
AND c >= (round(pow(sipHash64(9, try), 1 / (3 + sipHash64(10, try) % 8))) AS c1)::String
AND c <= (c1 + round(pow(sipHash64(11, try), 1 / (3 + sipHash64(12, try) % 8))))::String
HAVING count() > 0;
"
HAVING count() > 0
SETTINGS trace_profile_events=0 -- test is too slow with profiling
;"
done | ${CLICKHOUSE_CLIENT}
${CLICKHOUSE_CLIENT} "DROP TABLE test"
${CLICKHOUSE_CLIENT} -q "DROP TABLE test"

View File

@ -0,0 +1,3 @@
Horizontal 1 20000 3 0 480000 1 1 1 1
Vertical 1 20000 1 2 480000 1 1 1 1 1 1
Vertical 2 400000 2 6 12800000 1 1 1 1 1 1 1 1 1 1

View File

@ -0,0 +1,90 @@
-- Tags: no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS t_merge_profile_events_1;
CREATE TABLE t_merge_profile_events_1 (id UInt64, v1 UInt64, v2 UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO t_merge_profile_events_1 SELECT number, number, number FROM numbers(10000);
INSERT INTO t_merge_profile_events_1 SELECT number, number, number FROM numbers(10000);
OPTIMIZE TABLE t_merge_profile_events_1 FINAL;
SYSTEM FLUSH LOGS;
SELECT
merge_algorithm,
ProfileEvents['Merge'],
ProfileEvents['MergedRows'],
ProfileEvents['MergedColumns'],
ProfileEvents['GatheredColumns'],
ProfileEvents['MergedUncompressedBytes'],
ProfileEvents['MergeTotalMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageExecuteMilliseconds'] > 0
FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_1' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1';
DROP TABLE IF EXISTS t_merge_profile_events_1;
DROP TABLE IF EXISTS t_merge_profile_events_2;
CREATE TABLE t_merge_profile_events_2 (id UInt64, v1 UInt64, v2 UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO t_merge_profile_events_2 SELECT number, number, number FROM numbers(10000);
INSERT INTO t_merge_profile_events_2 SELECT number, number, number FROM numbers(10000);
OPTIMIZE TABLE t_merge_profile_events_2 FINAL;
SYSTEM FLUSH LOGS;
SELECT
merge_algorithm,
ProfileEvents['Merge'],
ProfileEvents['MergedRows'],
ProfileEvents['MergedColumns'],
ProfileEvents['GatheredColumns'],
ProfileEvents['MergedUncompressedBytes'],
ProfileEvents['MergeTotalMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageExecuteMilliseconds'] > 0,
FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_2' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1';
DROP TABLE IF EXISTS t_merge_profile_events_2;
DROP TABLE IF EXISTS t_merge_profile_events_3;
CREATE TABLE t_merge_profile_events_3 (id UInt64, v1 UInt64, v2 UInt64, PROJECTION p (SELECT v2, v2 * v2, v2 * 2, v2 * 10, v1 ORDER BY v1))
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(100000);
INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(100000);
OPTIMIZE TABLE t_merge_profile_events_3 FINAL;
SYSTEM FLUSH LOGS;
SELECT
merge_algorithm,
ProfileEvents['Merge'],
ProfileEvents['MergedRows'],
ProfileEvents['MergedColumns'],
ProfileEvents['GatheredColumns'],
ProfileEvents['MergedUncompressedBytes'],
ProfileEvents['MergeTotalMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeProjectionStageTotalMilliseconds'] > 0,
ProfileEvents['MergeProjectionStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] <= duration_ms,
ProfileEvents['MergeTotalMilliseconds'] <= duration_ms
FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_3' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1';
DROP TABLE IF EXISTS t_merge_profile_events_3;

View File

@ -0,0 +1,2 @@
3 2 1 10000 160000 0 1 1 1
4 2 1 10000 320000 1 0 1 1

View File

@ -0,0 +1,33 @@
-- Tags: no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS t_mutate_profile_events;
CREATE TABLE t_mutate_profile_events (key UInt64, id UInt64, v1 UInt64, v2 UInt64)
ENGINE = MergeTree ORDER BY id PARTITION BY key
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO t_mutate_profile_events SELECT 1, number, number, number FROM numbers(10000);
INSERT INTO t_mutate_profile_events SELECT 2, number, number, number FROM numbers(10000);
SET mutations_sync = 2;
ALTER TABLE t_mutate_profile_events UPDATE v1 = 1000 WHERE key = 1;
ALTER TABLE t_mutate_profile_events DELETE WHERE key = 2 AND v2 % 10 = 0;
SYSTEM FLUSH LOGS;
SELECT
splitByChar('_', part_name)[-1] AS version,
sum(ProfileEvents['MutationTotalParts']),
sum(ProfileEvents['MutationUntouchedParts']),
sum(ProfileEvents['MutatedRows']),
sum(ProfileEvents['MutatedUncompressedBytes']),
sum(ProfileEvents['MutationAllPartColumns']),
sum(ProfileEvents['MutationSomePartColumns']),
sum(ProfileEvents['MutationTotalMilliseconds']) > 0,
sum(ProfileEvents['MutationExecuteMilliseconds']) > 0,
FROM system.part_log
WHERE database = currentDatabase() AND table = 't_mutate_profile_events' AND event_type = 'MutatePart'
GROUP BY version ORDER BY version;
DROP TABLE IF EXISTS t_mutate_profile_events;

View File

@ -13,6 +13,7 @@ v24.4.4.113-stable 2024-08-02
v24.4.3.25-stable 2024-06-14
v24.4.2.141-stable 2024-06-07
v24.4.1.2088-stable 2024-05-01
v24.3.7.30-lts 2024-08-14
v24.3.6.48-lts 2024-08-02
v24.3.5.46-lts 2024-07-03
v24.3.4.147-lts 2024-06-13
