2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
|
2014-08-04 11:41:59 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
|
2016-01-28 16:06:57 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeSharder.h>
|
2016-01-28 01:00:27 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/ReshardingJob.h>
|
2016-10-27 22:50:02 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/SimpleMergeSelector.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/MergeList.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/DataStreams/ExpressionBlockInputStream.h>
|
|
|
|
|
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
|
|
|
|
|
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
|
|
|
|
|
#include <DB/DataStreams/SummingSortedBlockInputStream.h>
|
2016-04-15 19:09:42 +00:00
|
|
|
|
#include <DB/DataStreams/ReplacingSortedBlockInputStream.h>
|
2016-04-24 09:44:47 +00:00
|
|
|
|
#include <DB/DataStreams/GraphiteRollupSortedBlockInputStream.h>
|
2014-05-26 16:11:20 +00:00
|
|
|
|
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
|
2014-11-11 15:22:39 +00:00
|
|
|
|
#include <DB/DataStreams/MaterializingBlockInputStream.h>
|
2015-03-13 21:31:23 +00:00
|
|
|
|
#include <DB/DataStreams/ConcatBlockInputStream.h>
|
2016-10-27 22:50:02 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
|
2016-01-28 16:06:57 +00:00
|
|
|
|
#include <DB/Common/Increment.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-10-27 22:50:02 +00:00
|
|
|
|
#include <cmath>
|
|
|
|
|
|
2016-10-24 02:02:37 +00:00
|
|
|
|
|
|
|
|
|
namespace ProfileEvents
|
|
|
|
|
{
|
|
|
|
|
extern const Event MergedRows;
|
|
|
|
|
extern const Event MergedUncompressedBytes;
|
|
|
|
|
}
|
|
|
|
|
|
2016-10-27 22:50:02 +00:00
|
|
|
|
namespace CurrentMetrics
|
|
|
|
|
{
|
|
|
|
|
extern const Metric BackgroundPoolTask;
|
|
|
|
|
}
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
|
namespace ErrorCodes
|
|
|
|
|
{
|
|
|
|
|
extern const int ABORTED;
|
|
|
|
|
}
|
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
namespace
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
/// Compose the name of the part that would result from merging all of `parts`.
/// The name spans the minimal date range covering every input part; the level
/// is one greater than the deepest input level. `parts` must be non-empty and
/// sorted (front()->left / back()->right delimit the block-number range).
std::string createMergedPartName(const MergeTreeData::DataPartsVector & parts)
{
	DayNum_t min_date = DayNum_t(std::numeric_limits<UInt16>::max());
	DayNum_t max_date = DayNum_t(std::numeric_limits<UInt16>::min());
	UInt32 max_level = 0;

	for (const auto & part : parts)
	{
		if (part->level > max_level)
			max_level = part->level;
		if (part->left_date < min_date)
			min_date = part->left_date;
		if (part->right_date > max_date)
			max_date = part->right_date;
	}

	return ActiveDataPartSet::getPartName(min_date, max_date, parts.front()->left, parts.back()->right, max_level + 1);
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/// Do not agree to merge parts unless free disk space is at least this factor times the total size of the parts.
static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 1.6;

/// When merging parts, reserve this much disk space. It is deliberately slightly less than DISK_USAGE_COEFFICIENT_TO_SELECT,
/// because between selecting the parts and reserving the space, the amount of free space may have become slightly smaller.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4;
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-03-25 11:48:45 +00:00
|
|
|
|
/// The merger is bound to a single MergeTreeData instance for its whole lifetime;
/// log messages go to a per-table channel named "<table log name> (Merger)".
MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_)
	: data(data_), log(&Logger::get(data.getLogName() + " (Merger)"))
{
}
|
|
|
|
|
|
2016-03-01 17:47:53 +00:00
|
|
|
|
/// Install a hook that is consulted to decide whether the current merge should be
/// cancelled (presumably checked via isCancelled() — confirm against the header).
/// The parameter is taken by value, so move it into the member instead of copying:
/// copying a callable (e.g. std::function) may allocate.
void MergeTreeDataMerger::setCancellationHook(CancellationHook cancellation_hook_)
{
	cancellation_hook = std::move(cancellation_hook_);
}
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-10-27 22:50:02 +00:00
|
|
|
|
|
|
|
|
|
size_t MergeTreeDataMerger::getMaxPartsSizeForMerge()
|
|
|
|
|
{
|
|
|
|
|
size_t total_threads_in_pool = data.context.getBackgroundPool().getNumberOfThreads();
|
|
|
|
|
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
|
|
|
|
|
size_t free_threads_in_pool = total_threads_in_pool - busy_threads_in_pool;
|
|
|
|
|
|
|
|
|
|
/// Interpolate between 'max_bytes_to_merge_at_min_space_in_pool' and 'max_bytes_to_merge_at_max_space_in_pool' with exponential function.
|
|
|
|
|
|
|
|
|
|
double base_of_exponent =
|
|
|
|
|
static_cast<double>(data.settings.max_bytes_to_merge_at_min_space_in_pool)
|
|
|
|
|
/ data.settings.max_bytes_to_merge_at_min_space_in_pool;
|
|
|
|
|
|
|
|
|
|
/// from 0 to 1
|
|
|
|
|
double power_of_exponent =
|
|
|
|
|
static_cast<double>(free_threads_in_pool)
|
|
|
|
|
/ total_threads_in_pool;
|
|
|
|
|
|
|
|
|
|
size_t max_size = data.settings.max_bytes_to_merge_at_min_space_in_pool * std::pow(base_of_exponent, power_of_exponent);
|
|
|
|
|
|
|
|
|
|
return std::min(max_size, static_cast<size_t>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT));
|
|
|
|
|
}
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-05-16 18:43:38 +00:00
|
|
|
|
/** Select parts to merge using SimpleMergeSelector over all partitions (months).
  * On success, fills `parts` with the chosen parts (in order) and `merged_name`
  * with the name of the future merged part, and returns true.
  * `aggressive` is forwarded to the selector; `max_total_size_to_merge` caps the
  * total byte size of the selection; `can_merge_callback` vetoes adjacent pairs.
  */
bool MergeTreeDataMerger::selectPartsToMerge(
	MergeTreeData::DataPartsVector & parts,
	String & merged_name,
	bool aggressive,
	size_t max_total_size_to_merge,
	const AllowedMergingPredicate & can_merge_callback)
{
	MergeTreeData::DataParts data_parts = data.getDataParts();

	if (data_parts.empty())
		return false;

	/// Memoization for can_merge_callback: caches the result of calling it for a part and its predecessor in data_parts.
	std::map<MergeTreeData::DataPartPtr, bool> can_merge_with_previous;
	auto can_merge = [&can_merge_with_previous, &can_merge_callback]
		(const MergeTreeData::DataPartPtr & first, const MergeTreeData::DataPartPtr & second) -> bool
	{
		auto it = can_merge_with_previous.find(second);
		if (it != can_merge_with_previous.end())
			return it->second;
		bool res = can_merge_callback(first, second);
		can_merge_with_previous[second] = res;
		return res;
	};

	time_t current_time = time(nullptr);

	IMergeSelector::Partitions partitions;

	/// Group parts into per-month partitions; data_parts is ordered, so a month change starts a new group.
	DayNum_t prev_month = DayNum_t(-1);
	for (const MergeTreeData::DataPartPtr & part : data_parts)
	{
		DayNum_t month = part->month;
		if (month != prev_month)
		{
			partitions.emplace_back();
			prev_month = month;
		}

		IMergeSelector::Part part_info;
		part_info.size = part->size_in_bytes;
		part_info.age = current_time - part->modification_time;
		part_info.level = part->level;
		/// Opaque back-pointer to the DataPartPtr inside data_parts; valid while data_parts is alive.
		part_info.data = &part;

		partitions.back().emplace_back(part_info);
	}

	SimpleMergeSelector merge_selector{SimpleMergeSelector::Settings()};
	IMergeSelector::PartsInPartition parts_to_merge = merge_selector.select(
		partitions,
		[] (const IMergeSelector::Part &) { return true; },
		[&] (const IMergeSelector::Part & left, const IMergeSelector::Part & right)
		{
			return can_merge(
				*static_cast<const MergeTreeData::DataPartPtr *>(left.data),
				*static_cast<const MergeTreeData::DataPartPtr *>(right.data));
		},
		max_total_size_to_merge,
		aggressive);

	if (!parts_to_merge.empty())
	{
		parts.clear();

		DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
		DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
		UInt32 level = 0;

		for (IMergeSelector::Part & part_info : parts_to_merge)
		{
			const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);

			parts.push_back(part);

			level = std::max(level, part->level);
			left_date = std::min(left_date, part->left_date);
			right_date = std::max(right_date, part->right_date);
		}

		merged_name = ActiveDataPartSet::getPartName(
			left_date, right_date, parts.front()->left, parts.back()->right, level + 1);

		LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
		return true;
	}

	return false;
}
|
|
|
|
|
|
2016-03-31 01:25:16 +00:00
|
|
|
|
|
2016-05-16 18:43:38 +00:00
|
|
|
|
/** Select ALL parts of one partition (month) for merging — e.g. for OPTIMIZE ... PARTITION.
  * Returns false if the partition is empty, if only one part exists and `final` is not set,
  * if some adjacent pair fails `can_merge`, or if there is not enough free disk space
  * (in that case a rate-limited warning is logged, at most once per hour).
  * On success, fills `what` with the parts and `merged_name` with the future part name.
  */
bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
	MergeTreeData::DataPartsVector & what,
	String & merged_name,
	size_t available_disk_space,
	const AllowedMergingPredicate & can_merge,
	DayNum_t partition,
	bool final)
{
	MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition);

	if (parts.empty())
		return false;

	/// A single part needs no merge — unless `final` forces re-merging it anyway.
	if (!final && parts.size() == 1)
		return false;

	MergeTreeData::DataPartsVector::const_iterator it = parts.begin();
	MergeTreeData::DataPartsVector::const_iterator prev_it = it;

	size_t sum_bytes = 0;
	DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
	DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
	UInt32 level = 0;

	while (it != parts.end())
	{
		if ((it != parts.begin() || parts.size() == 1) /// For the case of a single part, check that it can be merged "with itself".
			&& !can_merge(*prev_it, *it))
			return false;

		level = std::max(level, (*it)->level);
		left_date = std::min(left_date, (*it)->left_date);
		right_date = std::max(right_date, (*it)->right_date);

		sum_bytes += (*it)->size_in_bytes;

		prev_it = it;
		++it;
	}

	/// There must be enough disk space to cover the new merge with a margin.
	if (available_disk_space <= sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT)
	{
		time_t now = time(0);
		/// Suppress repeated warnings: log at most once per hour.
		if (now - disk_space_warning_time > 3600)
		{
			disk_space_warning_time = now;
			LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name
				<< " because not enough free space: "
				<< formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved "
				<< "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace()) << " reserved in "
				<< DiskSpaceMonitor::getReservationCount() << " chunks), "
				<< formatReadableSizeWithBinarySuffix(sum_bytes)
				<< " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
				<< "% on overhead); suppressing similar warnings for the next hour");
		}
		return false;
	}

	what = parts;
	merged_name = ActiveDataPartSet::getPartName(
		left_date, right_date, parts.front()->left, parts.back()->right, level + 1);

	LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
	return true;
}
|
|
|
|
|
|
|
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
|
/// Collect every active data part belonging to the given partition (month).
/// Returns the parts in the order they appear in the (ordered) active part set;
/// the result is empty if the partition has no parts.
MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(DayNum_t partition)
{
	MergeTreeData::DataPartsVector parts_from_partition;

	MergeTreeData::DataParts data_parts = data.getDataParts();

	/// Idiom: range-for instead of the explicit cbegin()/cend() iterator loop.
	for (const MergeTreeData::DataPartPtr & current_part : data_parts)
	{
		if (current_part->month != partition)
			continue;

		parts_from_partition.push_back(current_part);
	}

	return parts_from_partition;
}
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-03-31 01:25:16 +00:00
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/// `parts` must be sorted.
/** Merge the given parts into a single new part written under a temporary name
  * ("tmp_<merged_name>"). Reads all source parts through the mode-appropriate
  * merging stream, tracks progress in `merge_entry`, shrinks `disk_reservation`
  * as rows are written, and throws ABORTED if the merge is cancelled.
  * Returns the new (still temporary) part; renaming it is a separate step.
  */
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
	MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
	size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation)
{
	if (isCancelled())
		throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

	merge_entry->num_parts = parts.size();

	LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);

	String merged_dir = data.getFullPath() + merged_name;
	if (Poco::File(merged_dir).exists())
		throw Exception("Directory " + merged_dir + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

	/// Accumulate totals for progress reporting, holding each part's columns lock while reading its sizes.
	for (const MergeTreeData::DataPartPtr & part : parts)
	{
		Poco::ScopedReadRWLock part_lock(part->columns_lock);

		merge_entry->total_size_bytes_compressed += part->size_in_bytes;
		merge_entry->total_size_marks += part->size;
	}

	/// Per-column sizes are only needed to decide about O_DIRECT (AIO) per column.
	MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
	if (aio_threshold > 0)
	{
		for (const MergeTreeData::DataPartPtr & part : parts)
			part->accumulateColumnSizes(merged_column_to_size);
	}

	Names column_names = data.getColumnNamesList();
	NamesAndTypesList column_names_and_types = data.getColumnsList();

	MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
	ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
	new_data_part->name = "tmp_" + merged_name;
	new_data_part->is_temp = true;

	/** Read from all the parts, merge and write into a new one.
	  * Along the way, compute the expression for sorting.
	  */
	BlockInputStreams src_streams;

	size_t sum_rows_approx = 0;

	const auto rows_total = merge_entry->total_size_marks * data.index_granularity;

	for (size_t i = 0; i < parts.size(); ++i)
	{
		MarkRanges ranges(1, MarkRange(0, parts[i]->size));

		String part_path = data.getFullPath() + parts[i]->name + '/';
		auto input = std::make_unique<MergeTreeBlockInputStream>(
			part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
			parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);

		/// Report read progress into the merge list entry and global profile counters.
		input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
		{
			const auto new_rows_read = merge_entry->rows_read += value.rows;
			merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
			merge_entry->bytes_read_uncompressed += value.bytes;

			ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
			ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
		});

		/// For sorted modes the primary key expression must be applied before merging.
		if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
			src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
				std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
		else
			src_streams.emplace_back(std::move(input));

		sum_rows_approx += parts[i]->size * data.index_granularity;
	}

	/// The order of the streams is important: when keys collide, elements go in the order of the source stream number.
	/// In the merged part, rows with the same key must be in ascending order of the source part identifier,
	/// that is, in (approximate) order of insertion time.
	std::unique_ptr<IProfilingBlockInputStream> merged_stream;

	switch (data.merging_params.mode)
	{
		case MergeTreeData::MergingParams::Ordinary:
			merged_stream = std::make_unique<MergingSortedBlockInputStream>(
				src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
			break;

		case MergeTreeData::MergingParams::Collapsing:
			merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
				src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
			break;

		case MergeTreeData::MergingParams::Summing:
			merged_stream = std::make_unique<SummingSortedBlockInputStream>(
				src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
			break;

		case MergeTreeData::MergingParams::Aggregating:
			merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
				src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
			break;

		case MergeTreeData::MergingParams::Replacing:
			merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
				src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
			break;

		case MergeTreeData::MergingParams::Graphite:
			merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
				src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
				data.merging_params.graphite_params, time_of_merge);
			break;

		case MergeTreeData::MergingParams::Unsorted:
			merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
			break;

		default:
			throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR);
	}

	String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";

	auto compression_method = data.context.chooseCompressionMethod(
		merge_entry->total_size_bytes_compressed,
		static_cast<double>(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());

	MergedBlockOutputStream to{
		data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold};

	merged_stream->readPrefix();
	to.writePrefix();

	size_t rows_written = 0;
	const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;

	Block block;
	while (!isCancelled() && (block = merged_stream->read()))
	{
		rows_written += block.rows();
		to.write(block);

		merge_entry->rows_written = merged_stream->getProfileInfo().rows;
		merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;

		/// Shrink the disk reservation proportionally to merge progress (estimated by rows).
		if (disk_reservation)
			disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
	}

	if (isCancelled())
		throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

	merged_stream->readSuffix();
	new_data_part->columns = column_names_and_types;
	new_data_part->checksums = to.writeSuffixAndGetChecksums();
	new_data_part->index.swap(to.getIndex());

	/// For convenience, even CollapsingSortedBlockInputStream cannot produce zero rows.
	if (0 == to.marksCount())
		throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR);

	new_data_part->size = to.marksCount();
	new_data_part->modification_time = time(0);
	new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part_tmp_path);
	new_data_part->is_sharded = false;

	return new_data_part;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Commit a finished merge: rename the temporary merged part into place, replacing the
  * source parts. Verifies that the resulting part got the expected name and that exactly
  * the source parts were replaced; a mismatched count is logged (it can legitimately
  * happen on replicas — see the comment below), a mismatched part name throws.
  */
MergeTreeData::DataPartPtr MergeTreeDataMerger::renameMergedTemporaryPart(
	MergeTreeData::DataPartsVector & parts,
	MergeTreeData::MutableDataPartPtr & new_data_part,
	const String & merged_name,
	MergeTreeData::Transaction * out_transaction)
{
	/// Rename the new part, add it to the active set and remove the source parts.
	auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);

	if (new_data_part->name != merged_name)
		throw Exception("Unexpected part name: " + new_data_part->name + " instead of " + merged_name, ErrorCodes::LOGICAL_ERROR);

	/// Check that all the source parts — and only they — were removed.
	if (replaced_parts.size() != parts.size())
	{
		/** This is normal, although it happens rarely.
		  * The situation where 0 parts were replaced instead of N can happen, for example, in the following case:
		  * - we had part A, but no parts B and C;
		  * - the merge A, B -> AB was in the queue, but was not performed because part B was missing;
		  * - the merge AB, C -> ABC was in the queue, but was not performed because parts AB and C were missing;
		  * - we executed the task of downloading part B;
		  * - we started the merge A, B -> AB, since all the parts had appeared;
		  * - we decided to download part ABC from another replica, since the merge AB, C -> ABC was impossible;
		  * - part ABC appeared, and when it was added, the old parts A, B, C were removed;
		  * - the merge AB finished. Part AB was added. But it is an obsolete part. The log will contain
		  *   the message "Obsolete part added", and then we end up here.
		  * The situation where M > N parts were replaced is also normal.
		  *
		  * Although this should be prevented by the check in StorageReplicatedMergeTree::shouldExecuteLogEntry.
		  */
		LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
			<< " instead of " << parts.size());
	}
	else
	{
		for (size_t i = 0; i < parts.size(); ++i)
			if (parts[i]->name != replaced_parts[i]->name)
				throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
					+ " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
	}

	LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
	return new_data_part;
}
|
|
|
|
|
|
2016-03-31 01:25:16 +00:00
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
|
2016-03-01 17:47:53 +00:00
|
|
|
|
const ReshardingJob & job, DiskSpaceMonitor::Reservation * disk_reservation)
|
2016-01-28 16:06:57 +00:00
|
|
|
|
{
|
2016-03-01 17:47:53 +00:00
|
|
|
|
size_t aio_threshold = data.context.getSettings().min_bytes_to_use_direct_io;
|
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
/// Собрать все куски партиции.
|
|
|
|
|
DayNum_t month = MergeTreeData::getMonthFromName(job.partition);
|
|
|
|
|
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(month);
|
|
|
|
|
|
|
|
|
|
/// Создать временное название папки.
|
|
|
|
|
std::string merged_name = createMergedPartName(parts);
|
|
|
|
|
|
|
|
|
|
MergeList::EntryPtr merge_entry_ptr = data.context.getMergeList().insert(job.database_name,
|
|
|
|
|
job.table_name, merged_name);
|
|
|
|
|
MergeList::Entry & merge_entry = *merge_entry_ptr;
|
|
|
|
|
merge_entry->num_parts = parts.size();
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Resharding " << parts.size() << " parts from " << parts.front()->name
|
|
|
|
|
<< " to " << parts.back()->name << " which span the partition " << job.partition);
|
|
|
|
|
|
|
|
|
|
/// Слияние всех кусков партиции.
|
|
|
|
|
|
|
|
|
|
for (const MergeTreeData::DataPartPtr & part : parts)
|
|
|
|
|
{
|
|
|
|
|
Poco::ScopedReadRWLock part_lock(part->columns_lock);
|
|
|
|
|
|
|
|
|
|
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
|
|
|
|
|
merge_entry->total_size_marks += part->size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
|
|
|
|
|
if (aio_threshold > 0)
|
|
|
|
|
{
|
|
|
|
|
for (const MergeTreeData::DataPartPtr & part : parts)
|
|
|
|
|
part->accumulateColumnSizes(merged_column_to_size);
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-18 02:49:52 +00:00
|
|
|
|
Names column_names = data.getColumnNamesList();
|
|
|
|
|
NamesAndTypesList column_names_and_types = data.getColumnsList();
|
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
BlockInputStreams src_streams;
|
|
|
|
|
|
|
|
|
|
size_t sum_rows_approx = 0;
|
|
|
|
|
|
|
|
|
|
const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < parts.size(); ++i)
|
|
|
|
|
{
|
|
|
|
|
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
|
|
|
|
|
|
|
|
|
|
String part_path = data.getFullPath() + parts[i]->name + '/';
|
|
|
|
|
|
|
|
|
|
auto input = std::make_unique<MergeTreeBlockInputStream>(
|
2016-05-18 02:49:52 +00:00
|
|
|
|
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
|
2016-01-28 16:06:57 +00:00
|
|
|
|
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
|
|
|
|
|
|
|
|
|
|
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
|
|
|
|
|
{
|
2016-07-31 03:53:16 +00:00
|
|
|
|
const auto new_rows_read = merge_entry->rows_read += value.rows;
|
2016-01-28 16:06:57 +00:00
|
|
|
|
merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
|
2016-07-31 03:53:16 +00:00
|
|
|
|
merge_entry->bytes_read_uncompressed += value.bytes;
|
2016-01-28 16:06:57 +00:00
|
|
|
|
});
|
|
|
|
|
|
2016-04-15 17:42:51 +00:00
|
|
|
|
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
|
2016-05-28 12:22:22 +00:00
|
|
|
|
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
|
|
|
|
|
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
|
2016-01-28 16:06:57 +00:00
|
|
|
|
else
|
2016-05-28 12:22:22 +00:00
|
|
|
|
src_streams.emplace_back(std::move(input));
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
|
|
|
|
sum_rows_approx += parts[i]->size * data.index_granularity;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Шардирование слитых блоков.
|
|
|
|
|
|
|
|
|
|
/// Для нумерации блоков.
|
2016-03-01 17:47:53 +00:00
|
|
|
|
SimpleIncrement increment(job.block_number);
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
|
|
|
|
/// Создать новый кусок для каждого шарда.
|
|
|
|
|
MergeTreeData::PerShardDataParts per_shard_data_parts;
|
|
|
|
|
|
|
|
|
|
per_shard_data_parts.reserve(job.paths.size());
|
|
|
|
|
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
|
|
|
|
|
{
|
|
|
|
|
Int64 temp_index = increment.get();
|
|
|
|
|
|
|
|
|
|
MergeTreeData::MutableDataPartPtr data_part = std::make_shared<MergeTreeData::DataPart>(data);
|
|
|
|
|
data_part->name = "tmp_" + merged_name;
|
|
|
|
|
data_part->is_temp = true;
|
|
|
|
|
data_part->left_date = std::numeric_limits<UInt16>::max();
|
|
|
|
|
data_part->right_date = std::numeric_limits<UInt16>::min();
|
|
|
|
|
data_part->month = month;
|
|
|
|
|
data_part->left = temp_index;
|
|
|
|
|
data_part->right = temp_index;
|
|
|
|
|
data_part->level = 0;
|
|
|
|
|
per_shard_data_parts.emplace(shard_no, data_part);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Очень грубая оценка для размера сжатых данных каждой шардированной партиции.
|
|
|
|
|
/// На самом деле всё зависит от свойств выражения для шардирования.
|
|
|
|
|
UInt64 per_shard_size_bytes_compressed = merge_entry->total_size_bytes_compressed / static_cast<double>(job.paths.size());
|
|
|
|
|
|
|
|
|
|
auto compression_method = data.context.chooseCompressionMethod(
|
|
|
|
|
per_shard_size_bytes_compressed,
|
|
|
|
|
static_cast<double>(per_shard_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
|
|
|
|
|
|
|
|
|
|
using MergedBlockOutputStreamPtr = std::unique_ptr<MergedBlockOutputStream>;
|
|
|
|
|
using PerShardOutput = std::unordered_map<size_t, MergedBlockOutputStreamPtr>;
|
|
|
|
|
|
|
|
|
|
/// Создать для каждого шарда поток, который записывает соответствующие шардированные блоки.
|
|
|
|
|
PerShardOutput per_shard_output;
|
|
|
|
|
|
|
|
|
|
per_shard_output.reserve(job.paths.size());
|
|
|
|
|
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
|
|
|
|
|
{
|
|
|
|
|
std::string new_part_tmp_path = data.getFullPath() + "reshard/" + toString(shard_no) + "/tmp_" + merged_name + "/";
|
|
|
|
|
Poco::File(new_part_tmp_path).createDirectories();
|
|
|
|
|
|
|
|
|
|
MergedBlockOutputStreamPtr output_stream;
|
2016-05-18 02:49:52 +00:00
|
|
|
|
output_stream = std::make_unique<MergedBlockOutputStream>(
|
|
|
|
|
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold);
|
2016-01-28 16:06:57 +00:00
|
|
|
|
per_shard_output.emplace(shard_no, std::move(output_stream));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.
|
|
|
|
|
/// В слитом куске строки с одинаковым ключом должны идти в порядке возрастания идентификатора исходного куска,
|
|
|
|
|
/// то есть (примерного) возрастания времени вставки.
|
|
|
|
|
std::unique_ptr<IProfilingBlockInputStream> merged_stream;
|
|
|
|
|
|
2016-04-15 17:42:51 +00:00
|
|
|
|
switch (data.merging_params.mode)
|
2016-01-28 16:06:57 +00:00
|
|
|
|
{
|
2016-04-15 17:42:51 +00:00
|
|
|
|
case MergeTreeData::MergingParams::Ordinary:
|
2016-01-28 16:06:57 +00:00
|
|
|
|
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
|
|
|
|
|
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
|
|
|
|
|
break;
|
|
|
|
|
|
2016-04-15 17:42:51 +00:00
|
|
|
|
case MergeTreeData::MergingParams::Collapsing:
|
2016-01-28 16:06:57 +00:00
|
|
|
|
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
|
2016-04-15 17:13:51 +00:00
|
|
|
|
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
|
2016-01-28 16:06:57 +00:00
|
|
|
|
break;
|
|
|
|
|
|
2016-04-15 17:42:51 +00:00
|
|
|
|
case MergeTreeData::MergingParams::Summing:
|
2016-01-28 16:06:57 +00:00
|
|
|
|
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
|
2016-04-15 17:13:51 +00:00
|
|
|
|
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
|
2016-01-28 16:06:57 +00:00
|
|
|
|
break;
|
|
|
|
|
|
2016-04-15 17:42:51 +00:00
|
|
|
|
case MergeTreeData::MergingParams::Aggregating:
|
2016-01-28 16:06:57 +00:00
|
|
|
|
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
|
2016-04-15 17:13:51 +00:00
|
|
|
|
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
|
|
|
|
|
break;
|
|
|
|
|
|
2016-04-15 19:09:42 +00:00
|
|
|
|
case MergeTreeData::MergingParams::Replacing:
|
|
|
|
|
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
|
|
|
|
|
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
|
2016-01-28 16:06:57 +00:00
|
|
|
|
break;
|
|
|
|
|
|
2016-04-24 09:44:47 +00:00
|
|
|
|
case MergeTreeData::MergingParams::Graphite:
|
|
|
|
|
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
|
|
|
|
|
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
|
|
|
|
|
data.merging_params.graphite_params, time(0));
|
|
|
|
|
break;
|
|
|
|
|
|
2016-04-15 17:42:51 +00:00
|
|
|
|
case MergeTreeData::MergingParams::Unsorted:
|
2016-01-28 16:06:57 +00:00
|
|
|
|
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
default:
|
2016-04-15 17:42:51 +00:00
|
|
|
|
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR);
|
2016-01-28 16:06:57 +00:00
|
|
|
|
}
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
merged_stream->readPrefix();
|
|
|
|
|
|
|
|
|
|
for (auto & entry : per_shard_output)
|
2016-01-28 01:00:27 +00:00
|
|
|
|
{
|
2016-01-28 16:06:57 +00:00
|
|
|
|
MergedBlockOutputStreamPtr & output_stream = entry.second;
|
|
|
|
|
output_stream->writePrefix();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t rows_written = 0;
|
|
|
|
|
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
MergeTreeSharder sharder(data, job);
|
2014-04-07 15:45:46 +00:00
|
|
|
|
|
2016-03-07 04:43:14 +00:00
|
|
|
|
while (Block block = merged_stream->read())
|
2016-01-28 16:06:57 +00:00
|
|
|
|
{
|
2016-08-26 19:50:04 +00:00
|
|
|
|
abortReshardPartitionIfRequested();
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
|
|
|
|
ShardedBlocksWithDateIntervals blocks = sharder.shardBlock(block);
|
|
|
|
|
|
|
|
|
|
for (ShardedBlockWithDateInterval & block_with_dates : blocks)
|
2016-01-28 01:00:27 +00:00
|
|
|
|
{
|
2016-08-26 19:50:04 +00:00
|
|
|
|
abortReshardPartitionIfRequested();
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
|
|
|
|
size_t shard_no = block_with_dates.shard_no;
|
|
|
|
|
MergeTreeData::MutableDataPartPtr & data_part = per_shard_data_parts.at(shard_no);
|
|
|
|
|
MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
|
|
|
|
|
|
|
|
|
|
rows_written += block_with_dates.block.rows();
|
|
|
|
|
output_stream->write(block_with_dates.block);
|
|
|
|
|
|
|
|
|
|
if (block_with_dates.min_date < data_part->left_date)
|
|
|
|
|
data_part->left_date = block_with_dates.min_date;
|
|
|
|
|
if (block_with_dates.max_date > data_part->right_date)
|
|
|
|
|
data_part->right_date = block_with_dates.max_date;
|
|
|
|
|
|
2016-08-13 02:21:17 +00:00
|
|
|
|
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
|
|
|
|
|
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
|
|
|
|
if (disk_reservation)
|
|
|
|
|
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
|
2016-01-28 01:00:27 +00:00
|
|
|
|
}
|
2016-01-28 16:06:57 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
merged_stream->readSuffix();
|
|
|
|
|
|
|
|
|
|
/// Завершить инициализацию куски новых партиций.
|
|
|
|
|
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
|
|
|
|
|
{
|
2016-08-26 19:50:04 +00:00
|
|
|
|
abortReshardPartitionIfRequested();
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
|
|
|
|
MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
|
|
|
|
|
if (0 == output_stream->marksCount())
|
2016-01-28 01:00:27 +00:00
|
|
|
|
{
|
2016-01-28 16:06:57 +00:00
|
|
|
|
/// В этот шард не попало никаких данных. Игнорируем.
|
|
|
|
|
LOG_WARNING(log, "No data in partition for shard " + job.paths[shard_no].first);
|
|
|
|
|
per_shard_data_parts.erase(shard_no);
|
|
|
|
|
continue;
|
2016-01-28 01:00:27 +00:00
|
|
|
|
}
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
|
|
|
|
MergeTreeData::MutableDataPartPtr & data_part = per_shard_data_parts.at(shard_no);
|
|
|
|
|
|
2016-05-18 02:49:52 +00:00
|
|
|
|
data_part->columns = column_names_and_types;
|
2016-01-28 16:06:57 +00:00
|
|
|
|
data_part->checksums = output_stream->writeSuffixAndGetChecksums();
|
|
|
|
|
data_part->index.swap(output_stream->getIndex());
|
|
|
|
|
data_part->size = output_stream->marksCount();
|
|
|
|
|
data_part->modification_time = time(0);
|
|
|
|
|
data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(output_stream->getPartPath());
|
|
|
|
|
data_part->is_sharded = true;
|
|
|
|
|
data_part->shard_no = shard_no;
|
2014-04-07 15:45:46 +00:00
|
|
|
|
}
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
/// Превратить куски новых партиций в постоянные куски.
|
|
|
|
|
for (auto & entry : per_shard_data_parts)
|
|
|
|
|
{
|
|
|
|
|
size_t shard_no = entry.first;
|
|
|
|
|
MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
|
|
|
|
|
part_from_shard->is_temp = false;
|
|
|
|
|
std::string prefix = data.getFullPath() + "reshard/" + toString(shard_no) + "/";
|
|
|
|
|
std::string old_name = part_from_shard->name;
|
|
|
|
|
std::string new_name = ActiveDataPartSet::getPartName(part_from_shard->left_date,
|
|
|
|
|
part_from_shard->right_date, part_from_shard->left, part_from_shard->right, part_from_shard->level);
|
|
|
|
|
part_from_shard->name = new_name;
|
|
|
|
|
Poco::File(prefix + old_name).renameTo(prefix + new_name);
|
|
|
|
|
}
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2016-01-28 16:06:57 +00:00
|
|
|
|
LOG_TRACE(log, "Resharded the partition " << job.partition);
|
|
|
|
|
|
|
|
|
|
return per_shard_data_parts;
|
2014-03-13 12:48:07 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-03-13 17:44:00 +00:00
|
|
|
|
size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts)
{
	/// Estimate the disk space a merge of `parts` may require: the sum of the
	/// on-disk sizes of all source parts, padded by a safety coefficient
	/// (the merged result plus temporary files can transiently exceed the inputs).
	size_t total_size_in_bytes = 0;

	for (const auto & part : parts)
		total_size_in_bytes += part->size_in_bytes;

	return static_cast<size_t>(total_size_in_bytes * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
|
|
|
|
|
|
2016-08-26 19:50:04 +00:00
|
|
|
|
void MergeTreeDataMerger::abortReshardPartitionIfRequested()
|
2016-01-28 16:06:57 +00:00
|
|
|
|
{
|
2016-08-26 19:50:04 +00:00
|
|
|
|
if (isCancelled())
|
2016-01-28 16:06:57 +00:00
|
|
|
|
throw Exception("Cancelled partition resharding", ErrorCodes::ABORTED);
|
|
|
|
|
|
2016-03-01 17:47:53 +00:00
|
|
|
|
if (cancellation_hook)
|
|
|
|
|
cancellation_hook();
|
|
|
|
|
}
|
2016-01-28 16:06:57 +00:00
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
}
|