ClickHouse/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
2019-04-04 16:13:59 +03:00

233 lines
8.1 KiB
C++

#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/HashTable/HashMap.h>
#include <Interpreters/AggregationCommon.h>
#include <IO/HashingWriteBuffer.h>
#include <Poco/File.h>
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterBlocks;
extern const Event MergeTreeDataWriterBlocksAlreadySorted;
extern const Event MergeTreeDataWriterRows;
extern const Event MergeTreeDataWriterUncompressedBytes;
extern const Event MergeTreeDataWriterCompressedBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
void buildScatterSelector(
const ColumnRawPtrs & columns,
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector)
{
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
Data partitions_map;
size_t num_rows = columns[0]->size();
size_t partitions_count = 0;
for (size_t i = 0; i < num_rows; ++i)
{
Data::key_type key = hash128(i, columns.size(), columns);
typename Data::iterator it;
bool inserted;
partitions_map.emplace(key, it, inserted);
if (inserted)
{
partition_num_to_first_row.push_back(i);
it->getSecond() = partitions_count;
++partitions_count;
/// Optimization for common case when there is only one partition - defer selector initialization.
if (partitions_count == 2)
{
selector = IColumn::Selector(num_rows);
std::fill(selector.begin(), selector.begin() + i, 0);
}
}
if (partitions_count > 1)
selector[i] = it->getSecond();
}
}
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block)
{
BlocksWithPartition result;
if (!block || !block.rows())
return result;
data.check(block, true);
block.checkNumberOfRows();
if (!data.partition_key_expr) /// Table is not partitioned.
{
result.emplace_back(Block(block), Row());
return result;
}
Block block_copy = block;
data.partition_key_expr->execute(block_copy);
ColumnRawPtrs partition_columns;
partition_columns.reserve(data.partition_key_sample.columns());
for (const ColumnWithTypeAndName & element : data.partition_key_sample)
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector);
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);
auto get_partition = [&](size_t num)
{
Row partition(partition_columns.size());
for (size_t i = 0; i < partition_columns.size(); ++i)
partition[i] = Field((*partition_columns[i])[partition_num_to_first_row[num]]);
return partition;
};
if (partitions_count == 1)
{
/// A typical case is when there is one partition (you do not need to split anything).
/// NOTE: returning a copy of the original block so that calculated partition key columns
/// do not interfere with possible calculated primary key columns of the same name.
result.emplace_back(Block(block), get_partition(0));
return result;
}
for (size_t i = 0; i < partitions_count; ++i)
result.emplace_back(block.cloneEmpty(), get_partition(i));
for (size_t col = 0; col < block.columns(); ++col)
{
MutableColumns scattered = block.getByPosition(col).column->scatter(partitions_count, selector);
for (size_t i = 0; i < partitions_count; ++i)
result[i].block.getByPosition(col).column = std::move(scattered[i]);
}
return result;
}
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition)
{
Block & block = block_with_partition.block;
static const String TMP_PREFIX = "tmp_insert_";
/// This will generate unique name in scope of current server process.
Int64 temp_index = data.insert_increment.get();
MergeTreeDataPart::MinMaxIndex minmax_idx;
minmax_idx.update(block, data.minmax_idx_columns);
MergeTreePartition partition(std::move(block_with_partition.partition));
MergeTreePartInfo new_part_info(partition.getID(data.partition_key_sample), temp_index, temp_index, 0);
String part_name;
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
DayNum min_date(minmax_idx.parallelogram[data.minmax_idx_date_column_pos].left.get<UInt64>());
DayNum max_date(minmax_idx.parallelogram[data.minmax_idx_date_column_pos].right.get<UInt64>());
const auto & date_lut = DateLUT::instance();
DayNum min_month = date_lut.toFirstDayNumOfMonth(DayNum(min_date));
DayNum max_month = date_lut.toFirstDayNumOfMonth(DayNum(max_date));
if (min_month != max_month)
throw Exception("Logical error: part spans more than one month.", ErrorCodes::LOGICAL_ERROR);
part_name = new_part_info.getPartNameV0(min_date, max_date);
}
else
part_name = new_part_info.getPartName();
size_t expected_size = block.bytes();
auto reservation = data.reserveSpaceForPart(expected_size); ///@TODO_IGR ASK expected size
String part_absolute_path = reservation->getPath();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_absolute_path, part_name, new_part_info);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->relative_path = TMP_PREFIX + part_name;
new_data_part->is_temp = true;
/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->getFullPath();
Poco::File dir(full_path);
if (dir.exists())
{
LOG_WARNING(log, "Removing old temporary directory " + full_path);
dir.remove(true);
}
dir.createDirectories();
/// If we need to calculate some columns to sort.
if (data.hasSortingKey() || data.hasSkipIndices())
data.sorting_key_and_skip_indices_expr->execute(block);
Names sort_columns = data.sorting_key_columns;
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
/// Sort.
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_codec);
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);
out.writeSuffixAndFinalizePart(new_data_part);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->bytes_on_disk);
return new_data_part;
}
}