Alexey Milovidov 2016-01-29 05:24:50 +03:00
commit 061306d04b
14 changed files with 426 additions and 521 deletions

View File

@ -565,7 +565,7 @@ public:
/// For resharding.
using MutableDataParts = std::set<MutableDataPartPtr, DataPartPtrLess>;
using PerShardDataParts = std::unordered_map<size_t, MutableDataParts>;
using PerShardDataParts = std::unordered_map<size_t, MutableDataPartPtr>;
/// Some operations on the set of parts may return such an object.
/// If neither commit nor rollback has been called, the destructor rolls the operation back.
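The comment above documents an RAII contract for MergeTreeData::Transaction: unless commit() or rollback() is called, the destructor rolls the operation back. The class body is outside this hunk, so the following is only a minimal sketch of that contract, with hypothetical member names:

/// Minimal illustration of the documented contract; member names are hypothetical.
class Transaction
{
public:
    void commit()   { finished = true; }            /// Make the part replacement permanent.
    void rollback() { undo(); finished = true; }

    ~Transaction()
    {
        /// Neither commit() nor rollback() was called: undo the operation.
        if (!finished)
            undo();
    }

private:
    void undo() { /* return the replaced parts to the working set */ }
    bool finished = false;
};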

View File

@ -40,19 +40,22 @@ public:
bool only_small,
const AllowedMergingPredicate & can_merge);
/** Select all parts belonging to a single partition.
*/
MergeTreeData::DataPartsVector selectAllPartsFromPartition(DayNum_t partition);
/** Merges parts.
* If reservation != nullptr, it periodically shrinks the reserved space,
* roughly in proportion to the amount of data already written out.
*/
MergeTreeData::MutableDataPartPtr mergeParts(
MergeTreeData::DataPartPtr mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeListEntry & merge_entry,
size_t aio_threshold, MergeTreeData::Transaction * out_transaction = nullptr,
DiskSpaceMonitor::Reservation * disk_reservation = nullptr);
/** Reshards the given partition.
*/
MergeTreeData::PerShardDataParts reshardPartition(
const ReshardingJob & job,
size_t aio_threshold,
DiskSpaceMonitor::Reservation * disk_reservation = nullptr);
/// Approximate amount of disk space needed for the merge, with a safety margin.
static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);
@ -63,6 +66,13 @@ public:
void uncancel() { cancelled = false; }
bool isCancelled() const { return cancelled; }
void abortIfRequested() const;
private:
/** Select all parts belonging to a single partition.
*/
MergeTreeData::DataPartsVector selectAllPartsFromPartition(DayNum_t partition);
private:
MergeTreeData & data;

View File

@ -42,12 +42,6 @@ public:
*/
ShardedBlocksWithDateIntervals shardBlock(const Block & block);
/** All rows must belong to the same month.
* temp_index is the left and right value for the new part. It can be changed later when renaming.
* Returns a temporary part with a name starting with tmp_.
*/
MergeTreeData::MutableDataPartPtr writeTempPart(ShardedBlockWithDateInterval & sharded_block_with_dates, Int64 temp_index);
private:
std::vector<IColumn::Filter> createFilters(Block block);

View File

@ -282,6 +282,11 @@ public:
}
}
std::string getPartPath() const
{
return part_path;
}
/// If the data is pre-sorted.
void write(const Block & block) override
{

View File

@ -37,7 +37,7 @@ public:
Client(const Client &) = delete;
Client & operator=(const Client &) = delete;
bool send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
const std::vector<std::string> & parts, size_t shard_no);
const std::string & part, size_t shard_no);
void cancel() { is_cancelled = true; }
private:

View File

@ -498,35 +498,4 @@ private:
PartitionToMergeLock partition_to_merge_lock;
};
/** Recursive lock that protects the given partition from the merge task.
*/
class ScopedPartitionMergeLock final
{
public:
ScopedPartitionMergeLock(StorageReplicatedMergeTree & storage_, const std::string & partition_name_)
: storage(storage_), partition_name(partition_name_)
{
fake_part_name = storage.acquirePartitionMergeLock(partition_name);
}
ScopedPartitionMergeLock(const ScopedPartitionMergeLock &) = delete;
ScopedPartitionMergeLock & operator=(const ScopedPartitionMergeLock &) = delete;
/// Get the unique name of the lock.
std::string getId() const
{
return fake_part_name;
}
~ScopedPartitionMergeLock()
{
storage.releasePartitionMergeLock(partition_name);
}
private:
StorageReplicatedMergeTree & storage;
const std::string partition_name;
std::string fake_part_name;
};
}

View File

@ -323,6 +323,7 @@ namespace ErrorCodes
extern const int INVALID_SHARD_WEIGHT = 317;
extern const int INVALID_CONFIG_PARAMETER = 318;
extern const int UNKNOWN_STATUS_OF_INSERT = 319;
extern const int DUPLICATE_SHARD_PATHS = 320;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -826,7 +826,7 @@ void Context::setReshardingWorker(std::shared_ptr<ReshardingWorker> resharding_w
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->resharding_worker)
throw Exception("Resharding background thread has already been set.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Resharding background thread has already been initialized.", ErrorCodes::LOGICAL_ERROR);
shared->resharding_worker = resharding_worker;
}
@ -834,7 +834,8 @@ ReshardingWorker & Context::getReshardingWorker()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->resharding_worker)
throw Exception("Resharding background thread not set.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Resharding background thread not initialized: resharding missing in configuration file.",
ErrorCodes::LOGICAL_ERROR);
return *shared->resharding_worker;
}

View File

@ -1037,15 +1037,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
MergeTreeData::DataPartPtr MergeTreeData::getShardedPartIfExists(const String & part_name, size_t shard_no)
{
MutableDataPartPtr tmp_part(new DataPart(*this));
ActiveDataPartSet::parsePartName(part_name, *tmp_part);
const MutableDataPartPtr & part_from_shard = per_shard_data_parts.at(shard_no);
const MutableDataParts & sharded_parts = per_shard_data_parts.at(shard_no);
MutableDataParts::const_iterator it = sharded_parts.lower_bound(tmp_part);
if ((it != sharded_parts.end()) && ((*it)->name == part_name))
return *it;
return nullptr;
if (part_from_shard->name == part_name)
return part_from_shard;
else
return nullptr;
}
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)

View File

@ -3,6 +3,7 @@
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/Storages/MergeTree/MergeTreeSharder.h>
#include <DB/Storages/MergeTree/ReshardingJob.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
@ -11,7 +12,7 @@
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/Common/Increment.h>
namespace DB
{
@ -21,6 +22,27 @@ namespace ErrorCodes
extern const int ABORTED;
}
namespace
{
std::string createMergedPartName(const MergeTreeData::DataPartsVector & parts)
{
DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
UInt32 level = 0;
for (const MergeTreeData::DataPartPtr & part : parts)
{
level = std::max(level, part->level);
left_date = std::min(left_date, part->left_date);
right_date = std::max(right_date, part->right_date);
}
return ActiveDataPartSet::getPartName(left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
}
}
/// Do not agree to merge parts if the free disk space is less than the total size of the parts multiplied by this factor.
static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 1.6;
@ -299,39 +321,16 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(
}
/// parts must be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
size_t aio_threshold, MergeTreeData::Transaction * out_transaction,
DiskSpaceMonitor::Reservation * disk_reservation)
{
bool is_sharded = parts[0]->is_sharded;
for (size_t i = 1; i < parts.size(); ++i)
{
if (parts[i]->is_sharded != is_sharded)
throw Exception("Inconsistent set of parts provided for merging", ErrorCodes::LOGICAL_ERROR);
}
size_t shard_no = 0;
if (is_sharded)
{
shard_no = parts[0]->shard_no;
for (size_t i = 1; i < parts.size(); ++i)
{
if (parts[i]->shard_no != shard_no)
throw Exception("Inconsistent set of parts provided for merging", ErrorCodes::LOGICAL_ERROR);
}
}
merge_entry->num_parts = parts.size();
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
String merged_dir;
if (is_sharded)
merged_dir = data.getFullPath() + "reshard/" + toString(shard_no) + merged_name;
else
merged_dir = data.getFullPath() + merged_name;
String merged_dir = data.getFullPath() + merged_name;
if (Poco::File(merged_dir).exists())
throw Exception("Directory " + merged_dir + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
@ -375,12 +374,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
String part_path;
if (is_sharded)
part_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/" + parts[i]->name + '/';
else
part_path = data.getFullPath() + parts[i]->name + '/';
String part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
@ -436,12 +430,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
}
String new_part_tmp_path;
if (is_sharded)
new_part_tmp_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/tmp_" + merged_name + "/";
else
new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
auto compression_method = data.context.chooseCompressionMethod(
merge_entry->total_size_bytes_compressed,
@ -483,45 +472,41 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
new_data_part->size = to.marksCount();
new_data_part->modification_time = time(0);
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part_tmp_path);
new_data_part->is_sharded = is_sharded;
new_data_part->shard_no = shard_no;
new_data_part->is_sharded = false;
if (!is_sharded)
/// Rename the new part, add it to the set and remove the source parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
if (new_data_part->name != merged_name)
throw Exception("Unexpected part name: " + new_data_part->name + " instead of " + merged_name, ErrorCodes::LOGICAL_ERROR);
/// Check that exactly the source parts were removed, and nothing else.
if (replaced_parts.size() != parts.size())
{
/// Rename the new part, add it to the set and remove the source parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
if (new_data_part->name != merged_name)
throw Exception("Unexpected part name: " + new_data_part->name + " instead of " + merged_name, ErrorCodes::LOGICAL_ERROR);
/// Check that exactly the source parts were removed, and nothing else.
if (replaced_parts.size() != parts.size())
{
/** This is normal, although it happens rarely.
* The situation where 0 parts were replaced instead of N can arise, for example, in the following case:
* - we had part A, but no parts B and C;
* - the queue had a merge A, B -> AB, but it was not performed because part B was missing;
* - the queue had a merge AB, C -> ABC, but it was not performed because parts AB and C were missing;
* - we executed a task to download part B;
* - we started the merge A, B -> AB, because all the parts had appeared;
* - we decided to download part ABC from another replica, because it was impossible to perform the merge AB, C -> ABC;
* - part ABC appeared, and on adding it the old parts A, B, C were removed;
* - the merge AB finished. Part AB was added. But it is an obsolete part. The log will contain the message Obsolete part added,
* and then we end up here.
* The situation where M > N parts were replaced is also normal.
*
* Although this should be prevented by the check in StorageReplicatedMergeTree::shouldExecuteLogEntry.
*/
LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());
}
else
{
for (size_t i = 0; i < parts.size(); ++i)
if (parts[i]->name != replaced_parts[i]->name)
throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
+ " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
}
/** This is normal, although it happens rarely.
* The situation where 0 parts were replaced instead of N can arise, for example, in the following case:
* - we had part A, but no parts B and C;
* - the queue had a merge A, B -> AB, but it was not performed because part B was missing;
* - the queue had a merge AB, C -> ABC, but it was not performed because parts AB and C were missing;
* - we executed a task to download part B;
* - we started the merge A, B -> AB, because all the parts had appeared;
* - we decided to download part ABC from another replica, because it was impossible to perform the merge AB, C -> ABC;
* - part ABC appeared, and on adding it the old parts A, B, C were removed;
* - the merge AB finished. Part AB was added. But it is an obsolete part. The log will contain the message Obsolete part added,
* and then we end up here.
* The situation where M > N parts were replaced is also normal.
*
* Although this should be prevented by the check in StorageReplicatedMergeTree::shouldExecuteLogEntry.
*/
LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());
}
else
{
for (size_t i = 0; i < parts.size(); ++i)
if (parts[i]->name != replaced_parts[i]->name)
throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
+ " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
}
LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
@ -529,6 +514,257 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergeParts(
return new_data_part;
}
MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
const ReshardingJob & job,
size_t aio_threshold, DiskSpaceMonitor::Reservation * disk_reservation)
{
/// Gather all parts of the partition.
DayNum_t month = MergeTreeData::getMonthFromName(job.partition);
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(month);
/// Create a temporary folder name.
std::string merged_name = createMergedPartName(parts);
MergeList::EntryPtr merge_entry_ptr = data.context.getMergeList().insert(job.database_name,
job.table_name, merged_name);
MergeList::Entry & merge_entry = *merge_entry_ptr;
merge_entry->num_parts = parts.size();
LOG_DEBUG(log, "Resharding " << parts.size() << " parts from " << parts.front()->name
<< " to " << parts.back()->name << " which span the partition " << job.partition);
/// Merge all parts of the partition.
NameSet union_columns_set;
for (const MergeTreeData::DataPartPtr & part : parts)
{
Poco::ScopedReadRWLock part_lock(part->columns_lock);
Names part_columns = part->columns.getNames();
union_columns_set.insert(part_columns.begin(), part_columns.end());
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
}
NamesAndTypesList columns_list = data.getColumnsList();
NamesAndTypesList union_columns = columns_list.filter(union_columns_set);
Names union_column_names = union_columns.getNames();
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
if (aio_threshold > 0)
{
for (const MergeTreeData::DataPartPtr & part : parts)
part->accumulateColumnSizes(merged_column_to_size);
}
BlockInputStreams src_streams;
size_t sum_rows_approx = 0;
const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
for (size_t i = 0; i < parts.size(); ++i)
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
String part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
{
const auto new_rows_read = __sync_add_and_fetch(&merge_entry->rows_read, value.rows);
merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
__sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes);
});
if (data.mode != MergeTreeData::Unsorted)
src_streams.push_back(new MaterializingBlockInputStream{
new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())});
else
src_streams.push_back(input.release());
sum_rows_approx += parts[i]->size * data.index_granularity;
}
/// Shard the merged blocks.
/// For block numbering.
SimpleIncrement increment(data.getMaxDataPartIndex());
/// Create a new part for each shard.
MergeTreeData::PerShardDataParts per_shard_data_parts;
per_shard_data_parts.reserve(job.paths.size());
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
{
Int64 temp_index = increment.get();
MergeTreeData::MutableDataPartPtr data_part = std::make_shared<MergeTreeData::DataPart>(data);
data_part->name = "tmp_" + merged_name;
data_part->is_temp = true;
data_part->left_date = std::numeric_limits<UInt16>::max();
data_part->right_date = std::numeric_limits<UInt16>::min();
data_part->month = month;
data_part->left = temp_index;
data_part->right = temp_index;
data_part->level = 0;
per_shard_data_parts.emplace(shard_no, data_part);
}
/// A very rough estimate of the compressed data size of each sharded partition.
/// In reality everything depends on the properties of the sharding expression.
UInt64 per_shard_size_bytes_compressed = merge_entry->total_size_bytes_compressed / static_cast<double>(job.paths.size());
auto compression_method = data.context.chooseCompressionMethod(
per_shard_size_bytes_compressed,
static_cast<double>(per_shard_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
using MergedBlockOutputStreamPtr = std::unique_ptr<MergedBlockOutputStream>;
using PerShardOutput = std::unordered_map<size_t, MergedBlockOutputStreamPtr>;
/// For each shard, create a stream that writes the corresponding sharded blocks.
PerShardOutput per_shard_output;
per_shard_output.reserve(job.paths.size());
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
{
std::string new_part_tmp_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/tmp_" + merged_name + "/";
Poco::File(new_part_tmp_path).createDirectories();
MergedBlockOutputStreamPtr output_stream;
output_stream = std::make_unique<MergedBlockOutputStream>(data, new_part_tmp_path, union_columns, compression_method, merged_column_to_size, aio_threshold);
per_shard_output.emplace(shard_no, std::move(output_stream));
}
/// The order of the streams matters: when keys are equal, elements go in the order of the source stream number.
/// In the merged part, rows with the same key must go in ascending order of the source part identifier,
/// that is, in (roughly) ascending order of insertion time.
std::unique_ptr<IProfilingBlockInputStream> merged_stream;
switch (data.mode)
{
case MergeTreeData::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
}
merged_stream->readPrefix();
for (auto & entry : per_shard_output)
{
MergedBlockOutputStreamPtr & output_stream = entry.second;
output_stream->writePrefix();
}
size_t rows_written = 0;
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
MergeTreeSharder sharder(data, job);
Block block;
while (block = merged_stream->read())
{
abortIfRequested();
ShardedBlocksWithDateIntervals blocks = sharder.shardBlock(block);
for (ShardedBlockWithDateInterval & block_with_dates : blocks)
{
abortIfRequested();
size_t shard_no = block_with_dates.shard_no;
MergeTreeData::MutableDataPartPtr & data_part = per_shard_data_parts.at(shard_no);
MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
rows_written += block_with_dates.block.rows();
output_stream->write(block_with_dates.block);
if (block_with_dates.min_date < data_part->left_date)
data_part->left_date = block_with_dates.min_date;
if (block_with_dates.max_date > data_part->right_date)
data_part->right_date = block_with_dates.max_date;
merge_entry->rows_written = merged_stream->getInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getInfo().bytes;
if (disk_reservation)
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
}
}
merged_stream->readSuffix();
/// Finish initializing the parts of the new partitions.
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
{
abortIfRequested();
MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
if (0 == output_stream->marksCount())
{
/// No data ended up in this shard. Ignore it.
LOG_WARNING(log, "No data in partition for shard " + job.paths[shard_no].first);
per_shard_data_parts.erase(shard_no);
continue;
}
MergeTreeData::MutableDataPartPtr & data_part = per_shard_data_parts.at(shard_no);
data_part->columns = union_columns;
data_part->checksums = output_stream->writeSuffixAndGetChecksums();
data_part->index.swap(output_stream->getIndex());
data_part->size = output_stream->marksCount();
data_part->modification_time = time(0);
data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(output_stream->getPartPath());
data_part->is_sharded = true;
data_part->shard_no = shard_no;
}
/// Turn the parts of the new partitions into permanent parts.
for (auto & entry : per_shard_data_parts)
{
size_t shard_no = entry.first;
MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
part_from_shard->is_temp = false;
std::string prefix = data.getFullPath() + "reshard/" + toString(shard_no) + "/";
std::string old_name = part_from_shard->name;
std::string new_name = ActiveDataPartSet::getPartName(part_from_shard->left_date,
part_from_shard->right_date, part_from_shard->left, part_from_shard->right, part_from_shard->level);
part_from_shard->name = new_name;
Poco::File(prefix + old_name).renameTo(prefix + new_name);
}
LOG_TRACE(log, "Resharded the partition " << job.partition);
return per_shard_data_parts;
}
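For context, the following is a condensed sketch of how this new reshardPartition method is driven by ReshardingWorker::createShardedPartitions later in this same commit; locking, cancellation handling and error paths are omitted:

/// Condensed from ReshardingWorker::createShardedPartitions further down in this diff.
void createShardedPartitionsSketch(StorageReplicatedMergeTree & storage, const ReshardingJob & job)
{
    auto merger = std::make_unique<MergeTreeDataMerger>(storage.data);

    /// Merge the whole partition and split the result into one part per target shard.
    storage.data.per_shard_data_parts = merger->reshardPartition(
        job, storage.data.context.getSettings().min_bytes_to_use_direct_io);
}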
size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts)
{
size_t res = 0;
@ -538,4 +774,11 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
void MergeTreeDataMerger::abortIfRequested() const
{
if (cancelled)
throw Exception("Cancelled partition resharding", ErrorCodes::ABORTED);
}
}
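ReshardingWorker::perform (later in this commit) protects the resharded partition with "const MergeTreeMergeBlocker merge_blocker{storage.merger};". The definition of MergeTreeMergeBlocker is not part of this diff; a plausible sketch, assuming it is an RAII wrapper around the cancel()/uncancel() pair declared in the header above, would be:

/// Assumed sketch only: keeps the merger cancelled for the lifetime of the object.
class MergeTreeMergeBlocker
{
public:
    explicit MergeTreeMergeBlocker(MergeTreeDataMerger & merger_)
        : merger(merger_), was_cancelled(merger_.isCancelled())
    {
        merger.cancel();
    }

    ~MergeTreeMergeBlocker()
    {
        /// Restore the previous state only if we were the ones who cancelled it.
        if (!was_cancelled)
            merger.uncancel();
    }

    MergeTreeMergeBlocker(const MergeTreeMergeBlocker &) = delete;
    MergeTreeMergeBlocker & operator=(const MergeTreeMergeBlocker &) = delete;

private:
    MergeTreeDataMerger & merger;
    const bool was_cancelled;
};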

View File

@ -84,81 +84,6 @@ ShardedBlocksWithDateIntervals MergeTreeSharder::shardBlock(const Block & block)
return res;
}
MergeTreeData::MutableDataPartPtr MergeTreeSharder::writeTempPart(
ShardedBlockWithDateInterval & sharded_block_with_dates, Int64 temp_index)
{
Block & block = sharded_block_with_dates.block;
UInt16 min_date = sharded_block_with_dates.min_date;
UInt16 max_date = sharded_block_with_dates.max_date;
size_t shard_no = sharded_block_with_dates.shard_no;
const auto & date_lut = DateLUT::instance();
DayNum_t min_month = date_lut.toFirstDayNumOfMonth(DayNum_t(min_date));
DayNum_t max_month = date_lut.toFirstDayNumOfMonth(DayNum_t(max_date));
if (min_month != max_month)
throw Exception("Logical error: part spans more than one month.", ErrorCodes::LOGICAL_ERROR);
size_t part_size = (block.rows() + data.index_granularity - 1) / data.index_granularity;
String tmp_part_name = "tmp_" + ActiveDataPartSet::getPartName(
DayNum_t(min_date), DayNum_t(max_date),
temp_index, temp_index, 0);
String part_tmp_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/" + tmp_part_name + "/";
Poco::File(part_tmp_path).createDirectories();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->name = tmp_part_name;
new_data_part->is_temp = true;
/// If some columns need to be computed for sorting, do it.
if (data.mode != MergeTreeData::Unsorted)
data.getPrimaryExpression()->execute(block);
SortDescription sort_descr = data.getSortDescription();
/// Sort.
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (data.mode != MergeTreeData::Unsorted)
{
if (!isAlreadySorted(block, sort_descr))
{
stableGetPermutation(block, sort_descr, perm);
perm_ptr = &perm;
}
}
NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
MergedBlockOutputStream out(data, part_tmp_path, columns, CompressionMethod::LZ4);
out.getIndex().reserve(part_size * sort_descr.size());
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);
MergeTreeData::DataPart::Checksums checksums = out.writeSuffixAndGetChecksums();
new_data_part->left_date = DayNum_t(min_date);
new_data_part->right_date = DayNum_t(max_date);
new_data_part->left = temp_index;
new_data_part->right = temp_index;
new_data_part->level = 0;
new_data_part->size = part_size;
new_data_part->modification_time = std::time(0);
new_data_part->month = min_month;
new_data_part->columns = columns;
new_data_part->checksums = checksums;
new_data_part->index.swap(out.getIndex());
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(part_tmp_path);
new_data_part->is_sharded = true;
new_data_part->shard_no = sharded_block_with_dates.shard_no;
return new_data_part;
}
std::vector<IColumn::Filter> MergeTreeSharder::createFilters(Block block)
{
using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, size_t num_shards, const std::vector<size_t> & slots);

View File

@ -38,22 +38,6 @@ namespace ErrorCodes
namespace
{
std::string createMergedPartName(const MergeTreeData::DataPartsVector & parts)
{
DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
UInt32 level = 0;
for (const MergeTreeData::DataPartPtr & part : parts)
{
level = std::max(level, part->level);
left_date = std::min(left_date, part->left_date);
right_date = std::max(right_date, part->right_date);
}
return ActiveDataPartSet::getPartName(left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
}
class Arguments final
{
public:
@ -93,20 +77,10 @@ ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & con
: context(context_), log(&Logger::get("ReshardingWorker"))
{
Arguments arguments(config, config_name);
auto zookeeper = context.getZooKeeper();
host_task_queue_path = "/clickhouse";
zookeeper->createIfNotExists(host_task_queue_path, "");
host_task_queue_path += "/" + arguments.getTaskQueuePath();
zookeeper->createIfNotExists(host_task_queue_path, "");
host_task_queue_path += "/resharding";
zookeeper->createIfNotExists(host_task_queue_path, "");
host_task_queue_path += "/" + getFQDNOrHostName();
zookeeper->createIfNotExists(host_task_queue_path, "");
host_task_queue_path = arguments.getTaskQueuePath() + "resharding/" + getFQDNOrHostName();
zookeeper->createAncestors(host_task_queue_path + "/");
}
ReshardingWorker::~ReshardingWorker()
@ -276,7 +250,7 @@ void ReshardingWorker::perform(const ReshardingJob & job)
auto & storage = typeid_cast<StorageReplicatedMergeTree &>(*(generic_storage.get()));
/// Protect the partition being resharded from the merge task.
ScopedPartitionMergeLock partition_merge_lock(storage, job.partition);
const MergeTreeMergeBlocker merge_blocker{storage.merger};
try
{
@ -309,166 +283,13 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
LOG_DEBUG(log, "Splitting partition shard-wise.");
/// Parts of a single shard that must be merged.
struct PartsToBeMerged
{
void add(MergeTreeData::MutableDataPartPtr & part)
{
parts.insert(part);
total_size += part->size_in_bytes;
}
void clear()
{
parts.clear();
total_size = 0;
}
MergeTreeData::MutableDataParts parts;
size_t total_size = 0;
};
/// For each shard, the parts that must be merged.
std::unordered_map<size_t, PartsToBeMerged> to_merge;
/// For block numbering.
SimpleIncrement increment(storage.data.getMaxDataPartIndex());
MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
auto zookeeper = storage.getZooKeeper();
const auto & settings = context.getSettingsRef();
DayNum_t month = MergeTreeData::getMonthFromName(job.partition);
{
std::lock_guard<std::mutex> guard(cancel_mutex);
merger = std::make_unique<MergeTreeDataMerger>(storage.data);
}
auto parts_from_partition = merger->selectAllPartsFromPartition(month);
MergeTreeSharder sharder(storage.data, job);
for (const auto & part : parts_from_partition)
{
MarkRanges ranges(1, MarkRange(0, part->size));
MergeTreeBlockInputStream source(
storage.data.getFullPath() + part->name + '/',
DEFAULT_MERGE_BLOCK_SIZE,
part->columns.getNames(),
storage.data,
part,
ranges,
false,
nullptr,
"",
true,
settings.min_bytes_to_use_direct_io,
DBMS_DEFAULT_BUFFER_SIZE,
true);
Block block;
while (block = source.read())
{
/// Split into several parts according to the sharding key.
ShardedBlocksWithDateIntervals blocks = sharder.shardBlock(block);
for (ShardedBlockWithDateInterval & block_with_dates : blocks)
{
abortIfRequested();
/// Create a new part corresponding to the new block.
Int64 temp_index = increment.get();
MergeTreeData::MutableDataPartPtr block_part = sharder.writeTempPart(block_with_dates, temp_index);
abortIfRequested();
/// Add the new part to the list of parts of the corresponding shard that must be merged.
/// If it turns out that inserting this part would make the total size of the parts
/// exceed a certain limit, first merge all the parts, then move them
/// into the list of parts of the new partition.
PartsToBeMerged & parts_to_be_merged = to_merge[block_with_dates.shard_no];
if ((parts_to_be_merged.total_size + block_part->size_in_bytes) > storage.data.settings.max_bytes_to_merge_parts)
{
MergeTreeData::MutableDataParts & sharded_parts = per_shard_data_parts[block_with_dates.shard_no];
if (parts_to_be_merged.parts.size() >= 2)
{
MergeTreeData::DataPartsVector parts(parts_to_be_merged.parts.begin(), parts_to_be_merged.parts.end());
std::string merged_name = createMergedPartName(parts);
const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name,
job.table_name, merged_name);
MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry,
storage.data.context.getSettings().min_bytes_to_use_direct_io);
sharded_parts.insert(new_part);
}
else
sharded_parts.insert(block_part);
/// Remove the source parts.
parts_to_be_merged.clear();
}
parts_to_be_merged.add(block_part);
}
}
/// Process all remaining parts.
for (auto & entry : to_merge)
{
abortIfRequested();
size_t shard_no = entry.first;
PartsToBeMerged & parts_to_be_merged = entry.second;
MergeTreeData::MutableDataParts & sharded_parts = per_shard_data_parts[shard_no];
if (parts_to_be_merged.parts.size() >= 2)
{
MergeTreeData::DataPartsVector parts(parts_to_be_merged.parts.begin(), parts_to_be_merged.parts.end());
std::string merged_name = createMergedPartName(parts);
const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name,
job.table_name, merged_name);
MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry,
storage.data.context.getSettings().min_bytes_to_use_direct_io);
sharded_parts.insert(new_part);
}
else
{
auto single_part = *(parts_to_be_merged.parts.begin());
sharded_parts.insert(single_part);
}
/// Remove the source parts.
parts_to_be_merged.clear();
}
}
/// Until now, all parts of the new partitions were temporary.
for (auto & entry : per_shard_data_parts)
{
size_t shard_no = entry.first;
MergeTreeData::MutableDataParts & sharded_parts = entry.second;
for (auto & sharded_part : sharded_parts)
{
sharded_part->is_temp = false;
std::string prefix = storage.full_path + "reshard/" + toString(shard_no) + "/";
std::string old_name = sharded_part->name;
std::string new_name = ActiveDataPartSet::getPartName(sharded_part->left_date,
sharded_part->right_date, sharded_part->left, sharded_part->right, sharded_part->level);
sharded_part->name = new_name;
Poco::File(prefix + old_name).renameTo(prefix + new_name);
}
}
MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
per_shard_data_parts = merger->reshardPartition(job, storage.data.context.getSettings().min_bytes_to_use_direct_io);
}
void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & storage, const ReshardingJob & job)
@ -482,17 +303,17 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
struct TaskInfo
{
TaskInfo(const std::string & replica_path_,
const std::vector<std::string> & parts_,
const std::string & part_,
const ReplicatedMergeTreeAddress & dest_,
size_t shard_no_)
: replica_path(replica_path_), dest(dest_), parts(parts_),
: replica_path(replica_path_), dest(dest_), part(part_),
shard_no(shard_no_)
{
}
std::string replica_path;
ReplicatedMergeTreeAddress dest;
std::vector<std::string> parts;
std::string part;
size_t shard_no;
};
@ -507,14 +328,10 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
for (const auto & entry : storage.data.per_shard_data_parts)
{
size_t shard_no = entry.first;
const MergeTreeData::MutableDataParts & sharded_parts = entry.second;
if (sharded_parts.empty())
const MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
if (!part_from_shard)
continue;
std::vector<std::string> part_names;
for (const MergeTreeData::DataPartPtr & sharded_part : sharded_parts)
part_names.push_back(sharded_part->name);
const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
const std::string & zookeeper_path = weighted_path.first;
@ -524,7 +341,7 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
const std::string replica_path = zookeeper_path + "/replicas/" + child;
auto host = zookeeper->get(replica_path + "/host");
ReplicatedMergeTreeAddress host_desc(host);
task_info_list.emplace_back(replica_path, part_names, host_desc, shard_no);
task_info_list.emplace_back(replica_path, part_from_shard->name, host_desc, shard_no);
if (replica_path == storage.replica_path)
{
++local_count;
@ -554,14 +371,14 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
const TaskInfo & entry = task_info_list[i];
const auto & replica_path = entry.replica_path;
const auto & dest = entry.dest;
const auto & parts = entry.parts;
const auto & part = entry.part;
size_t shard_no = entry.shard_no;
InterserverIOEndpointLocation to_location(replica_path, dest.host, dest.replication_port);
size_t j = i - local_count;
tasks[j] = Tasks::value_type(std::bind(&ShardedPartitionSender::Client::send,
&storage.sharded_partition_sender_client, to_location, from_location, parts, shard_no));
&storage.sharded_partition_sender_client, to_location, from_location, part, shard_no));
pool.schedule([j, &tasks]{ tasks[j](); });
}
}
@ -586,15 +403,12 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
{
/// On the local replica, simply move the sharded partition into the detached/ folder.
const TaskInfo & entry = task_info_list[0];
const auto & parts = entry.parts;
const auto & part = entry.part;
size_t shard_no = entry.shard_no;
for (const auto & part : parts)
{
std::string from_path = storage.full_path + "reshard/" + toString(shard_no) + "/" + part + "/";
std::string to_path = storage.full_path + "detached/";
Poco::File(from_path).moveTo(to_path);
}
std::string from_path = storage.full_path + "reshard/" + toString(shard_no) + "/" + part + "/";
std::string to_path = storage.full_path + "detached/";
Poco::File(from_path).moveTo(to_path);
}
}
@ -628,8 +442,8 @@ void ReshardingWorker::applyChanges(StorageReplicatedMergeTree & storage, const
for (const auto & entry : storage.data.per_shard_data_parts)
{
size_t shard_no = entry.first;
const MergeTreeData::MutableDataParts & sharded_parts = entry.second;
if (sharded_parts.empty())
const MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
if (!part_from_shard)
continue;
const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];

View File

@ -5,8 +5,6 @@
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -15,28 +13,6 @@ namespace ErrorCodes
extern const int ABORTED;
}
namespace
{
std::string glue(const std::vector<std::string> & names, char delim)
{
std::string res;
bool is_first = true;
for (const auto & name : names)
{
if (is_first)
is_first = false;
else
res.append(1, delim);
res.append(name);
}
return res;
}
}
namespace ShardedPartitionSender
{
@ -66,33 +42,27 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
throw Exception("ShardedPartitionSender service terminated", ErrorCodes::ABORTED);
InterserverIOEndpointLocation from_location(params.get("from_location"));
std::string glued_parts = params.get("parts");
std::string part_name = params.get("part");
size_t shard_no = std::stoul(params.get("shard"));
std::vector<std::string> parts;
boost::split(parts, glued_parts, boost::is_any_of(","));
if (is_cancelled)
throw Exception("ShardedPartitionSender service terminated", ErrorCodes::ABORTED);
for (const auto & part_name : parts)
MergeTreeData::MutableDataPartPtr part = storage.fetcher.fetchShardedPart(from_location, part_name, shard_no);
part->is_temp = false;
const std::string old_part_path = storage.full_path + part->name;
const std::string new_part_path = storage.full_path + "detached/" + part_name;
Poco::File new_part_dir(new_part_path);
if (new_part_dir.exists())
{
if (is_cancelled)
throw Exception("ShardedPartitionSender service terminated", ErrorCodes::ABORTED);
MergeTreeData::MutableDataPartPtr part = storage.fetcher.fetchShardedPart(from_location, part_name, shard_no);
part->is_temp = false;
const std::string old_part_path = storage.full_path + part->name;
const std::string new_part_path = storage.full_path + "detached/" + part_name;
Poco::File new_part_dir(new_part_path);
if (new_part_dir.exists())
{
LOG_WARNING(log, "Directory " + new_part_path + " already exists. Removing.");
new_part_dir.remove(true);
}
Poco::File(old_part_path).renameTo(new_part_path);
LOG_WARNING(log, "Directory " + new_part_path + " already exists. Removing.");
new_part_dir.remove(true);
}
Poco::File(old_part_path).renameTo(new_part_path);
bool flag = true;
writeBinary(flag, out);
out.next();
@ -104,16 +74,14 @@ Client::Client()
}
bool Client::send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
const std::vector<std::string> & parts, size_t shard_no)
const std::string & part, size_t shard_no)
{
std::string glued_parts = glue(parts, ',');
ReadBufferFromHTTP::Params params =
{
{"endpoint", getEndpointId(to_location.name)},
{"from_location", from_location.toString()},
{"compress", "false"},
{"parts", glued_parts},
{"part", part},
{"shard", toString(shard_no)}
};

View File

@ -75,6 +75,7 @@ namespace ErrorCodes
extern const int INVALID_PARTITIONS_INTERVAL;
extern const int RESHARDING_INVALID_PARAMETERS;
extern const int INVALID_SHARD_WEIGHT;
extern const int DUPLICATE_SHARD_PATHS;
}
@ -2720,8 +2721,35 @@ void StorageReplicatedMergeTree::dropPartition(ASTPtr query, const Field & field
return;
}
ScopedPartitionMergeLock partition_merge_lock(*this, month_name);
std::string fake_part_name = partition_merge_lock.getId();
/** Skip one number in block_numbers for the month being deleted, and delete only parts up to that number.
* This forbids merges of the parts being deleted with newly inserted data.
* Invariant: no merges of the deleted parts with other parts will appear in the log.
* NOTE: If the DROP PART query ever needs similar support, a new mechanism will have to be devised for it
* to guarantee this invariant.
*/
Int64 right;
{
AbandonableLockInZooKeeper block_number_lock = allocateBlockNumber(month_name);
right = block_number_lock.getNumber();
block_number_lock.unlock();
}
/// This should never happen.
if (right == 0)
throw Exception("Logical error: just allocated block number is zero", ErrorCodes::LOGICAL_ERROR);
--right;
String fake_part_name = getFakePartNameForDrop(month_name, 0, right);
/** Forbid selecting the parts being deleted for merging.
* Invariant: after the DROP_RANGE entry appears in the log, no merges of the deleted parts will appear in the log.
*/
{
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInRange(fake_part_name);
}
/// Finally, having established the required invariants, we can put the entry into the log.
LogEntry entry;
@ -2743,66 +2771,6 @@ void StorageReplicatedMergeTree::dropPartition(ASTPtr query, const Field & field
}
}
std::string StorageReplicatedMergeTree::acquirePartitionMergeLock(const std::string & partition_name)
{
std::lock_guard<std::mutex> guard(mutex_partition_to_merge_lock);
auto it = partition_to_merge_lock.find(partition_name);
if (it != partition_to_merge_lock.end())
{
auto & info = it->second;
++info.ref_count;
return info.fake_part_name;
}
/** Skip one number in block_numbers for the month being deleted, and delete only parts up to that number.
* This forbids merges of the parts being deleted with newly inserted data.
* Invariant: no merges of the deleted parts with other parts will appear in the log.
* NOTE: If the DROP PART query ever needs similar support, a new mechanism will have to be devised for it
* to guarantee this invariant.
*/
Int64 right;
{
AbandonableLockInZooKeeper block_number_lock = allocateBlockNumber(partition_name);
right = block_number_lock.getNumber();
block_number_lock.unlock();
}
/// This should never happen.
if (right == 0)
throw Exception("Logical error: just allocated block number is zero", ErrorCodes::LOGICAL_ERROR);
--right;
std::string fake_part_name = getFakePartNameForDrop(partition_name, 0, right);
partition_to_merge_lock.emplace(partition_name, PartitionMergeLockInfo(fake_part_name));
/** Forbid selecting the parts being deleted for merging.
* Invariant: after the DROP_RANGE entry appears in the log, no merges of the deleted parts will appear in the log.
*/
{
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInRange(fake_part_name);
}
return fake_part_name;
}
void StorageReplicatedMergeTree::releasePartitionMergeLock(const std::string & partition_name)
{
std::lock_guard<std::mutex> guard(mutex_partition_to_merge_lock);
auto it = partition_to_merge_lock.find(partition_name);
if (it == partition_to_merge_lock.end())
throw Exception("StorageReplicatedMergeTree: trying to release a non-existent partition merge lock",
ErrorCodes::LOGICAL_ERROR);
auto & info = it->second;
--info.ref_count;
if (info.ref_count == 0)
partition_to_merge_lock.erase(it);
}
void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & field, bool unreplicated, bool attach_part, const Settings & settings)
{
@ -3435,7 +3403,7 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
{
auto & resharding_worker = context.getReshardingWorker();
if (!resharding_worker.isStarted())
throw Exception("Resharding background thread is not running.", ErrorCodes::RESHARDING_NO_WORKER);
throw Exception("Resharding background thread is not running", ErrorCodes::RESHARDING_NO_WORKER);
for (const auto & weighted_path : weighted_zookeeper_paths)
{
@ -3444,6 +3412,16 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
throw Exception("Shard has invalid weight", ErrorCodes::INVALID_SHARD_WEIGHT);
}
{
std::vector<std::string> all_paths;
all_paths.reserve(weighted_zookeeper_paths.size());
for (const auto & weighted_path : weighted_zookeeper_paths)
all_paths.push_back(weighted_path.first);
std::sort(all_paths.begin(), all_paths.end());
if (std::adjacent_find(all_paths.begin(), all_paths.end()) != all_paths.end())
throw Exception("Shard paths must be distinct", ErrorCodes::DUPLICATE_SHARD_PATHS);
}
DayNum_t first_partition_num = !first_partition.isNull() ? MergeTreeData::getMonthDayNum(first_partition) : DayNum_t();
DayNum_t last_partition_num = !last_partition.isNull() ? MergeTreeData::getMonthDayNum(last_partition) : DayNum_t();