2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/MergedBlockOutputStream.h>
|
2019-06-19 10:07:56 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/createWriteBufferFromFileBase.h>
|
|
|
|
#include <Common/escapeForFileName.h>
|
2017-12-25 18:58:39 +00:00
|
|
|
#include <DataTypes/NestedUtils.h>
|
2018-11-15 14:06:54 +00:00
|
|
|
#include <DataStreams/MarkInCompressedFile.h>
|
2018-01-15 19:07:47 +00:00
|
|
|
#include <Common/StringUtils/StringUtils.h>
|
2017-07-13 20:58:19 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2017-04-08 01:32:05 +00:00
|
|
|
#include <Common/MemoryTracker.h>
|
2017-01-21 04:24:28 +00:00
|
|
|
#include <Poco/File.h>
|
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2019-03-14 23:10:51 +00:00
|
|
|
/// Error codes referenced by this translation unit.
/// Every code used below is declared here so that the file does not depend on
/// other headers happening to declare the same names.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;     /// thrown when MinMax index is missing for a non-empty part
    extern const int NOT_IMPLEMENTED;   /// thrown by writeSuffix()
}
|
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
|
|
|
|
MergedBlockOutputStream::MergedBlockOutputStream(
|
2019-10-21 15:33:59 +00:00
|
|
|
const MergeTreeDataPartPtr & data_part_,
|
2017-12-25 21:57:29 +00:00
|
|
|
const NamesAndTypesList & columns_list_,
|
2019-03-18 12:02:33 +00:00
|
|
|
CompressionCodecPtr default_codec_,
|
2019-03-25 13:55:24 +00:00
|
|
|
bool blocks_are_granules_size_)
|
2017-04-01 07:20:54 +00:00
|
|
|
: IMergedBlockOutputStream(
|
2019-10-21 15:33:59 +00:00
|
|
|
data_part_, default_codec_,
|
|
|
|
{
|
|
|
|
data_part_->storage.global_context.getSettings().min_compress_block_size,
|
|
|
|
data_part_->storage.global_context.getSettings().max_compress_block_size,
|
|
|
|
data_part_->storage.global_context.getSettings().min_bytes_to_use_direct_io
|
|
|
|
},
|
2018-11-30 15:36:10 +00:00
|
|
|
blocks_are_granules_size_,
|
2019-10-21 15:33:59 +00:00
|
|
|
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)),
|
|
|
|
data_part_->storage.canUseAdaptiveGranularity())
|
2019-07-28 11:10:35 +00:00
|
|
|
, columns_list(columns_list_)
|
2016-07-21 16:22:24 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
init();
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
MergedBlockOutputStream::MergedBlockOutputStream(
|
2019-10-21 15:33:59 +00:00
|
|
|
const MergeTreeDataPartPtr & data_part_,
|
2017-12-25 21:57:29 +00:00
|
|
|
const NamesAndTypesList & columns_list_,
|
2018-12-21 12:17:30 +00:00
|
|
|
CompressionCodecPtr default_codec_,
|
2019-10-21 15:33:59 +00:00
|
|
|
const MergeTreeData::DataPart::ColumnToSize & /* merged_column_to_size_ */,
|
2018-11-30 15:36:10 +00:00
|
|
|
size_t aio_threshold_,
|
2019-03-25 13:55:24 +00:00
|
|
|
bool blocks_are_granules_size_)
|
2017-04-01 07:20:54 +00:00
|
|
|
: IMergedBlockOutputStream(
|
2019-10-21 15:33:59 +00:00
|
|
|
data_part_, default_codec_,
|
|
|
|
{
|
|
|
|
data_part_->storage.global_context.getSettings().min_compress_block_size,
|
|
|
|
data_part_->storage.global_context.getSettings().max_compress_block_size,
|
|
|
|
aio_threshold_
|
|
|
|
},
|
|
|
|
blocks_are_granules_size_,
|
|
|
|
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)), {})
|
2019-07-28 11:10:35 +00:00
|
|
|
, columns_list(columns_list_)
|
2016-07-21 16:22:24 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
init();
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
std::string MergedBlockOutputStream::getPartPath() const
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
return part_path;
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
2018-05-07 02:01:11 +00:00
|
|
|
/// If data is pre-sorted.
|
2016-07-21 16:22:24 +00:00
|
|
|
void MergedBlockOutputStream::write(const Block & block)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
writeImpl(block, nullptr);
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
2017-03-12 19:18:07 +00:00
|
|
|
/** If the data is not sorted, but we pre-calculated the permutation, after which they will be sorted.
|
2017-04-01 07:20:54 +00:00
|
|
|
* This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted.
|
|
|
|
*/
|
2016-07-21 16:22:24 +00:00
|
|
|
void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
writeImpl(block, permutation);
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void MergedBlockOutputStream::writeSuffix()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
2017-08-30 19:03:19 +00:00
|
|
|
void MergedBlockOutputStream::writeSuffixAndFinalizePart(
|
|
|
|
MergeTreeData::MutableDataPartPtr & new_part,
|
2017-12-25 21:57:29 +00:00
|
|
|
const NamesAndTypesList * total_column_list,
|
2017-08-30 19:03:19 +00:00
|
|
|
MergeTreeData::DataPart::Checksums * additional_column_checksums)
|
2016-07-21 16:22:24 +00:00
|
|
|
{
|
2018-06-07 18:14:37 +00:00
|
|
|
/// Finish columns serialization.
|
|
|
|
{
|
2019-10-21 15:33:59 +00:00
|
|
|
/// FIXME
|
|
|
|
// const auto & settings = storage.global_context.getSettingsRef();
|
|
|
|
// IDataType::SerializeBinaryBulkSettings serialize_settings;
|
|
|
|
// serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
|
|
|
|
// serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
|
|
|
|
// WrittenOffsetColumns offset_columns;
|
|
|
|
// auto it = columns_list.begin();
|
|
|
|
// for (size_t i = 0; i < columns_list.size(); ++i, ++it)
|
|
|
|
// {
|
|
|
|
// if (!serialization_states.empty())
|
|
|
|
// {
|
|
|
|
// serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
|
|
|
|
// it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
|
|
|
|
// }
|
|
|
|
|
|
|
|
// if (with_final_mark && rows_count != 0)
|
|
|
|
// writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
|
|
|
|
// }
|
2018-06-07 18:14:37 +00:00
|
|
|
}
|
|
|
|
|
2019-06-19 14:46:06 +00:00
|
|
|
if (with_final_mark && rows_count != 0)
|
2019-06-18 12:54:27 +00:00
|
|
|
index_granularity.appendMark(0); /// last mark
|
|
|
|
|
2017-08-30 19:03:19 +00:00
|
|
|
if (!total_column_list)
|
|
|
|
total_column_list = &columns_list;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Finish write and get checksums.
|
|
|
|
MergeTreeData::DataPart::Checksums checksums;
|
|
|
|
|
|
|
|
if (additional_column_checksums)
|
|
|
|
checksums = std::move(*additional_column_checksums);
|
|
|
|
|
2018-02-19 17:31:30 +00:00
|
|
|
if (index_stream)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-06-19 14:46:06 +00:00
|
|
|
if (with_final_mark && rows_count != 0)
|
2019-06-18 12:54:27 +00:00
|
|
|
{
|
|
|
|
for (size_t j = 0; j < index_columns.size(); ++j)
|
|
|
|
{
|
|
|
|
auto & column = *last_index_row[j].column;
|
|
|
|
index_columns[j]->insertFrom(column, 0); /// it has only one element
|
|
|
|
last_index_row[j].type->serializeBinary(column, 0, *index_stream);
|
|
|
|
}
|
|
|
|
last_index_row.clear();
|
|
|
|
}
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
index_stream->next();
|
|
|
|
checksums.files["primary.idx"].file_size = index_stream->count();
|
|
|
|
checksums.files["primary.idx"].file_hash = index_stream->getHash();
|
|
|
|
index_stream = nullptr;
|
|
|
|
}
|
|
|
|
|
2019-10-21 15:33:59 +00:00
|
|
|
/// FIXME
|
|
|
|
// for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
|
|
|
|
// {
|
|
|
|
// it->second->finalize();
|
|
|
|
// it->second->addToChecksums(checksums);
|
|
|
|
// }
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-07-28 11:10:35 +00:00
|
|
|
finishSkipIndicesSerialization(checksums);
|
|
|
|
|
2019-10-21 15:33:59 +00:00
|
|
|
// column_streams.clear();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-10-19 18:20:41 +00:00
|
|
|
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
|
|
|
{
|
|
|
|
new_part->partition.store(storage, part_path, checksums);
|
2018-05-23 19:34:37 +00:00
|
|
|
if (new_part->minmax_idx.initialized)
|
|
|
|
new_part->minmax_idx.store(storage, part_path, checksums);
|
2018-08-06 16:42:43 +00:00
|
|
|
else if (rows_count)
|
2018-08-06 16:53:34 +00:00
|
|
|
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
|
2018-08-06 16:42:43 +00:00
|
|
|
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
2017-10-24 14:11:53 +00:00
|
|
|
|
|
|
|
WriteBufferFromFile count_out(part_path + "count.txt", 4096);
|
|
|
|
HashingWriteBuffer count_out_hashing(count_out);
|
|
|
|
writeIntText(rows_count, count_out_hashing);
|
|
|
|
count_out_hashing.next();
|
|
|
|
checksums.files["count.txt"].file_size = count_out_hashing.count();
|
|
|
|
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
2019-04-15 09:30:45 +00:00
|
|
|
if (new_part->ttl_infos.part_min_ttl)
|
|
|
|
{
|
|
|
|
/// Write a file with ttl infos in json format.
|
|
|
|
WriteBufferFromFile out(part_path + "ttl.txt", 4096);
|
|
|
|
HashingWriteBuffer out_hashing(out);
|
|
|
|
new_part->ttl_infos.write(out_hashing);
|
|
|
|
checksums.files["ttl.txt"].file_size = out_hashing.count();
|
|
|
|
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
{
|
|
|
|
/// Write a file with a description of columns.
|
|
|
|
WriteBufferFromFile out(part_path + "columns.txt", 4096);
|
2017-08-30 19:03:19 +00:00
|
|
|
total_column_list->writeText(out);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
{
|
|
|
|
/// Write file with checksums.
|
|
|
|
WriteBufferFromFile out(part_path + "checksums.txt", 4096);
|
|
|
|
checksums.write(out);
|
|
|
|
}
|
|
|
|
|
2017-10-24 14:11:53 +00:00
|
|
|
new_part->rows_count = rows_count;
|
2017-08-30 19:03:19 +00:00
|
|
|
new_part->modification_time = time(nullptr);
|
|
|
|
new_part->columns = *total_column_list;
|
2017-12-15 20:48:46 +00:00
|
|
|
new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
|
2017-08-30 19:03:19 +00:00
|
|
|
new_part->checksums = checksums;
|
2018-07-08 03:56:24 +00:00
|
|
|
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
|
2019-03-25 13:55:24 +00:00
|
|
|
new_part->index_granularity = index_granularity;
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void MergedBlockOutputStream::init()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
Poco::File(part_path).createDirectories();
|
|
|
|
|
2018-02-19 17:31:30 +00:00
|
|
|
if (storage.hasPrimaryKey())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
index_file_stream = std::make_unique<WriteBufferFromFile>(
|
|
|
|
part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
|
|
|
|
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
|
|
|
|
}
|
2019-02-05 14:50:25 +00:00
|
|
|
|
2019-07-28 11:10:35 +00:00
|
|
|
initSkipIndices();
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
2016-12-10 04:51:36 +00:00
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
/** Writes one block to the part and updates the in-memory primary index.
  *
  * @param block        block to write; a block with zero rows is ignored.
  * @param permutation  if not null, primary key and skip index columns are permuted with it
  *                     before being passed on; if null, the block is used in its given order.
  *
  * Steps:
  *  - fills index granularity for the block when `compute_granularity` is set;
  *  - extracts primary key columns and skip index columns into separate blocks;
  *  - passes everything to `writer`, then serializes skip indices;
  *  - appends one primary key row per mark to `index_columns` and to `index_stream`;
  *  - remembers the last primary key row of the block in `last_index_row`.
  */
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
{
    block.checkNumberOfRows();
    const size_t rows = block.rows();
    if (!rows)
        return;

    /// Fill index granularity for this block
    /// if it's unknown (in case of insert data or horizontal merge,
    /// but not in case of vertical merge)
    if (compute_granularity)
        fillIndexGranularity(block);

    Block primary_key_block;
    Block skip_indexes_block;

    const auto primary_key_column_names = storage.primary_key_columns;

    /// Collect the distinct column names used by all skip indices.
    std::set<String> skip_indexes_column_names_set;
    for (const auto & index : storage.skip_indices)
        skip_indexes_column_names_set.insert(index->columns.cbegin(), index->columns.cend());
    const Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());

    for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
    {
        const auto & name = primary_key_column_names[i];
        primary_key_block.insert(i, block.getByName(name));

        /// Reorder primary key columns in advance, inside `primary_key_block`.
        if (permutation)
        {
            auto & column = primary_key_block.getByPosition(i);
            column.column = column.column->permute(*permutation, 0);
        }
    }

    for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i)
    {
        const auto & name = skip_indexes_column_names[i];
        skip_indexes_block.insert(i, block.getByName(name));

        /// Reorder index columns in advance.
        if (permutation)
        {
            auto & column = skip_indexes_block.getByPosition(i);
            column.column = column.column->permute(*permutation, 0);
        }
    }

    /// On the first non-empty block, create empty index columns with the primary key structure.
    if (index_columns.empty())
    {
        index_columns.resize(primary_key_column_names.size());
        last_index_row.resize(primary_key_column_names.size());
        for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
        {
            last_index_row[i] = primary_key_block.getByPosition(i).cloneEmpty();
            index_columns[i] = last_index_row[i].column->cloneEmpty();
        }
    }

    const size_t new_index_offset = writer->write(block, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block);
    rows_count += rows;

    /// Should be written before index offset update, because we calculate,
    /// indices of currently written granules
    calculateAndSerializeSkipIndices(skip_indexes_block, rows);

    {
        /** While filling index (index_columns), disable memory tracker.
          * Because memory is allocated here (maybe in context of INSERT query),
          *  but then freed in completely different place (while merging parts), where query memory_tracker is not available.
          * And otherwise it will look like excessively growing memory consumption in context of query.
          *  (observed in long INSERT SELECTs)
          */
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Write index. The index contains Primary Key value for each `index_granularity` row.
        for (size_t i = index_offset; i < rows;)
        {
            if (storage.hasPrimaryKey())
            {
                /// `j` is a column position, so the bound is the number of key columns, not rows.
                for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
                {
                    const auto & primary_column = primary_key_block.getByPosition(j);
                    index_columns[j]->insertFrom(*primary_column.column, i);
                    primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
                }
            }

            ++current_mark;
            if (current_mark < index_granularity.getMarksCount())
                i += index_granularity.getMarkRows(current_mark);
            else
                break;
        }
    }

    /// store last index row to write final mark at the end of column
    for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
    {
        const IColumn & primary_column = *primary_key_block.getByPosition(j).column.get();
        auto mutable_column = std::move(*last_index_row[j].column).mutate();

        /// Keep exactly one row: drop the value remembered from the previous block, if any.
        if (!mutable_column->empty())
            mutable_column->popBack(1);
        mutable_column->insertFrom(primary_column, rows - 1);
        last_index_row[j].column = std::move(mutable_column);
    }

    index_offset = new_index_offset;
}
|
|
|
|
|
|
|
|
}
|