Merge pull request #58316 from ClickHouse/reintroduce_is_deleted

Re-introduce `is_deleted` column for ReplacingMergeTree
Commit 7bded0a5e7 by Alexey Milovidov, 2023-12-31 00:57:19 +01:00, committed via GitHub.
59 changed files with 795 additions and 80 deletions

View File

@ -25,7 +25,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
[SETTINGS name=value, clean_deleted_rows=value, ...]
```
For a description of request parameters, see [statement description](../../../sql-reference/statements/create/table.md).
@ -88,6 +88,53 @@ SELECT * FROM mySecondReplacingMT FINAL;
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Name of a column used during a merge to determine whether the row represents the current state or is to be deleted; `1` is a “deleted” row, `0` is a “state” row.
Column data type — `UInt8`.
:::note
`is_deleted` can only be enabled when `ver` is used.
The row is deleted only when `OPTIMIZE ... FINAL CLEANUP` or `OPTIMIZE ... FINAL` is used, or if the engine setting `clean_deleted_rows` has been set to `Always`.
Whatever the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted row is kept.
:::
Example:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT VALUES (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT VALUES (1, 'first', '2020-01-01 01:01:01', 1);
SELECT * FROM myThirdReplacingMT FINAL;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT VALUES (1, 'first', '2020-01-01 00:00:00', 0);
SELECT * FROM myThirdReplacingMT FINAL;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
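Elsewhere in this PR, `CLEANUP` merges are gated behind the experimental MergeTree setting `allow_experimental_replacing_merge_with_cleanup`. A minimal sketch of enabling it for the table above (the `ALTER` form is one option; the tests in this PR set it at `CREATE` time):
```sql
-- Without this table-level setting, OPTIMIZE ... FINAL CLEANUP
-- fails with SUPPORT_IS_DISABLED (see the tests at the end of this diff):
ALTER TABLE myThirdReplacingMT MODIFY SETTING allow_experimental_replacing_merge_with_cleanup = 1;
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
```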
## Query clauses
When creating a `ReplacingMergeTree` table, the same [clauses](../../../engines/table-engines/mergetree-family/mergetree.md) are required as when creating a `MergeTree` table.

View File

@ -852,6 +852,16 @@ If the file name for column is too long (more than `max_file_name_length` bytes)
The maximal length of the file name to keep it as is without hashing. Takes effect only if setting `replace_long_file_name_to_hash` is enabled. The value of this setting does not include the length of file extension. So, it is recommended to set it below the maximum filename length (usually 255 bytes) with some gap to avoid filesystem errors. Default value: 127.
## clean_deleted_rows
Enables or disables automatic deletion of rows flagged as `is_deleted` when performing `OPTIMIZE ... FINAL` on a table that uses the ReplacingMergeTree engine. When disabled, the `CLEANUP` keyword has to be added to `OPTIMIZE ... FINAL` to get the same behaviour.
Possible values:
- `Always` or `Never`.
Default value: `Never`
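A hedged sketch of using the setting at table creation (this mirrors the pattern used by the tests at the end of this diff):
```sql
-- Rows flagged is_deleted = 1 are then dropped automatically on OPTIMIZE ... FINAL:
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplacingMergeTree(version, is_deleted)
ORDER BY uid
SETTINGS clean_deleted_rows = 'Always', allow_experimental_replacing_merge_with_cleanup = 1;
```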
## allow_experimental_block_number_column
Persists virtual column `_block_number` on merges.

View File

@ -86,6 +86,59 @@ SELECT * FROM mySecondReplacingMT FINAL;
│ 1 │ first │ 2020-01-01 01:01:01 │
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Name of the column used during a merge to indicate whether the row should be kept or is subject to deletion; `1` means the row is to be deleted, `0` means the row is to be kept.
Column data type — `UInt8`.
:::note
`is_deleted` can only be used when `ver` is used.
A row is deleted in the following cases:
- `OPTIMIZE ... FINAL CLEANUP` is used
- `OPTIMIZE ... FINAL` is used
- the engine setting `clean_deleted_rows` is set to `Always` (the default is `Never`)
- there are newer versions of the row
It is not recommended to run `FINAL CLEANUP` or to use the engine setting `clean_deleted_rows` with the value `Always`; this may lead to unexpected results, for example deleted rows may reappear.
Regardless of the changes made to the data, the version must be increased. If two rows have the same version, only the last inserted row is kept.
:::
Example:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT VALUES (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT VALUES (1, 'first', '2020-01-01 01:01:01', 1);
SELECT * FROM myThirdReplacingMT FINAL;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT VALUES (1, 'first', '2020-01-01 00:00:00', 0);
SELECT * FROM myThirdReplacingMT FINAL;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
## Query clauses

View File

@ -1 +0,0 @@
../../../tests/config/config.d/graphite_alternative.xml

View File

@ -98,6 +98,8 @@ IMPLEMENT_SETTING_AUTO_ENUM(DefaultDatabaseEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(DefaultTableEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(CleanDeletedRows, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64},

View File

@ -140,6 +140,14 @@ enum class DefaultTableEngine
DECLARE_SETTING_ENUM(DefaultTableEngine)
enum class CleanDeletedRows
{
Never = 0, /// Disable.
Always,
};
DECLARE_SETTING_ENUM(CleanDeletedRows)
enum class MySQLDataTypesSupport
{
DECIMAL, // convert MySQL's decimal and number to ClickHouse Decimal when applicable

View File

@ -15,6 +15,7 @@
#include <Common/thread_local_rng.h>
#include <Common/FieldVisitorToString.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/callOnce.h>
#include <Common/SharedLockGuard.h>
#include <Coordination/KeeperDispatcher.h>
@ -32,6 +33,7 @@
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IO/IOUringReader.h>
#include <IO/SynchronousReader.h>
@ -43,6 +45,7 @@
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/SessionTracker.h>
#include <Core/ServerSettings.h>
#include <Interpreters/PreparedSets.h>
#include <Core/Settings.h>
#include <Core/SettingsQuirks.h>
#include <Access/AccessControl.h>

View File

@ -79,7 +79,7 @@ BlockIO InterpreterOptimizeQuery::execute()
if (auto * snapshot_data = dynamic_cast<MergeTreeData::SnapshotData *>(storage_snapshot->data.get()))
snapshot_data->parts = {};
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, ast.cleanup, getContext());
return {};
}

View File

@ -24,6 +24,9 @@ void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatSt
if (deduplicate)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
if (cleanup)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " CLEANUP" << (settings.hilite ? hilite_none : "");
if (deduplicate_by_columns)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " BY " << (settings.hilite ? hilite_none : "");

View File

@ -21,10 +21,12 @@ public:
bool deduplicate = false;
/// Deduplicate by columns.
ASTPtr deduplicate_by_columns;
/// Delete 'is_deleted' data
bool cleanup = false;
/** Get the text that identifies this element. */
String getID(char delim) const override
{
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "");
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "")+ (cleanup ? "_cleanup" : "");
}
ASTPtr clone() const override

View File

@ -39,6 +39,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ASTPtr partition;
bool final = false;
bool deduplicate = false;
bool cleanup = false;
String cluster_str;
if (!s_optimize_table.ignore(pos, expected))
@ -69,6 +70,9 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
if (s_deduplicate.ignore(pos, expected))
deduplicate = true;
if (s_cleanup.ignore(pos, expected))
cleanup = true;
ASTPtr deduplicate_by_columns;
if (deduplicate && s_by.ignore(pos, expected))
{
@ -77,9 +81,6 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
return false;
}
/// Obsolete feature, ignored for backward compatibility.
s_cleanup.ignore(pos, expected);
auto query = std::make_shared<ASTOptimizeQuery>();
node = query;
@ -89,6 +90,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
query->final = final;
query->deduplicate = deduplicate;
query->deduplicate_by_columns = deduplicate_by_columns;
query->cleanup = cleanup;
query->database = database;
query->table = table;
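At the SQL level, this parser change means `CLEANUP` is now recorded on `ASTOptimizeQuery` instead of being skipped as an obsolete token; an illustrative sketch (table and column names are hypothetical):
```sql
-- Now parsed into ASTOptimizeQuery::cleanup = true rather than silently ignored:
OPTIMIZE TABLE t FINAL CLEANUP;
-- DEDUPLICATE BY continues to parse as before:
OPTIMIZE TABLE t FINAL DEDUPLICATE BY key;
```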

View File

@ -3,22 +3,30 @@
#include <Columns/ColumnsNumber.h>
#include <IO/WriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
const Block & header_,
size_t num_inputs,
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
bool use_average_block_sizes,
bool cleanup_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes), cleanup(cleanup_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
if (!version_column.empty())
version_column_number = header_.getPositionByName(version_column);
}
@ -65,7 +73,15 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// Write the data for the previous primary key.
if (!selected_row.empty())
insertRow();
{
if (is_deleted_column_number != -1)
{
/// In a cleanup merge, skip the row entirely if its last version is flagged as deleted.
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
insertRow();
}
else
insertRow();
}
selected_row.clear();
}
@ -75,6 +91,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);
if (is_deleted_column_number != -1)
{
const UInt8 is_deleted = assert_cast<const ColumnUInt8 &>(*current->all_columns[is_deleted_column_number]).getData()[current->getRow()];
if (is_deleted != 1 && is_deleted != 0)
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect data: is_deleted = {} (must be 1 or 0).", toString(is_deleted));
}
/// A non-strict comparison, since we select the last row for the same version values.
if (version_column_number == -1
|| selected_row.empty()
@ -105,7 +128,15 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// We will write the data for the last primary key.
if (!selected_row.empty())
insertRow();
{
if (is_deleted_column_number != -1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
insertRow();
}
else
insertRow();
}
return Status(merged_data.pull(), true);
}
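Sketched at the SQL level, the new `INCORRECT_DATA` validation rejects out-of-range flag values (this mirrors the error-handling test at the end of this diff, where `test` is defined):
```sql
-- is_deleted must be 0 or 1; other values are rejected when rows are merged:
INSERT INTO test (*) VALUES ('d1', 1, 2); -- { serverError INCORRECT_DATA }
```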

View File

@ -21,11 +21,13 @@ public:
ReplacingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
bool use_average_block_sizes = false,
bool cleanup = false);
const char * getName() const override { return "ReplacingSortedAlgorithm"; }
Status merge() override;
@ -33,7 +35,9 @@ public:
private:
MergedData merged_data;
ssize_t is_deleted_column_number = -1;
ssize_t version_column_number = -1;
bool cleanup = false;
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 2; /// last, current.

View File

@ -14,21 +14,24 @@ public:
ReplacingSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_,
const String & version_column,
const String & is_deleted_column, const String & version_column,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
bool use_average_block_sizes = false,
bool cleanup = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),
is_deleted_column,
version_column,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes)
use_average_block_sizes,
cleanup)
{
}

View File

@ -228,7 +228,7 @@ struct SplitPartsRangesResult
RangesInDataParts intersecting_parts_ranges;
};
SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts)
SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts, bool force_process_all_ranges)
{
/** Split ranges in data parts into intersecting ranges in data parts and non intersecting ranges in data parts.
*
@ -349,7 +349,7 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts)
if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart)
{
/// If part level is 0, we must process whole previous part because it can contain duplicate primary keys
if (ranges_in_data_parts[previous_part_range.part_index].data_part->info.level == 0)
if (force_process_all_ranges || ranges_in_data_parts[previous_part_range.part_index].data_part->info.level == 0)
continue;
/// Case 1 Range Start after Range Start
@ -384,7 +384,7 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts)
MarkRange other_interval_range = other_interval_it->second;
/// If part level is 0, we must process whole other intersecting part because it can contain duplicate primary keys
if (ranges_in_data_parts[other_interval_part_index].data_part->info.level == 0)
if (force_process_all_ranges || ranges_in_data_parts[other_interval_part_index].data_part->info.level == 0)
continue;
/// Case 2 Range Start after Range End
@ -419,7 +419,7 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts)
*
* If part level is 0, we must process whole part because it can contain duplicate primary keys.
*/
if (intersecting_parts != 1 || ranges_in_data_parts[current_part_range.part_index].data_part->info.level == 0)
if (intersecting_parts != 1 || force_process_all_ranges || ranges_in_data_parts[current_part_range.part_index].data_part->info.level == 0)
{
add_intersecting_range(current_part_range.part_index, part_index_start_to_range[current_part_range.part_index]);
part_index_start_to_range.erase(current_part_range.part_index);
@ -711,14 +711,15 @@ SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && in_order_reading_step_getter)
ReadingInOrderStepGetter && in_order_reading_step_getter,
bool force_process_all_ranges)
{
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1");
SplitPartsWithRangesByPrimaryKeyResult result;
SplitPartsRangesResult split_result = splitPartsRanges(std::move(parts));
SplitPartsRangesResult split_result = splitPartsRanges(std::move(parts), force_process_all_ranges);
result.non_intersecting_parts_ranges = std::move(split_result.non_intersecting_parts_ranges);
auto && [layers, borders] = splitIntersectingPartsRangesIntoLayers(std::move(split_result.intersecting_parts_ranges), max_layers);

View File

@ -33,5 +33,6 @@ SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && in_order_reading_step_getter);
ReadingInOrderStepGetter && in_order_reading_step_getter,
bool force_process_all_ranges);
}

View File

@ -1027,7 +1027,7 @@ static void addMergingFinal(
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
sort_description, merging_params.version_column, max_block_size_rows, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false);
sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size_rows, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty());
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
@ -1130,7 +1130,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// can use parallel select on such parts.
bool no_merging_final = do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0;
parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
data.merging_params.is_deleted_column.empty();
if (no_merging_final)
{
@ -1162,13 +1163,18 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
info.use_uncompressed_cache);
};
/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
/// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
bool force_process_all_ranges = !data.merging_params.is_deleted_column.empty();
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
metadata_for_reading->getPrimaryKey(),
sorting_expr,
std::move(new_parts),
num_streams,
context,
std::move(in_order_reading_step_getter));
std::move(in_order_reading_step_getter),
force_process_all_ranges);
for (auto && non_intersecting_parts_range : split_ranges_result.non_intersecting_parts_ranges)
non_intersecting_parts_by_primary_key.push_back(std::move(non_intersecting_parts_range));
@ -1839,6 +1845,8 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
}
}
if (!data.merging_params.is_deleted_column.empty() && !names.contains(data.merging_params.is_deleted_column))
column_names_to_read.push_back(data.merging_params.is_deleted_column);
if (!data.merging_params.sign_column.empty() && !names.contains(data.merging_params.sign_column))
column_names_to_read.push_back(data.merging_params.sign_column);
if (!data.merging_params.version_column.empty() && !names.contains(data.merging_params.version_column))

View File

@ -515,6 +515,7 @@ public:
bool /*final*/,
bool /*deduplicate*/,
const Names & /* deduplicate_by_columns */,
bool /*cleanup*/,
ContextPtr /*context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method optimize is not supported by storage {}", getName());

View File

@ -312,6 +312,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
reserved_space,
entry.deduplicate,
entry.deduplicate_by_columns,
entry.cleanup,
storage.merging_params,
NO_TRANSACTION_PTR);

View File

@ -131,6 +131,7 @@ void MergePlainMergeTreeTask::prepare()
merge_mutate_entry->tagger->reserved_space,
deduplicate,
deduplicate_by_columns,
cleanup,
storage.merging_params,
txn);
}

View File

@ -20,6 +20,7 @@ public:
StorageMetadataPtr metadata_snapshot_,
bool deduplicate_,
Names deduplicate_by_columns_,
bool cleanup_,
MergeMutateSelectedEntryPtr merge_mutate_entry_,
TableLockHolder table_lock_holder_,
IExecutableTask::TaskResultCallback & task_result_callback_)
@ -27,6 +28,7 @@ public:
, metadata_snapshot(std::move(metadata_snapshot_))
, deduplicate(deduplicate_)
, deduplicate_by_columns(std::move(deduplicate_by_columns_))
, cleanup(cleanup_)
, merge_mutate_entry(std::move(merge_mutate_entry_))
, table_lock_holder(std::move(table_lock_holder_))
, task_result_callback(task_result_callback_)
@ -67,6 +69,7 @@ private:
StorageMetadataPtr metadata_snapshot;
bool deduplicate;
Names deduplicate_by_columns;
bool cleanup;
MergeMutateSelectedEntryPtr merge_mutate_entry{nullptr};
TableLockHolder table_lock_holder;
FutureMergedMutatedPartPtr future_part{nullptr};

View File

@ -42,6 +42,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
@ -69,7 +70,10 @@ static void extractMergingAndGatheringColumns(
/// Force version column for Replacing mode
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
key_columns.emplace(merging_params.is_deleted_column);
key_columns.emplace(merging_params.version_column);
}
/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
@ -506,13 +510,12 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1)
&& sum_input_rows_exact != rows_sources_count + input_rows_filtered)
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1) && sum_input_rows_exact != rows_sources_count + input_rows_filtered)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
/// TemporaryDataOnDisk::createRawStream returns WriteBufferFromFile implementing IReadableWriteBuffer
/// and we expect to get ReadBufferFromFile here.
@ -756,6 +759,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
global_ctx->space_reservation,
global_ctx->deduplicate,
global_ctx->deduplicate_by_columns,
global_ctx->cleanup,
projection_merging_params,
global_ctx->need_prefix,
global_ctx->new_data_part.get(),
@ -1019,9 +1023,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
break;
case MergeTreeData::MergingParams::Replacing:
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
global_ctx->cleanup);
break;
case MergeTreeData::MergingParams::Graphite:
@ -1110,6 +1118,8 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
return MergeAlgorithm::Horizontal;
if (global_ctx->future_part->part_format.storage_type != MergeTreeDataPartStorageType::Full)
return MergeAlgorithm::Horizontal;
if (global_ctx->cleanup)
return MergeAlgorithm::Horizontal;
if (!data_settings->allow_vertical_merges_from_compact_to_wide_parts)
{

View File

@ -67,6 +67,7 @@ public:
ReservationSharedPtr space_reservation_,
bool deduplicate_,
Names deduplicate_by_columns_,
bool cleanup_,
MergeTreeData::MergingParams merging_params_,
bool need_prefix,
IMergeTreeDataPart * parent_part_,
@ -90,6 +91,7 @@ public:
global_ctx->space_reservation = std::move(space_reservation_);
global_ctx->deduplicate = std::move(deduplicate_);
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
global_ctx->cleanup = std::move(cleanup_);
global_ctx->parent_part = std::move(parent_part_);
global_ctx->data = std::move(data_);
global_ctx->mutator = std::move(mutator_);
@ -158,6 +160,7 @@ private:
ReservationSharedPtr space_reservation{nullptr};
bool deduplicate{false};
Names deduplicate_by_columns{};
bool cleanup{false};
NamesAndTypesList gathering_columns{};
NamesAndTypesList merging_columns{};

View File

@ -846,6 +846,10 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
{
const auto columns = metadata.getColumns().getAllPhysical();
if (!is_deleted_column.empty() && mode != MergingParams::Replacing)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"is_deleted column for MergeTree cannot be specified in modes except Replacing.");
if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Sign column for MergeTree cannot be specified "
@ -915,6 +919,41 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Version column {} does not exist in table declaration.", version_column);
};
/// Check that if the is_deleted column is needed, it exists and is of type UInt8. If exist, version column must be defined too but version checks are not done here.
auto check_is_deleted_column = [this, & columns](bool is_optional, const std::string & storage)
{
if (is_deleted_column.empty())
{
if (is_optional)
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: is_deleted ({}) column for storage {} is empty", is_deleted_column, storage);
}
else
{
if (version_column.empty() && !is_optional)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: Version column ({}) for storage {} is empty while is_deleted ({}) is not.",
version_column, storage, is_deleted_column);
bool miss_is_deleted_column = true;
for (const auto & column : columns)
{
if (column.name == is_deleted_column)
{
if (!typeid_cast<const DataTypeUInt8 *>(column.type.get()))
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "is_deleted column ({}) for storage {} must have type UInt8. Provided column of type {}.",
is_deleted_column, storage, column.type->getName());
miss_is_deleted_column = false;
break;
}
}
if (miss_is_deleted_column)
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "is_deleted column {} does not exist in table declaration.", is_deleted_column);
}
};
if (mode == MergingParams::Collapsing)
check_sign_column(false, "CollapsingMergeTree");
@ -951,6 +990,7 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
if (mode == MergingParams::Replacing)
{
check_is_deleted_column(true, "ReplacingMergeTree");
check_version_column(true, "ReplacingMergeTree");
}
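At the SQL level these checks reject a mistyped `is_deleted` column (this mirrors the type-check test at the end of this diff):
```sql
-- is_deleted must be UInt8, and a version column must be present:
CREATE TABLE test (uid String, version UInt32, is_deleted String)
ENGINE = ReplacingMergeTree(version, is_deleted)
ORDER BY uid; -- { serverError BAD_TYPE_OF_FIELD }
```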

View File

@ -349,6 +349,9 @@ public:
/// For Collapsing and VersionedCollapsing mode.
String sign_column;
/// For Replacing mode. Can be empty for Replacing.
String is_deleted_column;
/// For Summing mode. If empty - columns_to_sum is determined automatically.
Names columns_to_sum;

View File

@ -676,6 +676,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
bool need_prefix,
@ -692,6 +693,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
space_reservation,
deduplicate,
deduplicate_by_columns,
cleanup,
merging_params,
need_prefix,
parent_part,

View File

@ -165,6 +165,7 @@ public:
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
bool need_prefix = true,

View File

@ -325,7 +325,7 @@ Block MergeTreeDataWriter::mergeBlock(
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, merging_params.version_column, block_size + 1, /*block_size_bytes=*/0);
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column,

View File

@ -192,6 +192,7 @@ struct Settings;
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
M(Bool, allow_experimental_replacing_merge_with_cleanup, false, "Allow experimental CLEANUP merges for ReplacingMergeTree with is_deleted column.", 0) \
\
/** Compress marks and primary key. */ \
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
@ -232,7 +233,7 @@ struct Settings;
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Seconds, replicated_fetches_http_send_timeout, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Seconds, replicated_fetches_http_receive_timeout, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, replicated_max_parallel_fetches_for_host, DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, String, clean_deleted_rows, "") \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, CleanDeletedRows, clean_deleted_rows, CleanDeletedRows::Never) \
/// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE

View File

@ -1057,6 +1057,7 @@ public:
ctx->space_reservation,
false, // TODO Do we need deduplicate for projections
{},
false, // no cleanup
projection_merging_params,
NO_TRANSACTION_PTR,
/* need_prefix */ true,

View File

@ -96,6 +96,9 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
}
}
if (cleanup)
out << "\ncleanup: " << cleanup;
break;
case DROP_RANGE:
@ -270,11 +273,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFor
deduplicate_by_columns = std::move(new_deduplicate_by_columns);
}
else if (checkString("cleanup: ", in))
{
/// Obsolete option, does nothing.
bool cleanup = false;
in >> cleanup;
}
else
trailing_newline_found = true;
}

View File

@ -98,6 +98,7 @@ struct ReplicatedMergeTreeLogEntryData
Strings source_parts;
bool deduplicate = false; /// Do deduplicate on merge
Strings deduplicate_by_columns = {}; // Which columns should be checked for duplicates, empty means 'all' (default).
bool cleanup = false;
MergeType merge_type = MergeType::Regular;
String column_name;
String index_name;

View File

@ -52,6 +52,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
index_granularity = data_settings->index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
is_deleted_column = data.merging_params.is_deleted_column;
columns_to_sum = fmt::format("{}", fmt::join(data.merging_params.columns_to_sum.begin(), data.merging_params.columns_to_sum.end(), ","));
version_column = data.merging_params.version_column;
if (data.merging_params.mode == MergeTreeData::MergingParams::Graphite)
@ -156,6 +157,8 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
out << "merge parameters format version: " << merge_params_version << "\n";
if (!version_column.empty())
out << "version column: " << version_column << "\n";
if (!is_deleted_column.empty())
out << "is_deleted column: " << is_deleted_column << "\n";
if (!columns_to_sum.empty())
out << "columns to sum: " << columns_to_sum << "\n";
if (!graphite_params_hash.empty())
@ -221,6 +224,9 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)
if (checkString("version column: ", in))
in >> version_column >> "\n";
if (checkString("is_deleted column: ", in))
in >> is_deleted_column >> "\n";
if (checkString("columns to sum: ", in))
in >> columns_to_sum >> "\n";
@ -273,6 +279,10 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in version column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.version_column, version_column);
if (is_deleted_column != from_zk.is_deleted_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in is_deleted column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.is_deleted_column, is_deleted_column);
if (columns_to_sum != from_zk.columns_to_sum)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sum columns. "
"Stored in ZooKeeper: {}, local: {}", from_zk.columns_to_sum, columns_to_sum);

View File

@ -29,6 +29,7 @@ struct ReplicatedMergeTreeTableMetadata
int merge_params_version = REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS;
String sign_column;
String version_column;
String is_deleted_column;
String columns_to_sum;
String graphite_params_hash;
String primary_key;

View File

@ -138,7 +138,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
* CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign)
* SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum])
* AggregatingMergeTree(date, [sample_key], primary_key, index_granularity)
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column])
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column [, is_deleted_column]])
* GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, 'config_element')
*
* Alternatively, you can specify:
@ -441,11 +441,11 @@ static StoragePtr create(const StorageFactory::Arguments & args)
}
else if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
/// Due to a misfortune, there could be an extra obsolete parameter.
/// We ignore it for backward compatibility.
// If there are args and the number of optional parameters is higher than 1
// is_deleted is not allowed with the 'allow_deprecated_syntax_for_merge_tree' setting
if (arg_cnt - arg_num == 2 && !engine_args[arg_cnt - 1]->as<ASTLiteral>() && is_extended_storage_def)
{
if (!tryGetIdentifierName(engine_args[arg_cnt - 1]))
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.is_deleted_column))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "is_deleted column name must be an identifier {}", verbose_help_message);
--arg_cnt;
}
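After this change the extended syntax accepts an optional trailing `is_deleted` identifier; a minimal sketch (names illustrative):
```sql
CREATE TABLE t (key Int64, ver UInt32, is_deleted UInt8)
ENGINE = ReplacingMergeTree(ver, is_deleted) -- is_deleted is the new optional second argument
ORDER BY key;
```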

View File

@ -321,6 +321,7 @@ bool StorageEmbeddedRocksDB::optimize(
bool final,
bool deduplicate,
const Names & /* deduplicate_by_columns */,
bool cleanup,
ContextPtr /*context*/)
{
if (partition)
@ -332,6 +333,9 @@ bool StorageEmbeddedRocksDB::optimize(
if (deduplicate)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DEDUPLICATE cannot be specified when optimizing table of type EmbeddedRocksDB");
if (cleanup)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CLEANUP cannot be specified when optimizing table of type EmbeddedRocksDB");
std::shared_lock lock(rocksdb_ptr_mx);
rocksdb::CompactRangeOptions compact_options;
auto status = rocksdb_ptr->CompactRange(compact_options, nullptr, nullptr);

View File

@ -65,6 +65,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
bool supportsParallelInsert() const override { return true; }

View File

@ -685,7 +685,7 @@ void StorageBuffer::flushAndPrepareForShutdown()
try
{
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, getContext());
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, false /*cleanup*/, getContext());
}
catch (...)
{
@ -711,6 +711,7 @@ bool StorageBuffer::optimize(
bool final,
bool deduplicate,
const Names & /* deduplicate_by_columns */,
bool cleanup,
ContextPtr /*context*/)
{
if (partition)
@ -722,6 +723,9 @@ bool StorageBuffer::optimize(
if (deduplicate)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DEDUPLICATE cannot be specified when optimizing table of type Buffer");
if (cleanup)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CLEANUP cannot be specified when optimizing table of type Buffer");
flushAllBuffers(false);
return true;
}
@ -1054,7 +1058,7 @@ void StorageBuffer::alter(const AlterCommands & params, ContextPtr local_context
auto metadata_snapshot = getInMemoryMetadataPtr();
/// Flush buffers to the storage because BufferSource skips buffers with old metadata_version.
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, local_context);
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, false /*cleanup*/, local_context);
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
params.apply(new_metadata, local_context);

View File

@ -100,6 +100,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
bool supportsSampling() const override { return true; }

View File

@ -278,12 +278,13 @@ bool StorageMaterializedView::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr local_context)
{
checkStatementCanBeForwarded();
auto storage_ptr = getTargetTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
}
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> StorageMaterializedView::prepareRefresh() const

View File

@ -48,6 +48,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;

View File

@ -62,6 +62,7 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int ABORTED;
extern const int SUPPORT_IS_DISABLED;
}
namespace ActionLocks
@ -1095,6 +1096,7 @@ bool StorageMergeTree::merge(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
bool optimize_skip_merged_partitions)
@ -1134,7 +1136,7 @@ bool StorageMergeTree::merge(
/// Copying a vector of columns `deduplicate by columns.
IExecutableTask::TaskResultCallback f = [](bool) {};
auto task = std::make_shared<MergePlainMergeTreeTask>(
*this, metadata_snapshot, deduplicate, deduplicate_by_columns, merge_mutate_entry, table_lock_holder, f);
*this, metadata_snapshot, deduplicate, deduplicate_by_columns, cleanup, merge_mutate_entry, table_lock_holder, f);
task->setCurrentTransaction(MergeTreeTransactionHolder{}, MergeTreeTransactionPtr{txn});
@ -1372,7 +1374,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
if (merge_entry)
{
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, /* deduplicate */ false, Names{}, merge_entry, shared_lock, common_assignee_trigger);
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, /* deduplicate */ false, Names{}, /* cleanup */ false, merge_entry, shared_lock, common_assignee_trigger);
task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn));
bool scheduled = assignee.scheduleMergeMutateTask(task);
/// The problem that we already booked a slot for TTL merge, but a merge list entry will be created only in a prepare method
@ -1506,6 +1508,7 @@ bool StorageMergeTree::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr local_context)
{
if (deduplicate)
@ -1521,6 +1524,16 @@ bool StorageMergeTree::optimize(
String disable_reason;
if (!partition && final)
{
if (cleanup && this->merging_params.mode != MergingParams::Mode::Replacing)
{
constexpr const char * message = "Cannot OPTIMIZE with CLEANUP table: {}";
disable_reason = "only ReplacingMergeTree can be CLEANUP";
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
}
if (cleanup && !getSettings()->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
DataPartsVector data_parts = getVisibleDataPartsVector(local_context);
std::unordered_set<String> partition_ids;
@ -1535,6 +1548,7 @@ bool StorageMergeTree::optimize(
true,
deduplicate,
deduplicate_by_columns,
cleanup,
txn,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
@ -1562,6 +1576,7 @@ bool StorageMergeTree::optimize(
final,
deduplicate,
deduplicate_by_columns,
cleanup,
txn,
disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
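At the SQL level this guard surfaces as `CANNOT_ASSIGN_OPTIMIZE` when `CLEANUP` is requested for a non-Replacing engine (a hedged sketch; `summing_table` is hypothetical):
```sql
OPTIMIZE TABLE summing_table FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
```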

View File

@ -81,6 +81,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
@ -169,13 +170,14 @@ private:
* Returns true if merge is finished successfully.
*/
bool merge(
bool aggressive,
const String & partition_id,
bool final, bool deduplicate,
const Names & deduplicate_by_columns,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
bool optimize_skip_merged_partitions = false);
bool aggressive,
const String & partition_id,
bool final, bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeTransactionPtr & txn,
String & out_disable_reason,
bool optimize_skip_merged_partitions = false);
void renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction);

View File

@ -121,15 +121,16 @@ public:
}
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
ContextPtr context) override
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override
{
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context);
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, context);
}
void mutate(const MutationCommands & commands, ContextPtr context) override { getNested()->mutate(commands, context); }

View File

@ -3730,10 +3730,12 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
future_merged_part->part_format,
deduplicate,
deduplicate_by_columns,
/*cleanup*/ false,
nullptr,
merge_pred->getVersion(),
future_merged_part->merge_type);
if (create_result == CreateMergeEntryResult::Ok)
return AttemptStatus::EntryCreated;
if (create_result == CreateMergeEntryResult::LogUpdated)
@ -3850,6 +3852,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
const MergeTreeDataPartFormat & merged_part_format,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version,
MergeType merge_type)
@ -3889,6 +3892,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.merge_type = merge_type;
entry.deduplicate = deduplicate;
entry.deduplicate_by_columns = deduplicate_by_columns;
entry.cleanup = cleanup;
entry.create_time = time(nullptr);
for (const auto & part : parts)
@ -5623,6 +5627,7 @@ bool StorageReplicatedMergeTree::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr query_context)
{
/// NOTE: exclusive lock cannot be used here, since this may lead to deadlock (see comments below),
@ -5634,6 +5639,13 @@ bool StorageReplicatedMergeTree::optimize(
if (!is_leader)
throw Exception(ErrorCodes::NOT_A_LEADER, "OPTIMIZE cannot be done on this replica because it is not a leader");
if (cleanup)
{
if (!getSettings()->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
LOG_DEBUG(log, "Cleanup the ReplicatedMergeTree.");
}
auto handle_noop = [&]<typename... Args>(FormatStringHelper<Args...> fmt_string, Args && ...args)
{
PreformattedMessage message = fmt_string.format(std::forward<Args>(args)...);
@ -5712,6 +5724,7 @@ bool StorageReplicatedMergeTree::optimize(
future_merged_part->uuid,
future_merged_part->part_format,
deduplicate, deduplicate_by_columns,
cleanup,
&merge_entry, can_merge.getVersion(),
future_merged_part->merge_type);
@ -5736,6 +5749,13 @@ bool StorageReplicatedMergeTree::optimize(
bool assigned = false;
if (!partition && final)
{
if (cleanup && this->merging_params.mode != MergingParams::Mode::Replacing)
{
constexpr const char * message = "Cannot OPTIMIZE with CLEANUP table: {}";
String disable_reason = "only ReplacingMergeTree can be CLEANUP";
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
}
DataPartsVector data_parts = getVisibleDataPartsVector(query_context);
std::unordered_set<String> partition_ids;

View File

@ -178,6 +178,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr query_context) override;
void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;
@ -745,6 +746,7 @@ private:
const MergeTreeDataPartFormat & merged_part_format,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version,
MergeType merge_type);

View File

@ -435,11 +435,12 @@ bool StorageWindowView::optimize(
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr local_context)
{
auto storage_ptr = getInnerTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context);
}
void StorageWindowView::alter(

View File

@ -134,6 +134,7 @@ public:
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr context) override;
void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;

View File

@ -3,7 +3,5 @@
2018-01-01 2 2
2018-01-01 2 2
== (Replicas) Test optimize ==
d1 2 1
d2 1 0
d3 2 1
d4 1 0

View File

@ -3,28 +3,28 @@ set optimize_on_insert = 0;
drop table if exists tab_00577;
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1,
vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
min_bytes_for_wide_part = 0;
min_bytes_for_wide_part = 0, allow_experimental_replacing_merge_with_cleanup=1;
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
insert into tab_00577 values ('2018-01-01', 0, 0);
select * from tab_00577 order by version;
OPTIMIZE TABLE tab_00577 FINAL;
OPTIMIZE TABLE tab_00577 FINAL CLEANUP;
select * from tab_00577;
drop table tab_00577;
DROP TABLE IF EXISTS testCleanupR1;
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
ORDER BY uid SETTINGS enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
min_bytes_for_wide_part = 0;
min_bytes_for_wide_part = 0, allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
INSERT INTO testCleanupR1 (*) VALUES ('d3', 2, 1);
INSERT INTO testCleanupR1 (*) VALUES ('d1', 2, 1);
SYSTEM SYNC REPLICA testCleanupR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
OPTIMIZE TABLE testCleanupR1 FINAL;
OPTIMIZE TABLE testCleanupR1 FINAL CLEANUP;
-- Only d2 and d4 remain
SELECT '== (Replicas) Test optimize ==';
SELECT * FROM testCleanupR1 order by uid;
DROP TABLE IF EXISTS testCleanupR1
DROP TABLE IF EXISTS testCleanupR1

View File

@ -0,0 +1,121 @@
== Test SELECT ... FINAL - no is_deleted ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== Test SELECT ... FINAL - no is_deleted SETTINGS clean_deleted_rows=Always ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== Test SELECT ... FINAL ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== Insert backups ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
== Insert a second batch with overlapping data ==
d1 5 0
d2 3 0
d3 3 0
d4 3 0
d5 1 0
== Only last version remains after OPTIMIZE W/ CLEANUP ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
d6 3 0
== OPTIMIZE W/ CLEANUP (remove d6) ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
== Test of the SETTINGS clean_deleted_rows as Always ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
== Test of the SETTINGS clean_deleted_rows as Never ==
d1 5 0
d2 1 0
d3 1 0
d4 3 0
d5 1 0
d6 2 1
== (Replicas) Test optimize ==
d2 1 0
d4 1 0
== (Replicas) Test settings ==
c2 1 0
c4 1 0
no cleanup 1 d1 5 0
no cleanup 1 d2 1 0
no cleanup 1 d3 1 0
no cleanup 1 d4 3 0
no cleanup 1 d5 1 0
no cleanup 2 d1 5 0
no cleanup 2 d2 1 0
no cleanup 2 d3 1 0
no cleanup 2 d4 3 0
no cleanup 2 d5 1 0
no cleanup 2 d6 2 1
no cleanup 3 d1 5 0
no cleanup 3 d2 1 0
no cleanup 3 d3 1 0
no cleanup 3 d4 3 0
no cleanup 3 d5 1 0
no cleanup 4 d1 5 0
no cleanup 4 d2 1 0
no cleanup 4 d3 1 0
no cleanup 4 d4 3 0
no cleanup 4 d5 1 0
no cleanup 4 d6 2 1
== Check cleanup & settings for other merge trees ==
d1 1 1
d1 1 1
d1 1 1
d1 1 1 1
d1 1 1 1

View File

@ -0,0 +1,174 @@
-- Tags: zookeeper
-- The setting allow_deprecated_syntax_for_merge_tree prevents enabling the is_deleted column
set allow_deprecated_syntax_for_merge_tree=0;
-- Test the behaviour without the is_deleted column
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL - no is_deleted ==';
select * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL - no is_deleted SETTINGS clean_deleted_rows=Always ==';
select * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
-- Test the new behaviour
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
SELECT '== Test SELECT ... FINAL ==';
select * from test FINAL order by uid;
select * from test order by uid;
SELECT '== Insert backups ==';
INSERT INTO test (*) VALUES ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1);
select * from test FINAL order by uid;
SELECT '== Insert a second batch with overlapping data ==';
INSERT INTO test (*) VALUES ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 1), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0), ('d2', 2, 1), ('d2', 3, 0), ('d3', 2, 1), ('d3', 3, 0);
select * from test FINAL order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);
-- Insert previous version of 'd6' but only v=3 is_deleted=false will remain
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 2, 1);
SELECT '== Only last version remains after OPTIMIZE W/ CLEANUP ==';
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
-- insert d6 v=3 is_deleted=true (its timestamp is more recent, so this version should be the one taken into account)
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 1);
SELECT '== OPTIMIZE W/ CLEANUP (remove d6) ==';
OPTIMIZE TABLE test FINAL CLEANUP;
-- No d6 anymore
select * from test order by uid;
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
SELECT '== Test of the SETTINGS clean_deleted_rows as Always ==';
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
-- Even if the setting is set to Always, the SELECT FINAL doesn't delete rows
select * from test FINAL order by uid;
select * from test order by uid;
OPTIMIZE TABLE test FINAL;
-- d6 has to be removed since we set clean_deleted_rows as 'Always'
select * from test where is_deleted=0 order by uid;
SELECT '== Test of the SETTINGS clean_deleted_rows as Never ==';
ALTER TABLE test MODIFY SETTING clean_deleted_rows='Never';
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
OPTIMIZE TABLE test FINAL;
-- d6 must NOT be removed since clean_deleted_rows is set to 'Never'
select * from test order by uid;
DROP TABLE IF EXISTS testCleanupR1;
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
ORDER BY uid settings allow_experimental_replacing_merge_with_cleanup=1;
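-- For the Replicated engine, version and is_deleted follow the ZooKeeper path and replica name arguments.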
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
INSERT INTO testCleanupR1 (*) VALUES ('d3', 2, 1);
INSERT INTO testCleanupR1 (*) VALUES ('d1', 2, 1);
SYSTEM SYNC REPLICA testCleanupR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
OPTIMIZE TABLE testCleanupR1 FINAL CLEANUP;
-- Only d2 and d4 remain
SELECT '== (Replicas) Test optimize ==';
SELECT * FROM testCleanupR1 order by uid;
------------------------------
DROP TABLE IF EXISTS testSettingsR1;
CREATE TABLE testSettingsR1 (col1 String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_setting/', 'r1', version, is_deleted)
ORDER BY col1
SETTINGS clean_deleted_rows = 'Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testSettingsR1 (*) VALUES ('c1', 1, 1),('c2', 1, 0),('c3', 1, 1),('c4', 1, 0);
SYSTEM SYNC REPLICA testSettingsR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
OPTIMIZE TABLE testSettingsR1 FINAL;
-- Only c2 and c4 remain
SELECT '== (Replicas) Test settings ==';
SELECT * FROM testSettingsR1 where is_deleted=0 order by col1;
------------------------------
-- Check errors
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) settings allow_experimental_replacing_merge_with_cleanup=1;
-- is_deleted only accepts 0/1; any other value is rejected with INCORRECT_DATA
INSERT INTO test (*) VALUES ('d1', 1, 2); -- { serverError INCORRECT_DATA }
DROP TABLE IF EXISTS test;
-- check that is_deleted must be of type UInt8
CREATE TABLE test (uid String, version UInt32, is_deleted String) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid); -- { serverError BAD_TYPE_OF_FIELD }
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
select 'no cleanup 1', * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP; -- { serverError SUPPORT_IS_DISABLED }
select 'no cleanup 2', * from test order by uid;
DROP TABLE test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/no_cleanup/', 'r1', version, is_deleted) Order by (uid);
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d2', 1, 0), ('d6', 1, 0), ('d4', 1, 0), ('d6', 2, 1), ('d3', 1, 0), ('d1', 2, 1), ('d5', 1, 0), ('d4', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d4', 3, 0), ('d1', 5, 0);
select 'no cleanup 3', * from test FINAL order by uid;
OPTIMIZE TABLE test FINAL CLEANUP; -- { serverError SUPPORT_IS_DISABLED }
select 'no cleanup 4', * from test order by uid;
DROP TABLE test;
-- is_deleted column for other MergeTrees - ErrorCodes::LOGICAL_ERROR
-- Check clean_deleted_rows='Always' for other MergeTrees
SELECT '== Check cleanup & settings for other merge trees ==';
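-- CLEANUP is only valid for ReplacingMergeTree with an is_deleted column; the other engines reject it with CANNOT_ASSIGN_OPTIMIZE, and clean_deleted_rows has no effect on them.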
CREATE TABLE testMT (uid String, version UInt32, is_deleted UInt8) ENGINE = MergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testMT FINAL;
SELECT * FROM testMT order by uid;
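-- The row survives: plain MergeTree has no notion of is_deleted.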
CREATE TABLE testSummingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = SummingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testSummingMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testSummingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testSummingMT FINAL;
SELECT * FROM testSummingMT order by uid;
CREATE TABLE testAggregatingMT (uid String, version UInt32, is_deleted UInt8) ENGINE = AggregatingMergeTree() Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testAggregatingMT (*) VALUES ('d1', 1, 1);
OPTIMIZE TABLE testAggregatingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testAggregatingMT FINAL;
SELECT * FROM testAggregatingMT order by uid;
CREATE TABLE testCollapsingMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = CollapsingMergeTree(sign) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testCollapsingMT (*) VALUES ('d1', 1, 1, 1);
OPTIMIZE TABLE testCollapsingMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testCollapsingMT FINAL;
SELECT * FROM testCollapsingMT order by uid;
CREATE TABLE testVersionedCMT (uid String, version UInt32, is_deleted UInt8, sign Int8) ENGINE = VersionedCollapsingMergeTree(sign, version) Order by (uid) SETTINGS clean_deleted_rows='Always', allow_experimental_replacing_merge_with_cleanup=1;
INSERT INTO testVersionedCMT (*) VALUES ('d1', 1, 1, 1);
OPTIMIZE TABLE testVersionedCMT FINAL CLEANUP; -- { serverError CANNOT_ASSIGN_OPTIMIZE }
OPTIMIZE TABLE testVersionedCMT FINAL;
SELECT * FROM testVersionedCMT order by uid;

View File

@ -0,0 +1,31 @@
--- Based on https://github.com/ClickHouse/ClickHouse/issues/49685
--- Verify that ReplacingMergeTree properly handles _is_deleted:
--- SELECT FINAL should take `_is_deleted` into consideration when there is only one partition.
-- { echoOn }
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`account_id` UInt64,
`_is_deleted` UInt8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version, _is_deleted)
ORDER BY (account_id);
INSERT INTO t SELECT number, 0, 1 FROM numbers(1e3);
-- Mark the first 100 rows as deleted.
INSERT INTO t SELECT number, 1, 1 FROM numbers(1e2);
-- Put everything in one partition
OPTIMIZE TABLE t FINAL;
SELECT count() FROM t;
1000
SELECT count() FROM t FINAL;
900
-- Both should produce the same number of rows.
-- Previously, `do_not_merge_across_partitions_select_final = 1` showed more rows,
-- as if no rows were deleted.
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 1;
900
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 0;
900
DROP TABLE t;

View File

@ -0,0 +1,32 @@
--- Based on https://github.com/ClickHouse/ClickHouse/issues/49685
--- Verify that ReplacingMergeTree properly handles _is_deleted:
--- SELECT FINAL should take `_is_deleted` into consideration when there is only one partition.
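--- do_not_merge_across_partitions_select_final = 1 lets FINAL skip the merge for fully-merged partitions; that fast path used to ignore _is_deleted and returned deleted rows.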
-- { echoOn }
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`account_id` UInt64,
`_is_deleted` UInt8,
`_version` UInt64
)
ENGINE = ReplacingMergeTree(_version, _is_deleted)
ORDER BY (account_id);
INSERT INTO t SELECT number, 0, 1 FROM numbers(1e3);
-- Mark the first 100 rows as deleted.
INSERT INTO t SELECT number, 1, 1 FROM numbers(1e2);
-- Put everything in one partition
OPTIMIZE TABLE t FINAL;
SELECT count() FROM t;
SELECT count() FROM t FINAL;
-- Both should produce the same number of rows.
-- Previously, `do_not_merge_across_partitions_select_final = 1` showed more rows,
-- as if no rows were deleted.
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 1;
SELECT count() FROM t FINAL SETTINGS do_not_merge_across_partitions_select_final = 0;
DROP TABLE t;

View File

@ -0,0 +1,13 @@
== Only last version remains after OPTIMIZE W/ CLEANUP ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
d6 3 0
== OPTIMIZE W/ CLEANUP (remove d6) ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0

View File

@ -0,0 +1,24 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 0,
min_rows_for_wide_part = 1,
min_bytes_for_wide_part = 1,
allow_experimental_replacing_merge_with_cleanup=1;
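-- The settings above force the vertical merge algorithm and wide parts, so that cleanup is exercised on the vertical merge code path.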
-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);
-- Insert an earlier version of 'd6'; only v=3 is_deleted=false will remain
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 2, 1);
SELECT '== Only last version remains after OPTIMIZE W/ CLEANUP ==';
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;
-- insert d6 v=3 is_deleted=true (inserted more recently, so with an equal version this row is the one taken into account)
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 1);
SELECT '== OPTIMIZE W/ CLEANUP (remove d6) ==';
OPTIMIZE TABLE test FINAL CLEANUP;
-- No d6 anymore
select * from test order by uid;
DROP TABLE IF EXISTS test;

View File

@ -17,6 +17,26 @@ CREATE TABLE t_r
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r2')
ORDER BY id; -- { serverError METADATA_MISMATCH }
CREATE TABLE t2
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r1', legacy_ver)
ORDER BY id;
CREATE TABLE t2_r
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r2', legacy_ver, deleted)
ORDER BY id; -- { serverError METADATA_MISMATCH }
CREATE TABLE t3
(
`key` UInt64,

View File

@ -1,7 +0,0 @@
# There was a wrong, harmful feature, leading to bugs and data corruption.
# This feature is removed, but we take care to maintain compatibility on the syntax level, so now it works as a no-op.
DROP TABLE IF EXISTS t;
CREATE TABLE t (x UInt8, PRIMARY KEY x) ENGINE = ReplacingMergeTree;
OPTIMIZE TABLE t CLEANUP;
DROP TABLE t;