ClickHouse/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp

308 lines
11 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/MergedBlockOutputStream.h>
2019-06-19 10:07:56 +00:00
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <IO/createWriteBufferFromFileBase.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/NestedUtils.h>
2018-11-15 14:06:54 +00:00
#include <DataStreams/MarkInCompressedFile.h>
#include <Common/StringUtils/StringUtils.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
2017-04-08 01:32:05 +00:00
#include <Common/MemoryTracker.h>
#include <Poco/File.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
MergedBlockOutputStream::MergedBlockOutputStream(
2019-10-21 15:33:59 +00:00
const MergeTreeDataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
2019-03-18 12:02:33 +00:00
CompressionCodecPtr default_codec_,
2019-03-25 13:55:24 +00:00
bool blocks_are_granules_size_)
: IMergedBlockOutputStream(
2019-10-21 15:33:59 +00:00
data_part_, default_codec_,
{
data_part_->storage.global_context.getSettings().min_compress_block_size,
data_part_->storage.global_context.getSettings().max_compress_block_size,
data_part_->storage.global_context.getSettings().min_bytes_to_use_direct_io
},
2018-11-30 15:36:10 +00:00
blocks_are_granules_size_,
2019-10-21 15:33:59 +00:00
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)),
data_part_->storage.canUseAdaptiveGranularity())
2019-07-28 11:10:35 +00:00
, columns_list(columns_list_)
{
init();
2019-10-21 17:23:06 +00:00
writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings);
}
MergedBlockOutputStream::MergedBlockOutputStream(
2019-10-21 15:33:59 +00:00
const MergeTreeDataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
2018-12-21 12:17:30 +00:00
CompressionCodecPtr default_codec_,
2019-10-28 11:00:29 +00:00
const MergeTreeData::DataPart::ColumnToSize & /* merged_column_to_size_ */, // FIXME
2018-11-30 15:36:10 +00:00
size_t aio_threshold_,
2019-03-25 13:55:24 +00:00
bool blocks_are_granules_size_)
: IMergedBlockOutputStream(
2019-10-21 15:33:59 +00:00
data_part_, default_codec_,
{
data_part_->storage.global_context.getSettings().min_compress_block_size,
data_part_->storage.global_context.getSettings().max_compress_block_size,
aio_threshold_
},
blocks_are_granules_size_,
2019-10-28 11:00:29 +00:00
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)),
data_part_->storage.canUseAdaptiveGranularity())
2019-07-28 11:10:35 +00:00
, columns_list(columns_list_)
{
init();
2019-10-21 17:23:06 +00:00
writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings);
}
std::string MergedBlockOutputStream::getPartPath() const
{
return part_path;
}
2018-05-07 02:01:11 +00:00
/// If data is pre-sorted.
void MergedBlockOutputStream::write(const Block & block)
{
writeImpl(block, nullptr);
}
2017-03-12 19:18:07 +00:00
/** If the data is not sorted, but we pre-calculated the permutation, after which they will be sorted.
* This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted.
*/
void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
{
writeImpl(block, permutation);
}
void MergedBlockOutputStream::writeSuffix()
{
throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
void MergedBlockOutputStream::writeSuffixAndFinalizePart(
MergeTreeData::MutableDataPartPtr & new_part,
const NamesAndTypesList * total_column_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
2019-10-21 17:23:06 +00:00
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
2019-10-28 11:00:29 +00:00
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
/// Finish columns serialization.
2019-10-21 17:23:06 +00:00
bool write_final_mark = (with_final_mark && rows_count != 0);
writer->finalize(checksums, write_final_mark);
2019-10-21 17:23:06 +00:00
if (write_final_mark)
index_granularity.appendMark(0); /// last mark
if (!total_column_list)
total_column_list = &columns_list;
if (index_stream)
{
2019-06-19 14:46:06 +00:00
if (with_final_mark && rows_count != 0)
{
for (size_t j = 0; j < index_columns.size(); ++j)
{
auto & column = *last_index_row[j].column;
index_columns[j]->insertFrom(column, 0); /// it has only one element
last_index_row[j].type->serializeBinary(column, 0, *index_stream);
}
last_index_row.clear();
}
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
index_stream = nullptr;
}
2019-07-28 11:10:35 +00:00
finishSkipIndicesSerialization(checksums);
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
new_part->partition.store(storage, part_path, checksums);
2018-05-23 19:34:37 +00:00
if (new_part->minmax_idx.initialized)
new_part->minmax_idx.store(storage, part_path, checksums);
else if (rows_count)
2018-08-06 16:53:34 +00:00
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
WriteBufferFromFile count_out(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
}
if (new_part->ttl_infos.part_min_ttl)
{
/// Write a file with ttl infos in json format.
WriteBufferFromFile out(part_path + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(out);
new_part->ttl_infos.write(out_hashing);
checksums.files["ttl.txt"].file_size = out_hashing.count();
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
}
{
/// Write a file with a description of columns.
WriteBufferFromFile out(part_path + "columns.txt", 4096);
total_column_list->writeText(out);
}
{
/// Write file with checksums.
WriteBufferFromFile out(part_path + "checksums.txt", 4096);
checksums.write(out);
}
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
new_part->columns = *total_column_list;
new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
new_part->checksums = checksums;
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
2019-03-25 13:55:24 +00:00
new_part->index_granularity = index_granularity;
}
void MergedBlockOutputStream::init()
{
Poco::File(part_path).createDirectories();
if (storage.hasPrimaryKey())
{
index_file_stream = std::make_unique<WriteBufferFromFile>(
part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
2019-07-28 11:10:35 +00:00
initSkipIndices();
}
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
{
block.checkNumberOfRows();
size_t rows = block.rows();
2018-12-04 14:44:42 +00:00
if (!rows)
return;
2018-11-30 15:36:10 +00:00
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
if (compute_granularity)
fillIndexGranularity(block);
2019-10-19 16:49:36 +00:00
Block primary_key_block;
Block skip_indexes_block;
auto primary_key_column_names = storage.primary_key_columns;
2019-10-16 18:27:53 +00:00
2019-02-14 16:59:26 +00:00
std::set<String> skip_indexes_column_names_set;
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
for (const auto & index : storage.skip_indices)
2019-02-14 16:59:26 +00:00
std::copy(index->columns.cbegin(), index->columns.cend(),
std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
{
const auto & name = primary_key_column_names[i];
2019-10-16 18:27:53 +00:00
primary_key_block.insert(i, block.getByName(name));
/// Reorder primary key columns in advance and add them to `primary_key_columns`.
if (permutation)
2019-10-16 18:27:53 +00:00
{
auto & column = primary_key_block.getByPosition(i);
column.column = column.column->permute(*permutation, 0);
}
}
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i)
{
const auto & name = skip_indexes_column_names[i];
2019-10-16 18:27:53 +00:00
skip_indexes_block.insert(i, block.getByName(name));
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
/// Reorder index columns in advance.
if (permutation)
2019-10-16 18:27:53 +00:00
{
auto & column = skip_indexes_block.getByPosition(i);
column.column = column.column->permute(*permutation, 0);
}
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
}
if (index_columns.empty())
{
index_columns.resize(primary_key_column_names.size());
last_index_row.resize(primary_key_column_names.size());
for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
{
2019-10-16 18:27:53 +00:00
last_index_row[i] = primary_key_block.getByPosition(i).cloneEmpty();
index_columns[i] = last_index_row[i].column->cloneEmpty();
}
}
2019-10-22 17:42:59 +00:00
size_t new_index_offset = writer->write(block, permutation, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block).second;
rows_count += rows;
2019-08-30 14:29:08 +00:00
/// Should be written before index offset update, because we calculate,
/// indices of currently written granules
2019-10-21 15:33:59 +00:00
calculateAndSerializeSkipIndices(skip_indexes_block, rows);
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data * fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
{
/** While filling index (index_columns), disable memory tracker.
* Because memory is allocated here (maybe in context of INSERT query),
* but then freed in completely different place (while merging parts), where query memory_tracker is not available.
* And otherwise it will look like excessively growing memory consumption in context of query.
* (observed in long INSERT SELECTs)
*/
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (size_t i = index_offset; i < rows;)
{
if (storage.hasPrimaryKey())
{
2019-10-21 17:23:06 +00:00
for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
{
2019-10-21 15:33:59 +00:00
const auto & primary_column = primary_key_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, i);
primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
}
}
2018-11-30 15:36:10 +00:00
++current_mark;
2019-03-25 13:55:24 +00:00
if (current_mark < index_granularity.getMarksCount())
i += index_granularity.getMarkRows(current_mark);
else
break;
}
}
/// store last index row to write final mark at the end of column
2019-10-21 17:23:06 +00:00
for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
{
2019-10-21 15:33:59 +00:00
const IColumn & primary_column = *primary_key_block.getByPosition(j).column.get();
auto mutable_column = std::move(*last_index_row[j].column).mutate();
if (!mutable_column->empty())
mutable_column->popBack(1);
mutable_column->insertFrom(primary_column, rows - 1);
last_index_row[j].column = std::move(mutable_column);
}
2018-11-30 15:36:10 +00:00
index_offset = new_index_offset;
}
}