Add test and found bugs

alesapin 2019-09-06 18:09:20 +03:00
parent ea8e543b1a
commit cfd753a1f3
8 changed files with 157 additions and 92 deletions

View File

@@ -470,7 +470,7 @@ ReservationPtr StoragePolicy::reserve(UInt64 expected_size) const
}
ReservationPtr StoragePolicy::reserveOnMaxDiskWithoutReservation() const
ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
{
UInt64 max_space = 0;
DiskPtr max_disk;
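The rename clarifies intent; the body that follows scans all disks, remembers the one with the most unreserved space, and hands back a zero-byte reservation on it. A minimal standalone sketch of that selection, with toy types rather than the real DiskSpace API:

```cpp
#include <cstdint>
#include <string>
#include <vector>

/// Toy model of the selection inside makeEmptyReservationOnLargestDisk:
/// scan all disks and remember the one with the most unreserved space.
struct Disk { std::string name; uint64_t unreserved_space = 0; };

const Disk * largestDisk(const std::vector<Disk> & disks)
{
    const Disk * max_disk = nullptr;
    for (const auto & disk : disks)
        if (!max_disk || disk.unreserved_space > max_disk->unreserved_space)
            max_disk = &disk;
    return max_disk; /// the caller then reserves 0 bytes on this disk
}
```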

View File

@@ -278,7 +278,7 @@ public:
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size!!!
ReservationPtr reserveOnMaxDiskWithoutReservation() const;
ReservationPtr makeEmptyReservationOnLargestDisk() const;
const Volumes & getVolumes() const { return volumes; }

View File

@@ -233,8 +233,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
}
else
{
/// We don't know real size of part because sender server version is old
reservation = data.reserveOnMaxDiskWithoutReservation();
/// We don't know real size of part because sender server version is too old
reservation = data.makeEmptyReservationOnLargestDisk();
}
return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
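This fallback exists for protocol compatibility: an old sender never announces the part size, so nothing size-accurate can be reserved up front. A toy sketch of the branching, using stand-in types rather than the actual Fetcher interface:

```cpp
#include <cstdint>
#include <memory>

struct Reservation { uint64_t bytes; };             /// stand-in type
using ReservationPtr = std::unique_ptr<Reservation>;

/// Reserve the announced size when the peer reports one; otherwise fall
/// back to a zero-byte reservation (the largest-disk path in this commit).
ReservationPtr reserveForFetch(bool peer_reports_size, uint64_t announced_size)
{
    if (peer_reports_size)
        return std::make_unique<Reservation>(Reservation{announced_size});
    return std::make_unique<Reservation>(Reservation{0});
}
```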

View File

@@ -1874,16 +1874,15 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
{
old_and_new_names.push_back({old_name, new_name});
const auto disks = storage.getStoragePolicy()->getDisks();
for (const DiskSpace::DiskPtr & disk : disks)
const auto paths = storage.getDataPaths();
for (const auto & full_path : paths)
{
const auto full_path = storage.getFullPathOnDisk(disk);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
if (name == old_name)
{
name_to_disk[old_name] = disk;
old_part_name_to_full_path[old_name] = full_path;
break;
}
}
@@ -1900,7 +1899,7 @@ void MergeTreeData::PartsTemporaryRename::tryRenameAll()
const auto & names = old_and_new_names[i];
if (names.first.empty() || names.second.empty())
throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::INCORRECT_FILE_NAME);
const auto full_path = storage.getFullPathOnDisk(name_to_disk[names.first]) + source_dir; /// old_name
const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name
Poco::File(full_path + names.first).renameTo(full_path + names.second);
}
catch (...)
@@ -1924,7 +1923,7 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
try
{
const auto full_path = storage.getFullPathOnDisk(name_to_disk[names.first]) + source_dir; /// old_name
const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name
Poco::File(full_path + names.second).renameTo(full_path + names.first);
}
catch (...)
@@ -2943,11 +2942,11 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Cont
renamed_parts.tryRenameAll();
for (auto & names : renamed_parts.old_and_new_names)
for (auto & [old_name, new_name] : renamed_parts.old_and_new_names)
{
Poco::File(getFullPathOnDisk(renamed_parts.name_to_disk[names.first]) + "detached/" + names.second).remove(true);
LOG_DEBUG(log, "Dropped detached part " << names.first);
names.first.clear();
Poco::File(renamed_parts.old_part_name_to_full_path[old_name] + "detached/" + new_name).remove(true);
LOG_DEBUG(log, "Dropped detached part " << old_name);
old_name.clear();
}
}
@@ -3402,7 +3401,10 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
{
/// Something went completely wrong
if (!data.currently_moving_parts.count(moving_part.part))
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "TERMINATING ON PART:" << moving_part.part->name);
std::terminate();
}
data.currently_moving_parts.erase(moving_part.part);
}
}
@@ -3453,7 +3455,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove()
std::lock_guard moving_lock(moving_parts_mutex);
parts_mover.selectPartsForMove(parts_to_move, can_move);
parts_mover.selectPartsForMove(parts_to_move, can_move, moving_lock);
return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this);
}
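Adding the `std::lock_guard` parameter is a lock-witness idiom: `selectPartsForMove` can now only be called by someone who already holds `moving_parts_mutex`, and it never tries to lock the mutex itself. A self-contained sketch of the pattern (names are illustrative, not the ClickHouse API):

```cpp
#include <mutex>
#include <vector>

struct Mover
{
    std::mutex parts_mutex;
    std::vector<int> moving_parts;

    /// The unused guard parameter is compile-time proof that the caller
    /// holds parts_mutex; this function must not lock it again.
    void selectLocked(std::vector<int> & out, const std::lock_guard<std::mutex> &) const
    {
        out = moving_parts; /// safe: protected by parts_mutex
    }

    void select(std::vector<int> & out)
    {
        std::lock_guard<std::mutex> lock(parts_mutex);
        selectLocked(out, lock); /// pass the guard down as a witness
    }
};
```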

View File

@@ -276,7 +276,7 @@ public:
const MergeTreeData & storage;
const String source_dir;
std::vector<std::pair<String, String>> old_and_new_names;
std::unordered_map<String, DiskSpace::DiskPtr> name_to_disk;
std::unordered_map<String, String> old_part_name_to_full_path;
bool renamed = false;
};
@@ -655,8 +655,14 @@ public:
return storage_settings.get();
}
/// Get table path on disk
String getFullPathOnDisk(const DiskSpace::DiskPtr & disk) const;
/// Get disk for part. Loops through directories on the FS because some parts may not be
/// in the active data parts set (detached).
DiskSpace::DiskPtr getDiskForPart(const String & part_name, const String & relative_path = "") const;
/// Get full path for part. Uses getDiskForPart and returns the full path
String getFullPathForPart(const String & part_name, const String & relative_path = "") const;
Strings getDataPaths() const override;
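Detached parts are missing from the in-memory parts set, so `getDiskForPart` has to probe the filesystem under every disk until the part's directory turns up. A sketch of that probe using C++17 std::filesystem in place of the Poco calls the real code uses (the directory layout here is an assumption):

```cpp
#include <filesystem>
#include <optional>
#include <string>
#include <vector>

/// Toy getDiskForPart: each disk is identified by its root path here;
/// the part lives at <root>/<relative_path>/<part_name> on one disk.
std::optional<std::string> diskForPart(
    const std::vector<std::string> & disk_roots,
    const std::string & part_name,
    const std::string & relative_path = "")
{
    for (const auto & root : disk_roots)
        if (std::filesystem::exists(std::filesystem::path(root) / relative_path / part_name))
            return root;
    return std::nullopt; /// part is on no known disk
}
```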
@@ -666,7 +672,7 @@ public:
/// Choose disk with max available free space
/// Reserves 0 bytes
DiskSpace::ReservationPtr reserveOnMaxDiskWithoutReservation() { return storage_policy->reserveOnMaxDiskWithoutReservation(); }
DiskSpace::ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); }
MergeTreeDataFormatVersion format_version;
@@ -733,7 +739,7 @@ public:
/// as result of merge or mutation.
DataParts currently_moving_parts;
/// Mutex for currenly_moving_parts
/// Mutex for currently_moving_parts
mutable std::mutex moving_parts_mutex;
protected:
@@ -916,6 +922,7 @@ protected:
const DataPartsVector & source_parts,
const MergeListEntry * merge_entry);
/// If part is assigned to merge or mutation (possibly replicated)
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;
/// Moves part to specified space
@@ -925,6 +932,7 @@ protected:
bool selectPartsAndMove();
private:
/// RAII Wrapper for atomic work with currently moving parts
struct CurrentlyMovingPartsTagger
{
MergeTreeMovingParts parts_to_move;
@@ -935,9 +943,13 @@ private:
~CurrentlyMovingPartsTagger();
};
/// Move selected parts to corresponding volumes
bool moveParts(CurrentlyMovingPartsTagger && parts_to_move);
/// Select parts for move and disks for them. Used in background moving processes.
CurrentlyMovingPartsTagger selectPartsForMove();
/// Check selected parts for moves. Used in ALTER ... MOVE queries.
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, DiskSpace::SpacePtr space);
};

View File

@@ -67,11 +67,23 @@ public:
}
};
std::unordered_map<std::string, size_t> partsMovingFromDisksSize(MergeTreeData & data)
{
std::unordered_map<std::string, size_t> result;
for (const auto & moving_part : data.currently_moving_parts)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "MOVING PART:" << moving_part->name);
result[moving_part->disk->getName()] += moving_part->bytes_on_disk;
}
return result;
}
}
bool MergeTreePartsMover::selectPartsForMove(
MergeTreeMovingParts & parts_to_move,
const AllowedMovingPredicate & can_move)
const AllowedMovingPredicate & can_move,
const std::lock_guard<std::mutex> & /* moving_parts_lock */)
{
MergeTreeData::DataPartsVector data_parts = data->getDataPartsVector();
@@ -88,6 +100,9 @@ bool MergeTreePartsMover::selectPartsForMove(
return false;
}
auto parts_moving_from_disks = partsMovingFromDisksSize(*data);
if (parts_moving_from_disks.empty())
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "NOTHING IS MOVING");
/// Do not check last volume
for (size_t i = 0; i != volumes.size() - 1; ++i)
{
@@ -95,11 +110,24 @@
{
auto space_information = disk->getSpaceInformation();
UInt64 total_space_with_factor = space_information.getTotalSpace() * policy->getMoveFactor();
UInt64 required_available_space = space_information.getTotalSpace() * policy->getMoveFactor();
/// Do not take into account reserved space
if (total_space_with_factor > space_information.getAvailableSpace())
need_to_move.emplace(disk, total_space_with_factor - space_information.getAvailableSpace());
size_t future_available_space = 0;
if (parts_moving_from_disks.count(disk->getName()))
future_available_space = parts_moving_from_disks[disk->getName()];
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Disk:" << disk->getName());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Total space:" << disk->getTotalSpace());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "UNRESERVED SPACE:" << disk->getUnreservedSpace());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Available space:" << space_information.getAvailableSpace());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Required available space:" << required_available_space);
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Future space:" << future_available_space);
if (required_available_space > space_information.getAvailableSpace() + future_available_space)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Need to move something from disk:" << disk->getName());
need_to_move.emplace(disk, required_available_space - space_information.getAvailableSpace());
}
}
}
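The bug being fixed: the mover previously ignored parts already scheduled to leave a disk, so concurrent passes kept selecting more parts than needed. With this change, in-flight bytes count as space that will soon be free. The check reduces to small arithmetic; a sketch over the same quantities the loop above reads:

```cpp
#include <cstdint>

/// How many bytes still have to leave the disk, or 0 if the move factor
/// is already satisfied once in-flight moves (moving_off) complete.
uint64_t bytesToMove(uint64_t total_space, uint64_t available_space,
                     double move_factor, uint64_t moving_off)
{
    const uint64_t required = static_cast<uint64_t>(total_space * move_factor);
    if (required > available_space + moving_off)
        return required - available_space; /// mirrors need_to_move.emplace above
    return 0;
}
```

For example, a 100 GB disk with move_factor 0.1 must keep 10 GB free; at 5 GB available it schedules about 5 GB of moves only if fewer than 5 GB are already in flight.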
@@ -130,6 +158,7 @@ bool MergeTreePartsMover::selectPartsForMove(
/// But it can be possible to move data from other disks
break;
}
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Moving part " << part->name << " to disk " << reservation->getDisk()->getName());
parts_to_move.emplace_back(part, std::move(reservation));
}
}

View File

@@ -45,7 +45,8 @@ public:
bool selectPartsForMove(
MergeTreeMovingParts & parts_to_move,
const AllowedMovingPredicate & can_move);
const AllowedMovingPredicate & can_move,
const std::lock_guard<std::mutex> & moving_parts_lock);
std::shared_ptr<const MergeTreeDataPart> clonePart(const MergeTreeMoveEntry & moving_part) const;

View File

@@ -287,7 +287,8 @@ def get_paths_for_partition_from_part_log(node, table, partition_id):
@pytest.mark.parametrize("name,engine", [
("altering_mt","MergeTree()"),
("altering_replicated_mt","ReplicatedMergeTree('/clickhouse/altering_replicated_mt', '1')",),
#("altering_replicated_mt","ReplicatedMergeTree('/clickhouse/altering_replicated_mt', '1')",),
# SYSTEM STOP MERGES doesn't disable merge assignments
])
def test_alter_move(start_cluster, name, engine):
try:
@@ -520,93 +521,113 @@ def test_mutate_to_another_disk(start_cluster, name, engine):
    finally:
        node1.query("DROP TABLE IF EXISTS {name}".format(name=name))

@pytest.mark.parametrize("name,engine", [
    ("alter_modifying_mt","MergeTree()"),
    ("replicated_alter_modifying_mt","ReplicatedMergeTree('/clickhouse/replicated_alter_modifying_mt', '1')",),
])
def test_concurrent_alter_modify(start_cluster, name, engine):
    try:
        node1.query("""
            CREATE TABLE {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy_name='jbods_with_external'
        """.format(name=name, engine=engine))

        def insert(num):
            for i in range(num):
                day = random.randint(11, 30)
                value = random.randint(1, 1000000)
                month = '0' + str(random.choice([3, 4]))
                node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))

        def alter_move(num):
            for i in range(num):
                produce_alter_move(node1, name)

        def alter_modify(num):
            for i in range(num):
                column_type = random.choice(["UInt64", "String"])
                node1.query("ALTER TABLE {} MODIFY COLUMN number {}".format(name, column_type))

        insert(100)

        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"

        p = Pool(50)
        tasks = []
        for i in range(5):
            tasks.append(p.apply_async(alter_move, (100,)))
            tasks.append(p.apply_async(alter_modify, (100,)))

        for task in tasks:
            task.get(timeout=60)

        assert node1.query("SELECT 1") == "1\n"
        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"

    finally:
        node1.query("DROP TABLE IF EXISTS {name}".format(name=name))


def test_simple_replication_and_moves(start_cluster):
    try:
        for i, node in enumerate([node1, node2]):
            node.query("""
                CREATE TABLE replicated_table_for_moves (
                    s1 String
                ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
                ORDER BY tuple()
                SETTINGS storage_policy_name='moving_jbod_with_external', old_parts_lifetime=5
            """.format(i + 1))

        def insert(num):
            for i in range(num):
                node = random.choice([node1, node2])
                data = [] # 1MB in total
                for i in range(2):
                    data.append(get_random_string(512 * 1024)) # 500KB value
                node.query("INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))

        def optimize(num):
            for i in range(num):
                node = random.choice([node1, node2])
                node.query("OPTIMIZE TABLE replicated_table_for_moves FINAL")

        p = Pool(50)
        tasks = []
        tasks.append(p.apply_async(insert, (20,)))
        tasks.append(p.apply_async(optimize, (20,)))

        for task in tasks:
            task.get(timeout=60)

        node1.query("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
        node2.query("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)

        assert node1.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
        assert node2.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"

        data = [] # 1MB in total
        for i in range(2):
            data.append(get_random_string(512 * 1024)) # 500KB value

        time.sleep(5) # wait until old parts are deleted

        node1.query("INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
        node2.query("INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))

        time.sleep(3) # nothing was moved

        disks1 = get_used_disks_for_table(node1, "replicated_table_for_moves")
        disks2 = get_used_disks_for_table(node2, "replicated_table_for_moves")

        assert set(disks1) == set(["jbod1", "external"])
        assert set(disks2) == set(["jbod1", "external"])

    finally:
        for node in [node1, node2]:
            node.query("DROP TABLE IF EXISTS replicated_table_for_moves")

'''
## Test stand for multiple disks feature

Currently for manual tests; can easily be scripted to become part of the integration tests.

To run you need to have docker & docker-compose.

```
(Check makefile)
make run
make ch1_shell
 > clickhouse-client

make logs # Ctrl+C
make cleup
```

### basic

* allows configuring multiple disks & volumes & schemas
* clickhouse checks that all disks are write-accessible
* clickhouse can create a table with a provided storagepolicy

### one volume-one disk custom storagepolicy

* clickhouse puts data into the correct folder when the storagepolicy is used
* clickhouse can do merges / detach / attach / freeze on that folder

### one volume-multiple disks storagepolicy (JBOD scenario)

* clickhouse uses round-robin to place new parts
* clickhouse can do merges / detach / attach / freeze on that folder

### two volumes-one disk per volume (fast expensive / slow cheap storage)

* clickhouse uses round-robin to place new parts
* clickhouse can do merges / detach / attach / freeze on that folder
* clickhouse puts parts onto different volumes depending on part size

### use the 'default' storagepolicy for tables created without a storagepolicy provided

# ReplicatedMergeTree

....

For all of the above:
clickhouse respects the free space limitation setting.
ClickHouse writes important disk-related information to logs.

## Queries

```
CREATE TABLE table_with_storage_policy_default (id UInt64) Engine=MergeTree() ORDER BY (id);

select name, data_paths, storage_policy from system.tables where name='table_with_storage_policy_default';
"table_with_storage_policy_default","['/mainstorage/default/table_with_storage_policy_default/']","default"

INSERT INTO table_with_storage_policy_default SELECT rand64() FROM numbers(100);

CREATE TABLE table_with_storage_policy_default_explicit (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default';
CREATE TABLE table_with_storage_policy_default_disk_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default_disk_with_external';
CREATE TABLE table_with_storage_policy_jbod_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='jbods_with_external';

CREATE TABLE replicated_table_with_storage_policy_default (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id);
CREATE TABLE replicated_table_with_storage_policy_default_explicit (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default';
CREATE TABLE replicated_table_with_storage_policy_default_disk_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default_disk_with_external';
CREATE TABLE replicated_table_with_storage_policy_jbod_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='jbods_with_external';
```

## Extra acceptance criteria

* hardlink problems. Those statements should work properly (or give proper feedback) in multidisk scenarios:
  * ALTER TABLE ... UPDATE
  * ALTER TABLE ... DELETE
  * ALTER TABLE ... MODIFY COLUMN ...
  * ALTER TABLE ... CLEAR COLUMN
  * ALTER TABLE ... REPLACE PARTITION ...
* Maintenance - system tables show proper values:
  * system.parts
  * system.tables
  * system.part_log (target disk?)
* New system tables:
  * system.volumes
  * system.disks
  * system.storagepolicys
* chown / create needed disk folders in docker
'''
#def test_replica_download_to_appropriate_disk(start_cluster):