Revert "Merge pull request #57932 from ClickHouse/remove-shit-cleanup"

This reverts commit 2d58dc512c, reversing
changes made to 41873dc4a3.
This commit is contained in:
Alexander Tokmakov 2023-12-27 13:46:06 +01:00
parent 4c4623e3ab
commit 01d042c490
55 changed files with 754 additions and 49 deletions

View File

@ -25,7 +25,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
[SETTINGS name=value, clean_deleted_rows=value, ...]
```
For a description of request parameters, see [statement description](../../../sql-reference/statements/create/table.md).
@ -88,6 +88,53 @@ SELECT * FROM mySecondReplacingMT FINAL;
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Name of a column used during a merge to determine whether the data in this row represents the state or is to be deleted; `1` is a “deleted” row, `0` is a “state” row.
Column data type — `UInt8`.
:::note
`is_deleted` can only be enabled when `ver` is used.
The row is deleted when `OPTIMIZE ... FINAL CLEANUP` is used, or when `OPTIMIZE ... FINAL` is used and the engine setting `clean_deleted_rows` has been set to `Always`.
No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted row is the one kept.
:::
Example:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 1);
select * from myThirdReplacingMT final;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 00:00:00', 0);
select * from myThirdReplacingMT final;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
## Query clauses
When creating a `ReplacingMergeTree` table, the same [clauses](../../../engines/table-engines/mergetree-family/mergetree.md) are required as when creating a `MergeTree` table.

View File

@ -852,6 +852,16 @@ If the file name for column is too long (more than `max_file_name_length` bytes)
The maximal length of the file name to keep it as is without hashing. Takes effect only if setting `replace_long_file_name_to_hash` is enabled. The value of this setting does not include the length of file extension. So, it is recommended to set it below the maximum filename length (usually 255 bytes) with some gap to avoid filesystem errors. Default value: 127.
## clean_deleted_rows
Enable/disable automatic deletion of rows flagged as `is_deleted` when performing `OPTIMIZE ... FINAL` on a table using the ReplacingMergeTree engine. When disabled, the `CLEANUP` keyword has to be added to `OPTIMIZE ... FINAL` to get the same behaviour.
Possible values:
- `Always` or `Never`.
Default value: `Never`
## allow_experimental_block_number_column
Persists virtual column `_block_number` on merges.

View File

@ -86,6 +86,59 @@ SELECT * FROM mySecondReplacingMT FINAL;
│ 1 │ first │ 2020-01-01 01:01:01 │
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Имя столбца, который используется во время слияния для обозначения того, нужно ли отображать строку или она подлежит удалению; `1` - для удаления строки, `0` - для отображения строки.
Тип данных столбца — `UInt8`.
:::note
`is_deleted` может быть использован, если `ver` используется.
Строка удаляется в следующих случаях:
- при использовании инструкции `OPTIMIZE ... FINAL CLEANUP`
- при использовании инструкции `OPTIMIZE ... FINAL`
- параметр движка `clean_deleted_rows` установлен в значение `Always` (по умолчанию - `Never`)
- есть новые версии строки
Не рекомендуется выполнять `FINAL CLEANUP` или использовать параметр движка `clean_deleted_rows` со значением `Always`, это может привести к неожиданным результатам, например удаленные строки могут вновь появиться.
Вне зависимости от производимых изменений над данными, версия должна увеличиваться. Если у двух строк одна и та же версия, то остается только последняя вставленная строка.
:::
Пример:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 1);
select * from myThirdReplacingMT final;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 00:00:00', 0);
select * from myThirdReplacingMT final;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
## Секции запроса

View File

@ -1 +0,0 @@
../../../tests/config/config.d/graphite_alternative.xml

View File

@ -98,6 +98,8 @@ IMPLEMENT_SETTING_AUTO_ENUM(DefaultDatabaseEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(DefaultTableEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(CleanDeletedRows, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64},

View File

@ -140,6 +140,14 @@ enum class DefaultTableEngine
DECLARE_SETTING_ENUM(DefaultTableEngine)
/// Value type of the `clean_deleted_rows` MergeTree setting.
/// The merge code treats every value other than `Never` as a request to clean up
/// rows flagged by the `is_deleted` column while merging ReplacingMergeTree parts.
enum class CleanDeletedRows
{
Never = 0, /// Disable.
Always, /// Clean up flagged rows on merges without requiring the CLEANUP keyword.
};
DECLARE_SETTING_ENUM(CleanDeletedRows)
enum class MySQLDataTypesSupport
{
DECIMAL, // convert MySQL's decimal and number to ClickHouse Decimal when applicable

View File

@ -15,6 +15,7 @@
#include <Common/thread_local_rng.h>
#include <Common/FieldVisitorToString.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/callOnce.h>
#include <Common/SharedLockGuard.h>
#include <Coordination/KeeperDispatcher.h>
@ -32,6 +33,7 @@
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IO/IOUringReader.h>
#include <IO/SynchronousReader.h>
@ -43,6 +45,7 @@
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/SessionTracker.h>
#include <Core/ServerSettings.h>
#include <Interpreters/PreparedSets.h>
#include <Core/Settings.h>
#include <Core/SettingsQuirks.h>
#include <Access/AccessControl.h>

View File

@ -79,7 +79,7 @@ BlockIO InterpreterOptimizeQuery::execute()
if (auto * snapshot_data = dynamic_cast<MergeTreeData::SnapshotData *>(storage_snapshot->data.get()))
snapshot_data->parts = {};
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, ast.cleanup, getContext());
return {};
}

View File

@ -24,6 +24,9 @@ void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatSt
if (deduplicate)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
if (cleanup)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " CLEANUP" << (settings.hilite ? hilite_none : "");
if (deduplicate_by_columns)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " BY " << (settings.hilite ? hilite_none : "");

View File

@ -21,10 +21,12 @@ public:
bool deduplicate = false;
/// Deduplicate by columns.
ASTPtr deduplicate_by_columns;
/// Delete 'is_deleted' data
bool cleanup = false;
/** Get the text that identifies this element. */
String getID(char delim) const override
{
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "");
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "")+ (cleanup ? "_cleanup" : "");
}
ASTPtr clone() const override

View File

@ -28,6 +28,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ParserKeyword s_partition("PARTITION");
ParserKeyword s_final("FINAL");
ParserKeyword s_deduplicate("DEDUPLICATE");
ParserKeyword s_cleanup("CLEANUP");
ParserKeyword s_by("BY");
ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p(true);
@ -38,6 +39,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ASTPtr partition;
bool final = false;
bool deduplicate = false;
bool cleanup = false;
String cluster_str;
if (!s_optimize_table.ignore(pos, expected))
@ -68,6 +70,9 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
if (s_deduplicate.ignore(pos, expected))
deduplicate = true;
if (s_cleanup.ignore(pos, expected))
cleanup = true;
ASTPtr deduplicate_by_columns;
if (deduplicate && s_by.ignore(pos, expected))
{
@ -85,6 +90,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
query->final = final;
query->deduplicate = deduplicate;
query->deduplicate_by_columns = deduplicate_by_columns;
query->cleanup = cleanup;
query->database = database;
query->table = table;

View File

@ -3,22 +3,33 @@
#include <Columns/ColumnsNumber.h>
#include <IO/WriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
const Block & header_,
size_t num_inputs,
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
bool use_average_block_sizes,
bool cleanup_,
size_t * cleanedup_rows_count_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes)
, cleanup(cleanup_)
, cleanedup_rows_count(cleanedup_rows_count_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
if (!version_column.empty())
version_column_number = header_.getPositionByName(version_column);
}
@ -65,7 +76,21 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// Write the data for the previous primary key.
if (!selected_row.empty())
insertRow();
{
/// Flush the row selected for the previous primary key.
/// Without an is_deleted column the row is always written.
if (is_deleted_column_number != -1)
{
/// is_deleted flag of the selected row.
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
/// Keep the row unless cleanup is requested and the row is flagged as deleted.
if (!cleanup || !value)
insertRow();
else if (cleanup && cleanedup_rows_count != nullptr)
{
/// The row is dropped: account for all its pending row sources in the caller's counter
/// and discard them so they are not attributed to the next key.
/// NOTE(review): when the counter is nullptr the pending row sources are not reset here;
/// presumably that only happens when no row sources are collected - confirm.
*cleanedup_rows_count += current_row_sources.size();
current_row_sources.resize(0);
}
}
else
insertRow();
}
selected_row.clear();
}
@ -75,6 +100,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);
/// When an is_deleted column is configured, validate the flag of the current row:
/// only 0 and 1 are accepted, any other value is reported as INCORRECT_DATA.
if (is_deleted_column_number != -1)
{
const UInt8 is_deleted = assert_cast<const ColumnUInt8 &>(*current->all_columns[is_deleted_column_number]).getData()[current->getRow()];
if ((is_deleted != 1) && (is_deleted != 0))
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect data: is_deleted = {} (must be 1 or 0).", toString(is_deleted));
}
/// A non-strict comparison, since we select the last row for the same version values.
if (version_column_number == -1
|| selected_row.empty()
@ -105,7 +137,21 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// We will write the data for the last primary key.
if (!selected_row.empty())
insertRow();
{
/// Flush the row selected for the last primary key.
/// Without an is_deleted column the row is always written.
if (is_deleted_column_number != -1)
{
/// is_deleted flag of the selected row.
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
/// Keep the row unless cleanup is requested and the row is flagged as deleted.
if (!cleanup || !value)
insertRow();
else if (cleanup && cleanedup_rows_count != nullptr)
{
/// The row is dropped: add its pending row sources to the caller's counter and discard them.
/// NOTE(review): when the counter is nullptr the pending row sources are not reset here - confirm this is intended.
*cleanedup_rows_count += current_row_sources.size();
current_row_sources.resize(0);
}
}
else
insertRow();
}
return Status(merged_data.pull(), true);
}

View File

@ -21,11 +21,14 @@ public:
ReplacingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
bool use_average_block_sizes = false,
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr);
const char * getName() const override { return "ReplacingSortedAlgorithm"; }
Status merge() override;
@ -33,7 +36,10 @@ public:
private:
MergedData merged_data;
ssize_t is_deleted_column_number = -1;
ssize_t version_column_number = -1;
bool cleanup = false;
size_t * cleanedup_rows_count = nullptr;
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 2; /// last, current.

View File

@ -14,21 +14,26 @@ public:
ReplacingSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_,
const String & version_column,
const String & is_deleted_column, const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
bool use_average_block_sizes = false,
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),
is_deleted_column,
version_column,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes)
use_average_block_sizes,
cleanup,
cleanedup_rows_count)
{
}

View File

@ -1025,7 +1025,7 @@ static void addMergingFinal(
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
sort_description, merging_params.version_column, max_block_size_rows, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false);
sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size_rows, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty());
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
@ -1099,7 +1099,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// can use parallel select on such parts.
bool no_merging_final = settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0;
parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
data.merging_params.is_deleted_column.empty();
Pipes pipes;
{
RangesInDataParts new_parts;
@ -1842,6 +1843,8 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
}
}
if (!data.merging_params.is_deleted_column.empty() && !names.contains(data.merging_params.is_deleted_column))
column_names_to_read.push_back(data.merging_params.is_deleted_column);
if (!data.merging_params.sign_column.empty() && !names.contains(data.merging_params.sign_column))
column_names_to_read.push_back(data.merging_params.sign_column);
if (!data.merging_params.version_column.empty() && !names.contains(data.merging_params.version_column))

View File

@ -515,6 +515,7 @@ public:
bool /*final*/,
bool /*deduplicate*/,
const Names & /* deduplicate_by_columns */,
bool /*cleanup*/,
ContextPtr /*context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method optimize is not supported by storage {}", getName());

View File

@ -312,6 +312,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
reserved_space,
entry.deduplicate,
entry.deduplicate_by_columns,
entry.cleanup,
storage.merging_params,
NO_TRANSACTION_PTR);

View File

@ -131,6 +131,7 @@ void MergePlainMergeTreeTask::prepare()
merge_mutate_entry->tagger->reserved_space,
deduplicate,
deduplicate_by_columns,
cleanup,
storage.merging_params,
txn);
}

View File

@ -20,6 +20,7 @@ public:
StorageMetadataPtr metadata_snapshot_,
bool deduplicate_,
Names deduplicate_by_columns_,
bool cleanup_,
MergeMutateSelectedEntryPtr merge_mutate_entry_,
TableLockHolder table_lock_holder_,
IExecutableTask::TaskResultCallback & task_result_callback_)
@ -27,6 +28,7 @@ public:
, metadata_snapshot(std::move(metadata_snapshot_))
, deduplicate(deduplicate_)
, deduplicate_by_columns(std::move(deduplicate_by_columns_))
, cleanup(cleanup_)
, merge_mutate_entry(std::move(merge_mutate_entry_))
, table_lock_holder(std::move(table_lock_holder_))
, task_result_callback(task_result_callback_)
@ -67,6 +69,7 @@ private:
StorageMetadataPtr metadata_snapshot;
bool deduplicate;
Names deduplicate_by_columns;
bool cleanup;
MergeMutateSelectedEntryPtr merge_mutate_entry{nullptr};
TableLockHolder table_lock_holder;
FutureMergedMutatedPartPtr future_part{nullptr};

View File

@ -68,7 +68,10 @@ static void extractMergingAndGatheringColumns(
/// Force version column for Replacing mode
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
key_columns.emplace(merging_params.is_deleted_column);
key_columns.emplace(merging_params.version_column);
}
/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
@ -493,6 +496,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read;
size_t input_rows_filtered = *global_ctx->input_rows_filtered;
size_t cleanedup_rows_count = global_ctx->cleanedup_rows_count;
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_column_names.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
@ -506,12 +510,12 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1)
&& sum_input_rows_exact != rows_sources_count + input_rows_filtered)
&& sum_input_rows_exact != rows_sources_count + input_rows_filtered + cleanedup_rows_count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) differs from number "
"Number of rows in source parts ({}) excluding filtered rows ({}) and cleaned up rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
sum_input_rows_exact, input_rows_filtered, cleanedup_rows_count, rows_sources_count);
/// TemporaryDataOnDisk::createRawStream returns WriteBufferFromFile implementing IReadableWriteBuffer
/// and we expect to get ReadBufferFromFile here.
@ -755,6 +759,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
global_ctx->space_reservation,
global_ctx->deduplicate,
global_ctx->deduplicate_by_columns,
global_ctx->cleanup,
projection_merging_params,
global_ctx->need_prefix,
global_ctx->new_data_part.get(),
@ -1003,8 +1008,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup, &global_ctx->cleanedup_rows_count);
break;
case MergeTreeData::MergingParams::Graphite:

View File

@ -67,6 +67,7 @@ public:
ReservationSharedPtr space_reservation_,
bool deduplicate_,
Names deduplicate_by_columns_,
bool cleanup_,
MergeTreeData::MergingParams merging_params_,
bool need_prefix,
IMergeTreeDataPart * parent_part_,
@ -90,6 +91,7 @@ public:
global_ctx->space_reservation = std::move(space_reservation_);
global_ctx->deduplicate = std::move(deduplicate_);
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
global_ctx->cleanup = std::move(cleanup_);
global_ctx->parent_part = std::move(parent_part_);
global_ctx->data = std::move(data_);
global_ctx->mutator = std::move(mutator_);
@ -158,6 +160,8 @@ private:
ReservationSharedPtr space_reservation{nullptr};
bool deduplicate{false};
Names deduplicate_by_columns{};
bool cleanup{false};
size_t cleanedup_rows_count{0};
NamesAndTypesList gathering_columns{};
NamesAndTypesList merging_columns{};

View File

@ -846,6 +846,10 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
{
const auto columns = metadata.getColumns().getAllPhysical();
if (!is_deleted_column.empty() && mode != MergingParams::Replacing)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"is_deleted column for MergeTree cannot be specified in modes except Replacing.");
if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Sign column for MergeTree cannot be specified "
@ -915,6 +919,41 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Version column {} does not exist in table declaration.", version_column);
};
/// Validate the is_deleted column setting for the given storage.
///  - is_optional: when true, an empty is_deleted column name is accepted and nothing is checked.
///  - storage: engine name, used only in error messages.
/// If is_deleted is set, the column must exist in the table declaration and have type UInt8.
/// If is_deleted is set and is_optional is false, the version column must be set as well.
/// Throws LOGICAL_ERROR, BAD_TYPE_OF_FIELD or NO_SUCH_COLUMN_IN_TABLE accordingly.
auto check_is_deleted_column = [this, & columns](bool is_optional, const std::string & storage)
{
if (is_deleted_column.empty())
{
/// No is_deleted column configured: fine only if the engine does not require one.
if (is_optional)
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: is_deleted ({}) column for storage {} is empty", is_deleted_column, storage);
}
else
{
/// Presence of the version column is only enforced when is_deleted is mandatory.
if (version_column.empty() && !is_optional)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: Version column ({}) for storage {} is empty while is_deleted ({}) is not.",
version_column, storage, is_deleted_column);
/// Look the column up by name among the physical columns and check its type.
bool miss_is_deleted_column = true;
for (const auto & column : columns)
{
if (column.name == is_deleted_column)
{
if (!typeid_cast<const DataTypeUInt8 *>(column.type.get()))
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "is_deleted column ({}) for storage {} must have type UInt8. Provided column of type {}.",
is_deleted_column, storage, column.type->getName());
miss_is_deleted_column = false;
break;
}
}
if (miss_is_deleted_column)
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "is_deleted column {} does not exist in table declaration.", is_deleted_column);
}
};
if (mode == MergingParams::Collapsing)
check_sign_column(false, "CollapsingMergeTree");
@ -951,6 +990,7 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
if (mode == MergingParams::Replacing)
{
check_is_deleted_column(true, "ReplacingMergeTree");
check_version_column(true, "ReplacingMergeTree");
}

View File

@ -349,6 +349,9 @@ public:
/// For Collapsing and VersionedCollapsing mode.
String sign_column;
/// For Replacing mode. Can be empty for Replacing.
String is_deleted_column;
/// For Summing mode. If empty - columns_to_sum is determined automatically.
Names columns_to_sum;

View File

@ -676,6 +676,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
bool need_prefix,
@ -692,6 +693,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
space_reservation,
deduplicate,
deduplicate_by_columns,
cleanup,
merging_params,
need_prefix,
parent_part,

View File

@ -165,6 +165,7 @@ public:
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
bool need_prefix = true,

View File

@ -325,7 +325,7 @@ Block MergeTreeDataWriter::mergeBlock(
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, merging_params.version_column, block_size + 1, /*block_size_bytes=*/0);
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column,

View File

@ -74,6 +74,7 @@ struct Settings;
M(Bool, min_age_to_force_merge_on_partition_only, false, "Whether min_age_to_force_merge_seconds should be applied only on the entire partition and not on subset.", false) \
M(UInt64, number_of_free_entries_in_pool_to_execute_optimize_entire_partition, 25, "When there is less than specified number of free entries in pool, do not try to execute optimize entire partition with a merge (this merge is created when set min_age_to_force_merge_seconds > 0 and min_age_to_force_merge_on_partition_only = true). This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
M(CleanDeletedRows, clean_deleted_rows, CleanDeletedRows::Never, "Is the Replicated Merge cleanup has to be done automatically at each merge or manually (possible values are 'Always'/'Never' (default))", 0) \
M(UInt64, replicated_max_mutations_in_one_entry, 10000, "Max number of mutation commands that can be merged together and executed in one MUTATE_PART entry (0 means unlimited)", 0) \
M(UInt64, number_of_mutations_to_delay, 500, "If table has at least that many unfinished mutations, artificially slow down mutations of table. Disabled if set to 0", 0) \
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
@ -232,7 +233,6 @@ struct Settings;
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Seconds, replicated_fetches_http_send_timeout, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Seconds, replicated_fetches_http_receive_timeout, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, replicated_max_parallel_fetches_for_host, DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, String, clean_deleted_rows, "") \
/// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE

View File

@ -1057,6 +1057,7 @@ public:
ctx->space_reservation,
false, // TODO Do we need deduplicate for projections
{},
false, // no cleanup
projection_merging_params,
NO_TRANSACTION_PTR,
/* need_prefix */ true,

View File

@ -96,6 +96,9 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
}
}
if (cleanup)
out << "\ncleanup: " << cleanup;
break;
case DROP_RANGE:
@ -269,6 +272,8 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFor
deduplicate_by_columns = std::move(new_deduplicate_by_columns);
}
else if (checkString("cleanup: ", in))
in >> cleanup;
else
trailing_newline_found = true;
}

View File

@ -98,6 +98,7 @@ struct ReplicatedMergeTreeLogEntryData
Strings source_parts;
bool deduplicate = false; /// Do deduplicate on merge
Strings deduplicate_by_columns = {}; // Which columns should be checked for duplicates, empty means 'all' (default).
bool cleanup = false;
MergeType merge_type = MergeType::Regular;
String column_name;
String index_name;

View File

@ -52,6 +52,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
index_granularity = data_settings->index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
is_deleted_column = data.merging_params.is_deleted_column;
columns_to_sum = fmt::format("{}", fmt::join(data.merging_params.columns_to_sum.begin(), data.merging_params.columns_to_sum.end(), ","));
version_column = data.merging_params.version_column;
if (data.merging_params.mode == MergeTreeData::MergingParams::Graphite)
@ -156,6 +157,8 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
out << "merge parameters format version: " << merge_params_version << "\n";
if (!version_column.empty())
out << "version column: " << version_column << "\n";
if (!is_deleted_column.empty())
out << "is_deleted column: " << is_deleted_column << "\n";
if (!columns_to_sum.empty())
out << "columns to sum: " << columns_to_sum << "\n";
if (!graphite_params_hash.empty())
@ -221,6 +224,9 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)
if (checkString("version column: ", in))
in >> version_column >> "\n";
if (checkString("is_deleted column: ", in))
in >> is_deleted_column >> "\n";
if (checkString("columns to sum: ", in))
in >> columns_to_sum >> "\n";
@ -273,6 +279,10 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in version column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.version_column, version_column);
if (is_deleted_column != from_zk.is_deleted_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in is_deleted column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.is_deleted_column, is_deleted_column);
if (columns_to_sum != from_zk.columns_to_sum)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sum columns. "
"Stored in ZooKeeper: {}, local: {}", from_zk.columns_to_sum, columns_to_sum);

View File

@ -29,6 +29,7 @@ struct ReplicatedMergeTreeTableMetadata
int merge_params_version = REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS;
String sign_column;
String version_column;
String is_deleted_column;
String columns_to_sum;
String graphite_params_hash;
String primary_key;

View File

@ -138,7 +138,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
* CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign)
* SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum])
* AggregatingMergeTree(date, [sample_key], primary_key, index_granularity)
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column])
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column [, is_deleted_column]])
* GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, 'config_element')
*
* Alternatively, you can specify:
@ -441,6 +441,15 @@ static StoragePtr create(const StorageFactory::Arguments & args)
}
else if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
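// If exactly two optional engine arguments remain and the last one is not a literal, the last one is the is_deleted column.
// is_deleted is only accepted with the extended storage definition, i.e. not with the deprecated syntax ('allow_deprecated_syntax_for_merge_tree').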
if (arg_cnt - arg_num == 2 && !engine_args[arg_cnt - 1]->as<ASTLiteral>() && is_extended_storage_def)
{
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.is_deleted_column))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "is_deleted column name must be an identifier {}", verbose_help_message);
--arg_cnt;
}
/// If the last element is not index_granularity or replica_name (a literal), then this is the name of the version column.
if (arg_cnt && !engine_args[arg_cnt - 1]->as<ASTLiteral>())
{

View File

@ -321,6 +321,7 @@ bool StorageEmbeddedRocksDB::optimize(
bool final,
bool deduplicate,
const Names & /* deduplicate_by_columns */,
bool cleanup,
ContextPtr /*context*/)
{
if (partition)
@ -332,6 +333,9 @@ bool StorageEmbeddedRocksDB::optimize(
if (deduplicate)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DEDUPLICATE cannot be specified when optimizing table of type EmbeddedRocksDB");
if (cleanup)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CLEANUP cannot be specified when optimizing table of type EmbeddedRocksDB");
std::shared_lock lock(rocksdb_ptr_mx);
rocksdb::CompactRangeOptions compact_options;
auto status = rocksdb_ptr->CompactRange(compact_options, nullptr, nullptr);

View File

@ -65,6 +65,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
bool supportsParallelInsert() const override { return true; }

View File

@ -694,7 +694,7 @@ void StorageBuffer::flushAndPrepareForShutdown()
try
{
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, getContext());
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, false /*cleanup*/, getContext());
}
catch (...)
{
@ -720,6 +720,7 @@ bool StorageBuffer::optimize(
bool final,
bool deduplicate,
const Names & /* deduplicate_by_columns */,
bool cleanup,
ContextPtr /*context*/)
{
if (partition)
@ -731,6 +732,9 @@ bool StorageBuffer::optimize(
if (deduplicate)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DEDUPLICATE cannot be specified when optimizing table of type Buffer");
if (cleanup)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CLEANUP cannot be specified when optimizing table of type Buffer");
flushAllBuffers(false);
return true;
}
@ -1063,7 +1067,7 @@ void StorageBuffer::alter(const AlterCommands & params, ContextPtr local_context
auto metadata_snapshot = getInMemoryMetadataPtr();
/// Flush buffers to the storage because BufferSource skips buffers with old metadata_version.
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, local_context);
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, false /*cleanup*/, local_context);
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
params.apply(new_metadata, local_context);

View File

@ -100,6 +100,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
bool supportsSampling() const override { return true; }

View File

@ -262,12 +262,13 @@ bool StorageMaterializedView::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr local_context)
{
checkStatementCanBeForwarded();
auto storage_ptr = getTargetTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
}
void StorageMaterializedView::alter(

View File

@ -54,6 +54,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;

View File

@ -1095,6 +1095,7 @@ bool StorageMergeTree::merge(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
bool optimize_skip_merged_partitions)
@ -1134,7 +1135,7 @@ bool StorageMergeTree::merge(
/// Copying a vector of columns `deduplicate_by_columns`.
IExecutableTask::TaskResultCallback f = [](bool) {};
auto task = std::make_shared<MergePlainMergeTreeTask>(
*this, metadata_snapshot, deduplicate, deduplicate_by_columns, merge_mutate_entry, table_lock_holder, f);
*this, metadata_snapshot, deduplicate, deduplicate_by_columns, cleanup, merge_mutate_entry, table_lock_holder, f);
task->setCurrentTransaction(MergeTreeTransactionHolder{}, MergeTreeTransactionPtr{txn});
@ -1372,7 +1373,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
if (merge_entry)
{
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, /* deduplicate */ false, Names{}, merge_entry, shared_lock, common_assignee_trigger);
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, /* deduplicate */ false, Names{}, /* cleanup */ false, merge_entry, shared_lock, common_assignee_trigger);
task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn));
bool scheduled = assignee.scheduleMergeMutateTask(task);
/// The problem that we already booked a slot for TTL merge, but a merge list entry will be created only in a prepare method
@ -1506,6 +1507,7 @@ bool StorageMergeTree::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr local_context)
{
if (deduplicate)
@ -1521,6 +1523,13 @@ bool StorageMergeTree::optimize(
String disable_reason;
if (!partition && final)
{
if (cleanup && this->merging_params.mode != MergingParams::Mode::Replacing)
{
constexpr const char * message = "Cannot OPTIMIZE with CLEANUP table: {}";
disable_reason = "only ReplacingMergeTree can be CLEANUP";
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
}
DataPartsVector data_parts = getVisibleDataPartsVector(local_context);
std::unordered_set<String> partition_ids;
@ -1535,6 +1544,7 @@ bool StorageMergeTree::optimize(
true,
deduplicate,
deduplicate_by_columns,
cleanup,
txn,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
@ -1562,6 +1572,7 @@ bool StorageMergeTree::optimize(
final,
deduplicate,
deduplicate_by_columns,
cleanup,
txn,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))

View File

@ -83,6 +83,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
@ -171,13 +172,14 @@ private:
* Returns true if merge is finished successfully.
*/
bool merge(
bool aggressive,
const String & partition_id,
bool final, bool deduplicate,
const Names & deduplicate_by_columns,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
bool optimize_skip_merged_partitions = false);
bool aggressive,
const String & partition_id,
bool final, bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
bool optimize_skip_merged_partitions = false);
void renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction);

View File

@ -121,15 +121,16 @@ public:
}
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
ContextPtr context) override
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override
{
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context);
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, context);
}
void mutate(const MutationCommands & commands, ContextPtr context) override { getNested()->mutate(commands, context); }

View File

@ -3643,6 +3643,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
const auto storage_settings_ptr = getSettings();
const bool deduplicate = false; /// TODO: read deduplicate option from table config
const Names deduplicate_by_columns = {};
const bool cleanup = (storage_settings_ptr->clean_deleted_rows != CleanDeletedRows::Never);
CreateMergeEntryResult create_result = CreateMergeEntryResult::Other;
enum class AttemptStatus
@ -3726,10 +3727,12 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
future_merged_part->part_format,
deduplicate,
deduplicate_by_columns,
cleanup,
nullptr,
merge_pred->getVersion(),
future_merged_part->merge_type);
if (create_result == CreateMergeEntryResult::Ok)
return AttemptStatus::EntryCreated;
if (create_result == CreateMergeEntryResult::LogUpdated)
@ -3846,6 +3849,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
const MergeTreeDataPartFormat & merged_part_format,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version,
MergeType merge_type)
@ -3885,6 +3889,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.merge_type = merge_type;
entry.deduplicate = deduplicate;
entry.deduplicate_by_columns = deduplicate_by_columns;
entry.cleanup = cleanup;
entry.create_time = time(nullptr);
for (const auto & part : parts)
@ -5619,6 +5624,7 @@ bool StorageReplicatedMergeTree::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr query_context)
{
/// NOTE: exclusive lock cannot be used here, since this may lead to deadlock (see comments below),
@ -5630,6 +5636,9 @@ bool StorageReplicatedMergeTree::optimize(
if (!is_leader)
throw Exception(ErrorCodes::NOT_A_LEADER, "OPTIMIZE cannot be done on this replica because it is not a leader");
if (cleanup)
LOG_DEBUG(log, "Cleanup the ReplicatedMergeTree.");
auto handle_noop = [&]<typename... Args>(FormatStringHelper<Args...> fmt_string, Args && ...args)
{
PreformattedMessage message = fmt_string.format(std::forward<Args>(args)...);
@ -5708,6 +5717,7 @@ bool StorageReplicatedMergeTree::optimize(
future_merged_part->uuid,
future_merged_part->part_format,
deduplicate, deduplicate_by_columns,
cleanup,
&merge_entry, can_merge.getVersion(),
future_merged_part->merge_type);
@ -5732,6 +5742,13 @@ bool StorageReplicatedMergeTree::optimize(
bool assigned = false;
if (!partition && final)
{
if (cleanup && this->merging_params.mode != MergingParams::Mode::Replacing)
{
constexpr const char * message = "Cannot OPTIMIZE with CLEANUP table: {}";
String disable_reason = "only ReplacingMergeTree can be CLEANUP";
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
}
DataPartsVector data_parts = getVisibleDataPartsVector(query_context);
std::unordered_set<String> partition_ids;

View File

@ -178,6 +178,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr query_context) override;
void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;
@ -747,6 +748,7 @@ private:
const MergeTreeDataPartFormat & merged_part_format,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version,
MergeType merge_type);

View File

@ -435,11 +435,12 @@ bool StorageWindowView::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr local_context)
{
auto storage_ptr = getInnerTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
}
void StorageWindowView::alter(

View File

@ -134,6 +134,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;

View File

@ -3,7 +3,5 @@
2018-01-01 2 2
2018-01-01 2 2
== (Replicas) Test optimize ==
d1 2 1
d2 1 0
d3 2 1
d4 1 0

View File

@ -7,14 +7,14 @@ create table tab_00577 (date Date, version UInt64, val UInt64) engine = Replacin
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
insert into tab_00577 values ('2018-01-01', 0, 0);
select * from tab_00577 order by version;
OPTIMIZE TABLE tab_00577 FINAL;
OPTIMIZE TABLE tab_00577 FINAL CLEANUP;
select * from tab_00577;
drop table tab_00577;
DROP TABLE IF EXISTS testCleanupR1;
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
ORDER BY uid SETTINGS enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
min_bytes_for_wide_part = 0;
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
@ -22,9 +22,9 @@ INSERT INTO testCleanupR1 (*) VALUES ('d3', 2, 1);
INSERT INTO testCleanupR1 (*) VALUES ('d1', 2, 1);
SYSTEM SYNC REPLICA testCleanupR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
OPTIMIZE TABLE testCleanupR1 FINAL;
OPTIMIZE TABLE testCleanupR1 FINAL CLEANUP;
-- Only d2 and d4 remain (d1 and d3 were marked as deleted at version 2)
SELECT '== (Replicas) Test optimize ==';
SELECT * FROM testCleanupR1 order by uid;
DROP TABLE IF EXISTS testCleanupR1
DROP TABLE IF EXISTS testCleanupR1

View File

@ -0,0 +1,99 @@
== Test SELECT ... FINAL - no is_deleted ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== Test SELECT ... FINAL - no is_deleted SETTINGS clean_deleted_rows=Always ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== Test SELECT ... FINAL ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== Insert backups ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
== Insert a second batch with overlaping data ==
d1 5 0
d2 3 0
d3 3 0
d4 3 0
d5 1 0
== Only last version remains after OPTIMIZE W/ CLEANUP ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
d6 3 0
== OPTIMIZE W/ CLEANUP (remove d6) ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
== Test of the SETTINGS clean_deleted_rows as Always ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
== Test of the SETTINGS clean_deleted_rows as Never ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== (Replicas) Test optimize ==
d2 1 0
d4 1 0
== (Replicas) Test settings ==
c2 1 0
c4 1 0
== Check cleanup & settings for other merge trees ==
d1 1 1
d1 1 1
d1 1 1
d1 1 1 1
d1 1 1 1

View File

@ -0,0 +1,160 @@
-- Tags: zookeeper
-- The setting allow_deprecated_syntax_for_merge_tree prevents enabling the is_deleted column, so make sure it is off
set allow_deprecated_syntax_for_merge_tree=0;
-- Test the behaviour when is_deleted is not passed to the engine (the column exists but is treated as ordinary data)
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL - no is_deleted ==';
select * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid) SETTINGS clean_deleted_rows='Always';
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL - no is_deleted SETTINGS clean_deleted_rows=Always ==';
select * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
-- Test the new behaviour
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL ==';
select * from test FINAL order by uid;
select * from test order by uid;
SELECT '== Insert backups ==';
INSERT INTO test (*) VALUES ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1);
select * from test FINAL order by uid;
SELECT '== Insert a second batch with overlaping data ==';
INSERT INTO test (*) VALUES ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 1), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0), ('d2', 2, 1), ('d2', 3, 0), ('d3', 2, 1), ('d3', 3, 0);
select * from test FINAL order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);
-- Insert previous version of 'd6' but only v=3 is_deleted=false will remain
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 2, 1);
SELECT '== Only last version remains after OPTIMIZE W/ CLEANUP ==';
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
-- insert d6 v=3 is_deleted=true (same version but inserted later, so this row should be the one taken into account)
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 1);
SELECT '== OPTIMIZE W/ CLEANUP (remove d6) ==';
OPTIMIZE TABLE test FINAL CLEANUP;
-- No d6 anymore
select * from test order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS clean_deleted_rows='Always';
SELECT '== Test of the SETTINGS clean_deleted_rows as Always ==';
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
-- Even if the setting is set to Always, the SELECT FINAL doesn't delete rows
select * from test FINAL order by uid;
select * from test order by uid;
OPTIMIZE TABLE test FINAL;
-- d6 has to be removed since we set clean_deleted_rows as 'Always'
select * from test order by uid;
SELECT '== Test of the SETTINGS clean_deleted_rows as Never ==';
ALTER TABLE test MODIFY SETTING clean_deleted_rows='Never';
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
OPTIMIZE TABLE test FINAL;
-- d6 has NOT to be removed since we set clean_deleted_rows as 'Never'
select * from test order by uid;
DROP TABLE IF EXISTS testCleanupR1;
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
ORDER BY uid;
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
INSERT INTO testCleanupR1 (*) VALUES ('d3', 2, 1);
INSERT INTO testCleanupR1 (*) VALUES ('d1', 2, 1);
SYSTEM SYNC REPLICA testCleanupR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
OPTIMIZE TABLE testCleanupR1 FINAL CLEANUP;
-- Only d2 and d4 remain (d1 and d3 were marked as deleted at version 2)
SELECT '== (Replicas) Test optimize ==';
SELECT * FROM testCleanupR1 order by uid;
------------------------------
DROP TABLE IF EXISTS testSettingsR1;
CREATE TABLE testSettingsR1 (col1 String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_setting/', 'r1', version, is_deleted)
ORDER BY col1
SETTINGS clean_deleted_rows = 'Always';
INSERT INTO testSettingsR1 (*) VALUES ('c1', 1, 1),('c2', 1, 0),('c3', 1, 1),('c4', 1, 0);
SYSTEM SYNC REPLICA testSettingsR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
OPTIMIZE TABLE testSettingsR1 FINAL;
-- Only c2 and c4 remain (c1 and c3 were inserted with is_deleted = 1)
SELECT '== (Replicas) Test settings ==';
SELECT * FROM testSettingsR1 order by col1;
------------------------------
-- Check errors
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
-- is_deleted == 0/1
INSERT INTO test (*) VALUES ('d1', 1, 2); -- { serverError INCORRECT_DATA }
DROP TABLE IF EXISTS test;
-- check is_deleted type (it must be UInt8)
CREATE TABLE test (uid String, version UInt32, is_deleted String) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid); -- { serverError BAD_TYPE_OF_FIELD }
-- is_deleted column for other MergeTrees (ErrorCodes::LOGICAL_ERROR)
-- Check clean_deleted_rows='Always' for other MergeTrees
SELECT '== Check cleanup & settings for other merge trees ==';
CREATE TABLE testMT (uid String, version UInt32, is_deleted UInt8) ENGINE = MergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always';
INSERT INTO testMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testMT FINAL;
SELECT * FROM testMT order by uid;
CREATE TABLE testSummingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = SummingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always';
INSERT INTO testSummingMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testSummingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testSummingMT FINAL;
SELECT * FROM testSummingMT order by uid;
CREATE TABLE testAggregatingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = AggregatingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always';
INSERT INTO testAggregatingMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testAggregatingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testAggregatingMT FINAL;
SELECT * FROM testAggregatingMT order by uid;
CREATE TABLE testCollapsingMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = CollapsingMergeTree(sign) Order by (uid) SETTINGS clean_deleted_rows='Always';
INSERT INTO testCollapsingMT (*) VALUES ('d1', 1, 1, 1);
OPTIMIZE TABLE testCollapsingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testCollapsingMT FINAL;
SELECT * FROM testCollapsingMT order by uid;
CREATE TABLE testVersionedCMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = VersionedCollapsingMergeTree(sign, version) Order by (uid) SETTINGS clean_deleted_rows='Always';
INSERT INTO testVersionedCMT (*) VALUES ('d1', 1, 1, 1);
OPTIMIZE TABLE testVersionedCMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testVersionedCMT FINAL;
SELECT * FROM testVersionedCMT order by uid;

View File

@ -0,0 +1,31 @@
--- Based on https://github.com/ClickHouse/ClickHouse/issues/49685
--- Verify that ReplacingMergeTree properly handles _is_deleted:
--- SELECT FINAL should take `_is_deleted` into consideration when there is only one partition.
-- { echoOn }
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`account_id` UInt64,
`_is_deleted` UInt8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version, _is_deleted)
ORDER BY (account_id);
INSERT INTO t SELECT number, 0, 1 FROM numbers(1e3);
-- Mark the first 100 rows as deleted.
INSERT INTO t SELECT number, 1, 1 FROM numbers(1e2);
-- Put everything in one partition
OPTIMIZE TABLE t FINAL;
SELECT count() FROM t;
1000
SELECT count() FROM t FINAL;
900
-- Both should produce the same number of rows.
-- Previously, `do_not_merge_across_partitions_select_final = 1` showed more rows,
-- as if no rows were deleted.
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 1;
900
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 0;
900
DROP TABLE t;

View File

@ -0,0 +1,32 @@
--- Based on https://github.com/ClickHouse/ClickHouse/issues/49685
--- Verify that ReplacingMergeTree properly handles _is_deleted:
--- SELECT FINAL should take `_is_deleted` into consideration when there is only one partition.
-- { echoOn }
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`account_id` UInt64,
`_is_deleted` UInt8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version, _is_deleted)
ORDER BY (account_id);
INSERT INTO t SELECT number, 0, 1 FROM numbers(1e3);
-- Mark the first 100 rows as deleted.
INSERT INTO t SELECT number, 1, 1 FROM numbers(1e2);
-- Put everything in one partition
OPTIMIZE TABLE t FINAL;
SELECT count() FROM t;
SELECT count() FROM t FINAL;
-- Both should produce the same number of rows.
-- Previously, `do_not_merge_across_partitions_select_final = 1` showed more rows,
-- as if no rows were deleted.
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 1;
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 0;
DROP TABLE t;

View File

@ -0,0 +1,13 @@
== Only last version remains after OPTIMIZE W/ CLEANUP ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
d6 3 0
== OPTIMIZE W/ CLEANUP (remove d6) ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0

View File

@ -0,0 +1,23 @@
-- Checks OPTIMIZE ... FINAL CLEANUP on a ReplacingMergeTree(version, is_deleted) table:
-- for each uid only the row with the highest version survives, and it is dropped if that row has is_deleted = 1.
-- NOTE(review): the table settings below presumably force wide parts and the vertical merge algorithm — confirm.
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 0,
min_rows_for_wide_part = 1,
min_bytes_for_wide_part = 1;
-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);
-- Insert previous version of 'd6' but only v=3 is_deleted=false will remain
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 2, 1);
SELECT '== Only last version remains after OPTIMIZE W/ CLEANUP ==';
OPTIMIZE TABLE test FINAL CLEANUP;
-- Plain SELECT (no FINAL): the merge itself must already have collapsed the rows
select * from test order by uid;
-- insert d6 v=3 is_deleted=true (same version but inserted later, so this row should be the one taken into account)
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 1);
SELECT '== OPTIMIZE W/ CLEANUP (remove d6) ==';
OPTIMIZE TABLE test FINAL CLEANUP;
-- No d6 anymore
select * from test order by uid;
DROP TABLE IF EXISTS test;

View File

@ -17,6 +17,26 @@ CREATE TABLE t_r
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r2')
ORDER BY id; -- { serverError METADATA_MISMATCH }
CREATE TABLE t2
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r1', legacy_ver)
ORDER BY id;
CREATE TABLE t2_r
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r2', legacy_ver, deleted)
ORDER BY id; -- { serverError METADATA_MISMATCH }
CREATE TABLE t3
(
`key` UInt64,