Initial version

This commit is contained in:
MikhailBurdukov 2024-09-03 08:24:58 +00:00
parent 1126ae3e5a
commit 0056d04b9c
23 changed files with 665 additions and 19 deletions

View File

@ -68,7 +68,14 @@ public:
/// Check if we have any volume with stopped merges
virtual bool hasAnyVolumeWithDisabledMerges() const = 0;
virtual bool containsVolume(const String & volume_name) const = 0;
/// Returns disks by type, ordered by volume priority
enum class MovePolicy : uint8_t
{
BY_PART_SIZE,
BY_INSERT_DATA_TIME
};
/// Returns the policy for choosing parts to move to the next volume.
virtual IStoragePolicy::MovePolicy getMovePolicy() const = 0;
};
}

View File

@ -111,6 +111,22 @@ StoragePolicy::StoragePolicy(
"Disk move factor have to be in [0., 1.] interval, but set to {} in storage policy {}",
toString(move_factor), backQuote(name));
auto move_policy_str = config.getString(config_prefix + ".move_policy", "by_part_size");
if (move_policy_str == "by_part_size")
{
move_policy = IStoragePolicy::MovePolicy::BY_PART_SIZE;
}
else if (move_policy_str == "by_insert_data_time")
{
move_policy = IStoragePolicy::MovePolicy::BY_INSERT_DATA_TIME;
}
else
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Unknown values of move_policy parameter.");
}
buildVolumeIndices();
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
}
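For reference, a minimal policy definition that this parsing would accept; the disk and volume names here are hypothetical, and the integration-test config later in this diff shows a complete example:
<policies>
    <example_policy>
        <volumes>
            <hot>
                <disk>fast_disk</disk>
            </hot>
            <cold>
                <disk>slow_disk</disk>
            </cold>
        </volumes>
        <move_factor>0.2</move_factor>
        <!-- either "by_part_size" (the default) or "by_insert_data_time" -->
        <move_policy>by_insert_data_time</move_policy>
    </example_policy>
</policies>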

View File

@ -92,6 +92,8 @@ public:
bool containsVolume(const String & volume_name) const override;
IStoragePolicy::MovePolicy getMovePolicy() const override { return move_policy; }
private:
Volumes volumes;
const String name;
@ -103,6 +105,8 @@ private:
/// filled more than total_size * move_factor
double move_factor = 0.1; /// by default move factor is 10%
MovePolicy move_policy = MovePolicy::BY_PART_SIZE;
void buildVolumeIndices();
LoggerPtr log;

View File

@ -885,6 +885,7 @@ void DataPartStorageOnDiskBase::clearDirectory(
request.emplace_back(fs::path(dir) / "delete-on-destroy.txt", true);
request.emplace_back(fs::path(dir) / "txn_version.txt", true);
request.emplace_back(fs::path(dir) / "metadata_version.txt", true);
request.emplace_back(fs::path(dir) / IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE, true);
disk->removeSharedFiles(request, !can_remove_shared_data, names_not_to_remove);
disk->removeDirectory(dir);

View File

@ -465,6 +465,31 @@ std::pair<time_t, time_t> IMergeTreeDataPart::getMinMaxTime() const
return {};
}
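/// Fallback contract for the two getters below: parts written before this change carry no
/// min_max_time_of_data_insert.txt, so the optionals are empty and the part's modification_time
/// is returned instead; a missing value combined with a zero modification_time is a logical error.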
time_t IMergeTreeDataPart::getMinTimeOfDataInsertion() const
{
if (min_time_of_data_insert.has_value())
{
return *min_time_of_data_insert;
}
if (modification_time == static_cast<time_t>(0))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent state of the part {}: min_time_of_data_insert doesn't contain a value and modification_time is zero.", name);
}
return modification_time;
}
time_t IMergeTreeDataPart::getMaxTimeOfDataInsertion() const
{
if (max_time_of_data_insert.has_value())
{
return *max_time_of_data_insert;
}
if (modification_time == static_cast<time_t>(0))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent state of the part {}: max_time_of_data_insert doesn't contain a value and modification_time is zero.", name);
}
return modification_time;
}
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos, int32_t metadata_version_)
{
@ -736,6 +761,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
checkConsistency(require_columns_checksums);
loadDefaultCompressionCodec();
loadInsertTimeInfo();
}
catch (...)
{
@ -1033,6 +1059,38 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
}
}
void IMergeTreeDataPart::loadInsertTimeInfo()
{
bool exists = metadata_manager->exists(MIN_MAX_TIME_OF_DATA_INSERT_FILE);
if (!exists)
{
min_time_of_data_insert = {};
max_time_of_data_insert = {};
return;
}
try
{
auto file_buf = metadata_manager->read(MIN_MAX_TIME_OF_DATA_INSERT_FILE);
/// Avoid undefined behavior:
/// "The behavior is undefined if *this does not contain a value"
min_time_of_data_insert = static_cast<time_t>(0);
max_time_of_data_insert = static_cast<time_t>(0);
tryReadText(*min_time_of_data_insert, *file_buf);
checkString(" ", *file_buf);
tryReadText(*max_time_of_data_insert, *file_buf);
}
catch (const DB::Exception & ex)
{
String path = fs::path(getDataPartStorage().getRelativePath()) / MIN_MAX_TIME_OF_DATA_INSERT_FILE;
LOG_WARNING(storage.log, "Cannot parse min/max time of data insert for part {} from file {}, error '{}'.",
name, path, ex.what());
min_time_of_data_insert = {};
max_time_of_data_insert = {};
}
}
template <typename Writer>
void IMergeTreeDataPart::writeMetadata(const String & filename, const WriteSettings & settings, Writer && writer)
{

View File

@ -208,6 +208,12 @@ public:
/// otherwise, if the partition key includes a dateTime column (also a common case), this function will return min and max values for that column.
std::pair<time_t, time_t> getMinMaxTime() const;
/// Returns two timestamps with the min/max time of when data was inserted into this part.
/// These values don't require a special partition key in the part schema.
/// Each part just keeps these two values and updates them on inserts, merges and mutations.
time_t getMinTimeOfDataInsertion() const;
time_t getMaxTimeOfDataInsertion() const;
bool isEmpty() const { return rows_count == 0; }
/// Compute part block id for zero level part. Otherwise throws an exception.
@ -235,6 +241,10 @@ public:
std::optional<size_t> existing_rows_count;
time_t modification_time = 0;
std::optional<time_t> min_time_of_data_insert;
std::optional<time_t> max_time_of_data_insert;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
@ -496,6 +506,9 @@ public:
/// reference counter locally.
static constexpr auto FILE_FOR_REFERENCES_CHECK = "checksums.txt";
/// File with info about min/max time when data was added to the part.
static constexpr auto MIN_MAX_TIME_OF_DATA_INSERT_FILE = "min_max_time_of_data_insert.txt";
/// Checks that all TTLs (table min/max, column TTLs, and so on) for the part
/// are calculated. A part without calculated TTL may exist if TTL was added after
/// part creation (using alter query with materialize_ttl setting).
@ -721,6 +734,8 @@ private:
/// any special compression.
void loadDefaultCompressionCodec();
void loadInsertTimeInfo();
void writeColumns(const NamesAndTypesList & columns_, const WriteSettings & settings);
void writeVersionMetadata(const VersionMetadata & version_, bool fsync_part_dir) const;

View File

@ -266,6 +266,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
};
SerializationInfoByName infos(global_ctx->storage_columns, info_settings);
time_t min_insert_time_res = global_ctx->future_part->parts.front()->getMinTimeOfDataInsertion();
time_t max_insert_time_res = global_ctx->future_part->parts.front()->getMaxTimeOfDataInsertion();
for (const auto & part : global_ctx->future_part->parts)
{
@ -290,8 +292,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
infos.add(part_infos);
}
min_insert_time_res = std::min(min_insert_time_res, part->getMinTimeOfDataInsertion());
max_insert_time_res = std::max(max_insert_time_res, part->getMaxTimeOfDataInsertion());
}
global_ctx->new_data_part->max_time_of_data_insert = max_insert_time_res;
global_ctx->new_data_part->min_time_of_data_insert = min_insert_time_res;
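/// E.g., merging parts with insert-time ranges [100, 200] and [150, 400] yields a part
/// with min 100 and max 400: the merged range is the union of the source ranges.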
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
if (local_part_min_ttl && local_part_min_ttl <= global_ctx->time_of_merge)
ctx->need_remove_expired_values = true;

View File

@ -7412,6 +7412,9 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
// dst_data_part->min_time_of_data_insert = src_part->getMinTimeOfDataInsertion();
// dst_data_part->max_time_of_data_insert = src_part->getMaxTimeOfDataInsertion();
return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
}

View File

@ -13,27 +13,38 @@ namespace ErrorCodes
{
extern const int ABORTED;
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int BAD_ARGUMENTS;
}
namespace
{
struct PartsComparatorBySizeOnDisk
{
bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
{
/// If parts have equal sizes, then order them by name (names are unique)
UInt64 first_part_size = f->getBytesOnDisk();
UInt64 second_part_size = s->getBytesOnDisk();
return std::tie(first_part_size, f->name) < std::tie(second_part_size, s->name);
}
};
struct PartsComparatorByOldestData
{
bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
{
return std::forward_as_tuple(f->getMinTimeOfDataInsertion(), f->getMaxTimeOfDataInsertion()) >
std::forward_as_tuple(s->getMinTimeOfDataInsertion(), s->getMaxTimeOfDataInsertion());
}
};
/// Contains the minimal number of highest-priority parts (per the comparator) whose total size on disk is greater than required.
/// If the parts do not add up to the required size, contains all of them.
template<typename PartsComparator>
class LargestPartsWithRequiredSize
{
struct PartsSizeOnDiskComparator
{
bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
{
/// If parts have equal sizes, then order them by name (names are unique)
UInt64 first_part_size = f->getBytesOnDisk();
UInt64 second_part_size = s->getBytesOnDisk();
return std::tie(first_part_size, f->name) < std::tie(second_part_size, s->name);
}
};
std::set<MergeTreeData::DataPartPtr, PartsSizeOnDiskComparator> elems;
std::set<MergeTreeData::DataPartPtr, PartsComparator> elems;
UInt64 required_size_sum;
UInt64 current_size_sum = 0;
@ -50,7 +61,7 @@ public:
}
/// Do not add an element whose priority is lower than the current minimum
if (!elems.empty() && (*elems.begin())->getBytesOnDisk() >= part->getBytesOnDisk())
if (!elems.empty() && PartsComparator()(part, *elems.begin()))
return;
elems.emplace(part);
@ -88,7 +99,8 @@ private:
}
bool MergeTreePartsMover::selectPartsForMove(
template<typename Comparator>
bool MergeTreePartsMover::selectPartsForMoveImpl(
MergeTreeMovingParts & parts_to_move,
const AllowedMovingPredicate & can_move,
const std::lock_guard<std::mutex> & /* moving_parts_lock */)
@ -102,10 +114,9 @@ bool MergeTreePartsMover::selectPartsForMove(
if (data_parts.empty())
return false;
std::unordered_map<DiskPtr, LargestPartsWithRequiredSize> need_to_move;
std::unordered_map<DiskPtr, LargestPartsWithRequiredSize<Comparator>> need_to_move;
const auto policy = data->getStoragePolicy();
const auto & volumes = policy->getVolumes();
if (!volumes.empty())
{
/// Do not check last volume
@ -209,6 +220,26 @@ bool MergeTreePartsMover::selectPartsForMove(
return false;
}
bool MergeTreePartsMover::selectPartsForMove(
MergeTreeMovingParts & parts_to_move,
const AllowedMovingPredicate & can_move,
const std::lock_guard<std::mutex> & moving_parts_lock)
{
IStoragePolicy::MovePolicy move_policy = data->getStoragePolicy()->getMovePolicy();
if (move_policy == IStoragePolicy::MovePolicy::BY_PART_SIZE)
{
return selectPartsForMoveImpl<PartsComparatorBySizeOnDisk>(parts_to_move, can_move, moving_parts_lock);
}
else if (move_policy == IStoragePolicy::MovePolicy::BY_INSERT_DATA_TIME)
{
return selectPartsForMoveImpl<PartsComparatorByOldestData>(parts_to_move, can_move, moving_parts_lock);
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown move policy.");
}
}
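To make the inverted ordering concrete, here is a minimal standalone sketch with a simplified stand-in Part struct and hypothetical values; under both comparators *elems.begin() is the lowest-priority part, which is what the generic PartsComparator()(part, *elems.begin()) check above relies on:

#include <cstdint>
#include <ctime>
#include <iostream>
#include <set>
#include <string>
#include <tuple>

// Simplified stand-in for the fields of MergeTreeData::DataPartPtr used by the comparators.
struct Part
{
    std::string name;
    uint64_t bytes_on_disk;
    time_t min_insert_time;
    time_t max_insert_time;
};

// Mirrors PartsComparatorBySizeOnDisk: smaller parts compare "less",
// so *begin() is the lightest part and the first candidate to be skipped or displaced.
struct BySize
{
    bool operator()(const Part & f, const Part & s) const
    {
        return std::tie(f.bytes_on_disk, f.name) < std::tie(s.bytes_on_disk, s.name);
    }
};

// Mirrors PartsComparatorByOldestData: note the inverted '>', so parts with
// newer data compare "less" and the set keeps the parts with the oldest data.
struct ByOldestData
{
    bool operator()(const Part & f, const Part & s) const
    {
        return std::forward_as_tuple(f.min_insert_time, f.max_insert_time)
            > std::forward_as_tuple(s.min_insert_time, s.max_insert_time);
    }
};

int main()
{
    const Part small_new{"small_new", 100, 300, 400};
    const Part big_old{"big_old", 500, 100, 200};
    std::set<Part, BySize> by_size{small_new, big_old};
    std::set<Part, ByOldestData> by_age{small_new, big_old};
    std::cout << by_size.begin()->name << '\n'; // "small_new": lightest, lowest priority by size
    std::cout << by_age.begin()->name << '\n';  // "small_new": newest data, lowest priority by age
}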
MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part, const ReadSettings & read_settings, const WriteSettings & write_settings) const
{
auto cancellation_hook = [&moves_blocker_ = moves_blocker]()

View File

@ -4,8 +4,9 @@
#include <optional>
#include <vector>
#include <base/scope_guard.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MovesList.h>
#include <Common/ActionBlocker.h>
@ -44,6 +45,12 @@ private:
/// Callback tells that part is not participating in background process
using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const IMergeTreeDataPart> &, String * reason)>;
template<class PartsComparator>
bool selectPartsForMoveImpl(
MergeTreeMovingParts & parts_to_move,
const AllowedMovingPredicate & can_move,
const std::lock_guard<std::mutex> & moving_parts_lock);
public:
explicit MergeTreePartsMover(MergeTreeData * data_)

View File

@ -195,11 +195,18 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
}
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, files_to_remove_after_sync, sync);
auto current_time = time(nullptr);
if (!new_part->min_time_of_data_insert.has_value() && !new_part->max_time_of_data_insert.has_value())
{
new_part->min_time_of_data_insert = current_time;
new_part->max_time_of_data_insert = current_time;
}
new_part->modification_time = current_time;
if (new_part->isStoredOnDisk())
finalizer->written_files = finalizePartOnDisk(new_part, checksums);
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
new_part->setIndex(writer->releaseIndexColumns());
new_part->checksums = checksums;
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
@ -308,6 +315,20 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
written_files.emplace_back(std::move(out));
}
{
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE, 4096, write_settings);
HashingWriteBuffer out_hashing(*out);
DB::writeIntText(*new_part->min_time_of_data_insert, out_hashing);
DB::writeText(" ", out_hashing);
DB::writeIntText(*new_part->max_time_of_data_insert, out_hashing);
out_hashing.finalize();
checksums.files[IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE].file_size = out_hashing.count();
checksums.files[IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE].file_hash = out_hashing.getHash();
out->preFinalize();
written_files.emplace_back(std::move(out));
}
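/// The file written above holds the two values as plain text separated by a single space,
/// e.g. "1725350400 1725350460" (hypothetical Unix timestamps), and is checksummed like
/// the other part metadata files.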
{
/// Write a file with a description of columns.
auto out = new_part->getDataPartStorage().writeFile("columns.txt", 4096, write_settings);

View File

@ -2236,6 +2236,9 @@ bool MutateTask::prepare()
ctx->new_data_part->is_temp = true;
ctx->new_data_part->ttl_infos = ctx->source_part->ttl_infos;
ctx->new_data_part->min_time_of_data_insert = ctx->future_part->parts.front()->getMinTimeOfDataInsertion();
ctx->new_data_part->max_time_of_data_insert = ctx->future_part->parts.front()->getMaxTimeOfDataInsertion();
/// It shouldn't be changed by mutation.
ctx->new_data_part->index_granularity_info = ctx->source_part->index_granularity_info;

View File

@ -62,6 +62,8 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"secondary_indices_uncompressed_bytes", std::make_shared<DataTypeUInt64>(), "Total size of uncompressed data for secondary indices in the data part. All the auxiliary files (for example, files with marks) are not included."},
{"secondary_indices_marks_bytes", std::make_shared<DataTypeUInt64>(), "The size of the file with marks for secondary indices."},
{"modification_time", std::make_shared<DataTypeDateTime>(), "The time the directory with the data part was modified. This usually corresponds to the time of data part creation."},
{"min_time_of_data_insert", std::make_shared<DataTypeDateTime>(), "min_time_of_data_insert."},
{"max_time_of_data_insert", std::make_shared<DataTypeDateTime>(), "max_time_of_data_insert."},
{"remove_time", std::make_shared<DataTypeDateTime>(), "The time when the data part became inactive."},
{"refcount", std::make_shared<DataTypeUInt32>(), "The number of places where the data part is used. A value greater than 2 indicates that the data part is used in queries or merges."},
{"min_date", std::make_shared<DataTypeDate>(), "The minimum value of the date key in the data part."},
@ -181,6 +183,11 @@ void StorageSystemParts::processNextStorage(
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt64>(part->modification_time));
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt64>(part->getMinTimeOfDataInsertion()));
if (columns_mask[src_index++])
columns[res_index++]->insert(static_cast<UInt64>(part->getMaxTimeOfDataInsertion()));
if (columns_mask[src_index++])
{
time_t remove_time = part->remove_time.load(std::memory_order_relaxed);

View File

@ -44,6 +44,7 @@ StorageSystemStoragePolicies::StorageSystemStoragePolicies(const StorageID & tab
{"volume_type", std::make_shared<DataTypeEnum8>(getTypeEnumValues<VolumeType>()), "The type of the volume - JBOD or a single disk."},
{"max_data_part_size", std::make_shared<DataTypeUInt64>(), "the maximum size of a part that can be stored on any of the volumes disks."},
{"move_factor", std::make_shared<DataTypeFloat32>(), "When the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1)."},
{"move_policy", std::make_shared<DataTypeEnum8>(getTypeEnumValues<IStoragePolicy::MovePolicy>())},
{"prefer_not_to_merge", std::make_shared<DataTypeUInt8>(), "You should not use this setting. Disables merging of data parts on this volume (this is harmful and leads to performance degradation)."},
{"perform_ttl_move_on_insert", std::make_shared<DataTypeUInt8>(), "Disables TTL move on data part INSERT. By default (if enabled) if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule."},
{"load_balancing", std::make_shared<DataTypeEnum8>(getTypeEnumValues<VolumeLoadBalancing>()), "Policy for disk balancing, `round_robin` or `least_used`."}
@ -70,6 +71,7 @@ Pipe StorageSystemStoragePolicies::read(
MutableColumnPtr col_volume_type = ColumnInt8::create();
MutableColumnPtr col_max_part_size = ColumnUInt64::create();
MutableColumnPtr col_move_factor = ColumnFloat32::create();
MutableColumnPtr col_move_policy = ColumnInt8::create();
MutableColumnPtr col_prefer_not_to_merge = ColumnUInt8::create();
MutableColumnPtr col_perform_ttl_move_on_insert = ColumnUInt8::create();
MutableColumnPtr col_load_balancing = ColumnInt8::create();
@ -90,6 +92,7 @@ Pipe StorageSystemStoragePolicies::read(
col_volume_type->insert(static_cast<Int8>(volumes[i]->getType()));
col_max_part_size->insert(volumes[i]->max_data_part_size);
col_move_factor->insert(policy_ptr->getMoveFactor());
col_move_policy->insert(static_cast<Int8>(policy_ptr->getMovePolicy()));
col_prefer_not_to_merge->insert(volumes[i]->areMergesAvoided() ? 1 : 0);
col_perform_ttl_move_on_insert->insert(volumes[i]->perform_ttl_move_on_insert);
col_load_balancing->insert(static_cast<Int8>(volumes[i]->load_balancing));
@ -104,6 +107,7 @@ Pipe StorageSystemStoragePolicies::read(
res_columns.emplace_back(std::move(col_volume_type));
res_columns.emplace_back(std::move(col_max_part_size));
res_columns.emplace_back(std::move(col_move_factor));
res_columns.emplace_back(std::move(col_move_policy));
res_columns.emplace_back(std::move(col_prefer_not_to_merge));
res_columns.emplace_back(std::move(col_perform_ttl_move_on_insert));
res_columns.emplace_back(std::move(col_load_balancing));
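With the new column in place, a query such as SELECT policy_name, volume_name, move_policy FROM system.storage_policies would show BY_PART_SIZE or BY_INSERT_DATA_TIME per volume row, which is what the updated system-tables test at the end of this diff asserts.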

View File

@ -0,0 +1,16 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,5 @@
<clickhouse>
<macros>
<cluster>cluster</cluster>
</macros>
</clickhouse>

View File

@ -0,0 +1,200 @@
import pytest
import logging
import time
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
with_zookeeper=True,
main_configs=["configs/cluster.xml", "configs/macro.xml"],
macros={"replica": "node1"},
)
node2 = cluster.add_instance(
"node2",
with_zookeeper=True,
main_configs=["configs/cluster.xml", "configs/macro.xml"],
macros={"replica": "node2"},
)
node_old = cluster.add_instance(
"node_with_old_ch",
image="clickhouse/clickhouse-server",
tag="24.3",
with_installed_binary=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_max_min_time_of_data_insert(node, db_name, table_name):
return (
node.query(
f"SELECT min(min_time_of_data_insert), max(max_time_of_data_insert) FROM system.parts WHERE database='{db_name}' AND table='{table_name}' AND active=1"
)
.strip()
.split("\t")
)
def test_merge(started_cluster):
db_name = "test_db"
table_name = "test_table"
node = node1
node.query(f"DROP DATABASE IF EXISTS {db_name}")
node.query(f"CREATE DATABASE {db_name}")
node.query(
f"CREATE TABLE {db_name}.{table_name} (a int) ENGINE = MergeTree() ORDER BY a"
)
node.query(f"INSERT INTO {db_name}.{table_name} SELECT 1")
time.sleep(1)
node.query(f"INSERT INTO {db_name}.{table_name} SELECT 2")
[min_time, max_time] = get_max_min_time_of_data_insert(node, db_name, table_name)
print(min_time, max_time)
assert min_time != max_time
node.query(f"OPTIMIZE TABLE {db_name}.{table_name}")
[min_time_new, max_time_new] = get_max_min_time_of_data_insert(
node, db_name, table_name
)
assert min_time_new == min_time and max_time_new == max_time
def test_mutations(started_cluster):
db_name = "test_db"
table_name = "test_table"
node = node1
node.query(f"DROP DATABASE IF EXISTS {db_name}")
node.query(f"CREATE DATABASE {db_name}")
node.query(
f"CREATE TABLE {db_name}.{table_name} (a int, b int) ENGINE = MergeTree() ORDER BY a"
)
node.query(f"INSERT INTO {db_name}.{table_name} SELECT 1, 1")
[min_time, max_time] = get_max_min_time_of_data_insert(node, db_name, table_name)
print(min_time, max_time)
assert min_time == max_time
time.sleep(1)
node.query(f"ALTER TABLE {db_name}.{table_name} UPDATE b = 2 WHERE b = 1")
[min_time_new, max_time_new] = get_max_min_time_of_data_insert(
node, db_name, table_name
)
assert min_time == min_time_new and max_time == max_time_new
def test_move_partition(started_cluster):
db_name = "test_db"
table_name1 = "test_table1"
table_name2 = "test_table2"
node = node1
node.query(f"DROP DATABASE IF EXISTS {db_name}")
node.query(f"CREATE DATABASE {db_name}")
node.query(
f"CREATE TABLE {db_name}.{table_name1} (a int, b int) ENGINE = MergeTree() ORDER BY a PARTITION BY a"
)
node.query(
f"CREATE TABLE {db_name}.{table_name2} (a int, b int) ENGINE = MergeTree() ORDER BY a PARTITION BY a"
)
node.query(f"INSERT INTO {db_name}.{table_name1} SELECT 1, 1")
[min_time, max_time] = get_max_min_time_of_data_insert(node, db_name, table_name1)
partition_name = (
node.query(
f"SELECT partition FROM system.parts where database='{db_name}' AND table='{table_name1}' AND active=1"
)
.strip()
.split("\t")
)[0]
assert min_time == max_time
time.sleep(1)
node.query(
f"ALTER TABLE {db_name}.{table_name1} MOVE PARTITION '{partition_name}' TO TABLE {db_name}.{table_name2}"
)
[min_time_new, max_time_new] = get_max_min_time_of_data_insert(
node, db_name, table_name2
)
assert min_time == min_time_new and max_time == max_time_new
def test_replicated_fetch(started_cluster):
db_name = "test_db"
table_name = "test_table"
node1.query(f"DROP DATABASE IF EXISTS {db_name} ON CLUSTER '{{cluster}}'")
node1.query(f"CREATE DATABASE {db_name} ON CLUSTER '{{cluster}}'")
node1.query(
f"CREATE TABLE {db_name}.{table_name} ON CLUSTER '{{cluster}}' (a int) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_table/replicated', '{{replica}}') ORDER BY a"
)
node1.query(f"INSERT INTO {db_name}.{table_name} SELECT 1")
[min_time_node1, max_time_node1] = get_max_min_time_of_data_insert(
node1, db_name, table_name
)
[min_time_node2, max_time_node2] = get_max_min_time_of_data_insert(
node2, db_name, table_name
)
assert min_time_node1 == min_time_node2 and max_time_node1 == max_time_node2
node2.query(f"INSERT INTO {db_name}.{table_name} SELECT 2")
node2.query(f"OPTIMIZE TABLE {db_name}.{table_name}")
[min_time_node1, max_time_node1] = get_max_min_time_of_data_insert(
node1, db_name, table_name
)
[min_time_node2, max_time_node2] = get_max_min_time_of_data_insert(
node2, db_name, table_name
)
assert min_time_node1 == min_time_node2 and max_time_node1 == max_time_node2
def test_version_compatibility(started_cluster):
db_name = "test_db"
table_name = "test_table"
node = node_old
node.query(f"DROP DATABASE IF EXISTS {db_name}")
node.query(f"CREATE DATABASE {db_name}")
node.query(
f"CREATE TABLE {db_name}.{table_name} (a int) ENGINE = MergeTree() ORDER BY a"
)
node.query(f"INSERT INTO {db_name}.{table_name} SELECT 1")
modification_time = (
node.query(
f"SELECT modification_time FROM system.parts WHERE database='{db_name}' AND table='{table_name}' AND active=1"
)
.strip()
.split("\t")
)[0]
node.restart_with_latest_version()
# For old parts, the min/max insert time will be equal to the modification time.
[min_time_node, max_time_node] = get_max_min_time_of_data_insert(
node, db_name, table_name
)
assert min_time_node == modification_time and max_time_node == modification_time
node.query(f"INSERT INTO {db_name}.{table_name} SELECT 2")
node.restart_with_original_version()
assert node.query(f"SELECT count() FROM {db_name}.{table_name}") == "2\n"

View File

@ -0,0 +1,57 @@
<clickhouse>
<storage_configuration>
<disks>
<default>
</default>
<hot>
<path>/hot/</path>
</hot>
<warm>
<path>/warm/</path>
</warm>
<cold>
<path>/cold/</path>
</cold>
</disks>
<policies>
<jbod_by_size_policy>
<volumes>
<hot>
<disk>hot</disk>
</hot>
<warm>
<disk>warm</disk>
</warm>
<cold>
<disk>cold</disk>
</cold>
</volumes>
<move_factor>0.5</move_factor>
<move_policy>by_part_size</move_policy>
</jbod_by_size_policy>
<jbod_time_policy>
<volumes>
<hot>
<disk>hot</disk>
</hot>
<warm>
<disk>warm</disk>
</warm>
<cold>
<disk>cold</disk>
</cold>
</volumes>
<move_factor>0.5</move_factor>
<move_policy>by_insert_data_time</move_policy>
</jbod_time_policy>
</policies>
</storage_configuration>
<background_move_pool_size>1</background_move_pool_size>
<background_move_processing_pool_thread_sleep_seconds>0.1</background_move_processing_pool_thread_sleep_seconds>
<background_move_processing_pool_task_sleep_seconds_when_no_work_min>0.1</background_move_processing_pool_task_sleep_seconds_when_no_work_min>
<background_move_processing_pool_task_sleep_seconds_when_no_work_max>0.1</background_move_processing_pool_task_sleep_seconds_when_no_work_max>
</clickhouse>

View File

@ -0,0 +1,17 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,150 @@
import random
import time
from multiprocessing.dummy import Pool
import datetime
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
hot_volume_size_mb = 5
warm_volume_size_mb = 10
cold_volume_size_mb = 15
mb_in_bytes = 1024 * 1024
node_options = dict(
with_zookeeper=True,
main_configs=[
"configs/remote_servers.xml",
"configs/config.d/storage_configuration.xml",
],
tmpfs=[
f"/hot:size={hot_volume_size_mb}M",
f"/warm:size={warm_volume_size_mb}M",
f"/cold:size={cold_volume_size_mb}M",
],
)
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", macros={"shard": 0, "replica": 1}, **node_options)
node2 = cluster.add_instance("node2", macros={"shard": 0, "replica": 2}, **node_options)
nodes = [node1, node2]
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def wait_until_moves_finished(node, required_part_count, disk):
retry_count = 20
sleep_time = 1
for _ in range(retry_count):
try:
parts_on_disk = int(
node.query(f"SELECT count() FROM system.parts WHERE disk_name='{disk}'")
)
if parts_on_disk <= required_part_count:
return True
except Exception:
pass
time.sleep(sleep_time)
return False
def check_by_insert_time_parts_disks(node, database):
res = node.query(
f"SELECT disk_name, toUnixTimestamp(min(min_time_of_data_insert)) AS min_time, toUnixTimestamp(max(min_time_of_data_insert)) AS max_time FROM system.parts WHERE database ='{database}' GROUP BY disk_name"
)
times_of_parts = {}
for line in res.splitlines():
[disk_name, min_time, max_time] = line.split("\t")
times_of_parts[disk_name] = (int(min_time), int(max_time))
# The min_time on a colder disk must not exceed the max_time on any hotter disk.
assert (
times_of_parts["cold"][0] <= times_of_parts["hot"][1]
and times_of_parts["cold"][0] <= times_of_parts["warm"][1]
)
assert times_of_parts["warm"][0] <= times_of_parts["hot"][1]
@pytest.mark.parametrize(
"storage_policy,additional_check",
[
("jbod_by_size_policy", None),
("jbod_time_policy", check_by_insert_time_parts_disks),
],
)
def test_simple_moves(started_cluster, storage_policy, additional_check):
node = node1
node.query("DROP DATABASE IF EXISTS test_db SYNC;")
node.query("CREATE DATABASE test_db;")
node.query(
f"CREATE TABLE test_db.table (a Int, b String) ENGINE=MergeTree() ORDER BY a SETTINGS storage_policy='{storage_policy}'"
)
node.query(f"SYSTEM STOP MERGES test_db.table;")
for _ in range(15):
node.query(
f"INSERT INTO test_db.table SELECT rand()%10, randomString({mb_in_bytes});"
)
time_last_data_insert = int(time.time())
assert wait_until_moves_finished(node, hot_volume_size_mb // 2, "hot")
assert wait_until_moves_finished(node, warm_volume_size_mb // 2, "warm")
# Make sure that times of data inserts are unique
if int(time.time()) == time_last_data_insert:
time.sleep(1)
if additional_check:
additional_check(node, "test_db")
node.query(f"DROP DATABASE test_db SYNC;")
@pytest.mark.parametrize(
"storage_policy,additional_check",
[
("jbod_by_size_policy", None),
("jbod_time_policy", check_by_insert_time_parts_disks),
],
)
def test_moves_replicated(started_cluster, storage_policy, additional_check):
node1.query("DROP DATABASE IF EXISTS test_db ON CLUSTER 'test_cluster' SYNC;")
node1.query("CREATE DATABASE test_db ON CLUSTER 'test_cluster';")
# Here we need to block both merge execution and scheduling, otherwise parts will be in the `virtual` state
# and moves of these parts will be blocked until the merge is completed.
node1.query(
f"""
CREATE TABLE test_db.table ON CLUSTER 'test_cluster' (a Int, b String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{{uuid}}', '{{replica}}') ORDER BY a
SETTINGS storage_policy='{storage_policy}', max_replicated_merges_in_queue=0;
"""
)
node1.query(f"SYSTEM STOP MERGES ON CLUSTER 'test_cluster' test_db.table; ")
for _ in range(15):
node1.query(
f"INSERT INTO test_db.table SELECT rand()%10, randomString({mb_in_bytes});"
)
time_last_data_insert = int(time.time())
assert wait_until_moves_finished(node1, hot_volume_size_mb // 2, "hot")
assert wait_until_moves_finished(node2, hot_volume_size_mb // 2, "hot")
assert wait_until_moves_finished(node1, warm_volume_size_mb // 2, "warm")
assert wait_until_moves_finished(node2, warm_volume_size_mb // 2, "warm")
if time_last_data_insert == int(time.time()):
time.sleep(1)
if additional_check:
additional_check(node1, "test_db")
additional_check(node2, "test_db")
node1.query(f"DROP DATABASE test_db ON CLUSTER 'test_cluster' SYNC;")

View File

@ -101,6 +101,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -113,6 +114,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -125,6 +127,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -137,6 +140,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 1,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -149,6 +153,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -161,6 +166,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -173,6 +179,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "10485760",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -185,6 +192,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -197,6 +205,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.7,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -209,6 +218,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.7,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -221,6 +231,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "2097152",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -233,6 +244,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "20971520",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -245,6 +257,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -257,6 +270,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -269,6 +283,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "1024",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",
@ -281,6 +296,7 @@ def test_system_tables(start_cluster):
"volume_type": "JBOD",
"max_data_part_size": "1024000000",
"move_factor": 0.1,
"move_policy": "BY_PART_SIZE",
"prefer_not_to_merge": 0,
"perform_ttl_move_on_insert": 1,
"load_balancing": "ROUND_ROBIN",