store rows count for part on disk to allow tables without fixed size columns [#CLICKHOUSE-3000]
commit b6b5d1cf25
parent 8ccebefae6
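In short, the commit makes each data part remember its exact row count: the new storage format writes it to a count.txt file inside the part directory, MergeTreeDataPart gains a rows_count field filled by the new loadRowsCount() method, and the old size field (measured in marks) is renamed to marks_count. As a minimal sketch of the count.txt round trip, assuming plain C++ file streams instead of ClickHouse's WriteBufferFromFile/HashingWriteBuffer and with error handling reduced to exceptions (the file name and its decimal-text contents are taken from the diff below):

#include <cstddef>
#include <fstream>
#include <stdexcept>
#include <string>

// Persist the number of rows of a part as decimal text, the way count.txt stores it.
void storeRowsCount(const std::string & part_path, std::size_t rows_count)
{
    std::ofstream out(part_path + "/count.txt");
    out << rows_count;
    if (!out)
        throw std::runtime_error("Cannot write count.txt in part " + part_path);
}

// Read it back when the part is loaded; the real loadRowsCount() additionally
// falls back to a fixed-size column for parts written in the old format.
std::size_t loadRowsCount(const std::string & part_path)
{
    std::ifstream in(part_path + "/count.txt");
    std::size_t rows_count = 0;
    if (!(in >> rows_count))
        throw std::runtime_error("No count.txt in part " + part_path);
    return rows_count;
}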
@@ -83,7 +83,7 @@ MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor(
 const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block)
 : data_part(data_part_)
 {
-number_of_rows_in_part = data_part->getExactSizeRows();
+number_of_rows_in_part = data_part->rows_count;
 /// Initialize with sample block untill update won't called.
 initialize(sample_block, columns);
 }
@@ -1091,7 +1091,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
 else
 {
 const IDataType & type = *new_primary_key_sample.safeGetByPosition(i).type;
-new_index[i] = type.createConstColumn(part->size, type.getDefault())->convertToFullColumnIfConst();
+new_index[i] = type.createConstColumn(part->marks_count, type.getDefault())->convertToFullColumnIfConst();
 }
 }

@@ -1102,7 +1102,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
 WriteBufferFromFile index_file(index_tmp_path);
 HashingWriteBuffer index_stream(index_file);

-for (size_t i = 0, size = part->size; i < size; ++i)
+for (size_t i = 0, marks_count = part->marks_count; i < marks_count; ++i)
 for (size_t j = 0; j < new_key_size; ++j)
 new_primary_key_sample.getByPosition(j).type->serializeBinary(*new_index[j].get(), i, index_stream);

@@ -1122,7 +1122,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
 /// Apply the expression and write the result to temporary files.
 if (expression)
 {
-MarkRanges ranges{MarkRange(0, part->size)};
+MarkRanges ranges{MarkRange(0, part->marks_count)};
 BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
 *this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
 false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
@@ -62,7 +62,8 @@ namespace ErrorCodes
 /// Part directory - / partiiton-id _ min-id _ max-id _ level /
 /// Inside the part directory:
 /// The same files as for month-partitioned tables, plus
-/// partition.dat - contains the value of the partitioning expression
+/// count.txt - contains total number of rows in this part.
+/// partition.dat - contains the value of the partitioning expression.
 /// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
 ///
 /// Several modes are implemented. Modes determine additional actions during merge:
@@ -499,7 +499,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
 std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);

 merge_entry->total_size_bytes_compressed += part->size_in_bytes;
-merge_entry->total_size_marks += part->size;
+merge_entry->total_size_marks += part->marks_count;
 }

 MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
@@ -557,7 +557,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
 for (const auto & part : parts)
 {
 auto input = std::make_unique<MergeTreeBlockInputStream>(
-data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->size)),
+data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->marks_count)),
 false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);

 input->setProgressCallback(MergeProgressCallback(
@@ -691,7 +691,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
 for (size_t part_num = 0; part_num < parts.size(); ++part_num)
 {
 auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
-data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->size)},
+data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
 false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);

 column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
@@ -755,7 +755,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
 to.writeSuffixAndFinalizePart(new_data_part, &all_columns, &checksums_gathered_columns);

 /// For convenience, even CollapsingSortedBlockInputStream can not return zero rows.
-if (0 == to.marksCount())
+if (0 == to.getRowsCount())
 throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR);

 return new_data_part;
@@ -862,12 +862,16 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(

 /// Merge all parts of the partition.

+size_t total_input_rows = 0;
+
 for (const MergeTreeData::DataPartPtr & part : parts)
 {
+total_input_rows += part->rows_count;
+
 std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);

 merge_entry->total_size_bytes_compressed += part->size_in_bytes;
-merge_entry->total_size_marks += part->size;
+merge_entry->total_size_marks += part->marks_count;
 }

 MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
@@ -882,22 +886,18 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(

 BlockInputStreams src_streams;

-size_t sum_rows_approx = 0;
-
-const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
-
 for (size_t i = 0; i < parts.size(); ++i)
 {
-MarkRanges ranges(1, MarkRange(0, parts[i]->size));
+MarkRanges ranges(1, MarkRange(0, parts[i]->marks_count));

 auto input = std::make_unique<MergeTreeBlockInputStream>(
 data, parts[i], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names,
 ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);

-input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
+input->setProgressCallback([&merge_entry, total_input_rows] (const Progress & value)
 {
 const auto new_rows_read = merge_entry->rows_read += value.rows;
-merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
+merge_entry->progress = static_cast<Float64>(new_rows_read) / total_input_rows;
 merge_entry->bytes_read_uncompressed += value.bytes;
 });

@@ -906,8 +906,6 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
 std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
 else
 src_streams.emplace_back(std::move(input));
-
-sum_rows_approx += parts[i]->size * data.index_granularity;
 }

 /// Sharding of merged blocks.
@@ -1038,7 +1036,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
 merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;

 if (disk_reservation)
-disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
+disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / total_input_rows)) * initial_reservation));
 }
 }

@@ -1050,7 +1048,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
 abortReshardPartitionIfRequested();

 MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
-if (0 == output_stream->marksCount())
+if (0 == output_stream->getRowsCount())
 {
 /// There was no data in this shard. Ignore.
 LOG_WARNING(log, "No data in partition for shard " + job.paths[shard_no].first);
@@ -319,6 +319,7 @@ void MergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & storage, const
 HashingWriteBuffer out_hashing(out);
 type->serializeBinary(min_values[i], out_hashing);
 type->serializeBinary(max_values[i], out_hashing);
+out_hashing.next();
 checksums.files[file_name].file_size = out_hashing.count();
 checksums.files[file_name].file_hash = out_hashing.getHash();
 }
@@ -426,43 +427,6 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
 }


-size_t MergeTreeDataPart::getExactSizeRows() const
-{
-size_t rows_approx = storage.index_granularity * size;
-
-for (const NameAndTypePair & column : columns)
-{
-ColumnPtr column_col = column.type->createColumn();
-const auto checksum = tryGetBinChecksum(column.name);
-
-/// Should be fixed non-nullable column
-if (!checksum || !column_col->isFixed() || column_col->isNullable())
-continue;
-
-size_t sizeof_field = column_col->sizeOfField();
-size_t rows = checksum->uncompressed_size / sizeof_field;
-
-if (checksum->uncompressed_size % sizeof_field != 0)
-{
-throw Exception(
-"Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size)
-+ ", sizeof " + toString(sizeof_field),
-ErrorCodes::LOGICAL_ERROR);
-}
-
-if (!(rows_approx - storage.index_granularity < rows && rows <= rows_approx))
-{
-throw Exception("Unexpected size of column " + column.name + ": " + toString(rows) + " rows",
-ErrorCodes::LOGICAL_ERROR);
-}
-
-return rows;
-}
-
-throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
-}
-
-
 String MergeTreeDataPart::getFullPath() const
 {
 if (relative_path.empty())
@@ -647,6 +611,7 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
 loadColumns(require_columns_checksums);
 loadChecksums(require_columns_checksums);
 loadIndex();
+loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `marks_count`.
 loadPartitionAndMinMaxIndex();
 if (check_consistency)
 checkConsistency(require_columns_checksums);
@@ -655,13 +620,12 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu

 void MergeTreeDataPart::loadIndex()
 {
-/// Size - in number of marks.
-if (!size)
+if (!marks_count)
 {
 if (columns.empty())
 throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

-size = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk")
+marks_count = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk")
 .getSize() / MERGE_TREE_MARK_SIZE;
 }

@@ -675,20 +639,20 @@ void MergeTreeDataPart::loadIndex()
 for (size_t i = 0; i < key_size; ++i)
 {
 index[i] = storage.primary_key_data_types[i]->createColumn();
-index[i]->reserve(size);
+index[i]->reserve(marks_count);
 }

 String index_path = getFullPath() + "primary.idx";
 ReadBufferFromFile index_file = openForReading(index_path);

-for (size_t i = 0; i < size; ++i)
+for (size_t i = 0; i < marks_count; ++i)
 for (size_t j = 0; j < key_size; ++j)
 storage.primary_key_data_types[j]->deserializeBinary(*index[j].get(), index_file);

 for (size_t i = 0; i < key_size; ++i)
-if (index[i]->size() != size)
+if (index[i]->size() != marks_count)
 throw Exception("Cannot read all data from index file " + index_path
-+ "(expected size: " + toString(size) + ", read: " + toString(index[i]->size()) + ")",
++ "(expected size: " + toString(marks_count) + ", read: " + toString(index[i]->size()) + ")",
 ErrorCodes::CANNOT_READ_ALL_DATA);

 if (!index_file.eof())
@@ -740,6 +704,54 @@ void MergeTreeDataPart::loadChecksums(bool require)
 assertEOF(file);
 }

+void MergeTreeDataPart::loadRowsCount()
+{
+if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
+{
+String path = getFullPath() + "count.txt";
+if (!Poco::File(path).exists())
+throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
+
+ReadBufferFromFile file = openForReading(path);
+readIntText(rows_count, file);
+assertEOF(file);
+}
+else
+{
+size_t rows_approx = storage.index_granularity * marks_count;
+
+for (const NameAndTypePair & column : columns)
+{
+ColumnPtr column_col = column.type->createColumn();
+const auto checksum = tryGetBinChecksum(column.name);
+
+/// Should be fixed non-nullable column
+if (!checksum || !column_col->isFixed() || column_col->isNullable())
+continue;
+
+size_t sizeof_field = column_col->sizeOfField();
+rows_count = checksum->uncompressed_size / sizeof_field;
+
+if (checksum->uncompressed_size % sizeof_field != 0)
+{
+throw Exception(
+"Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size)
++ ", sizeof " + toString(sizeof_field),
+ErrorCodes::LOGICAL_ERROR);
+}
+
+if (!(rows_count <= rows_approx && rows_approx < rows_count + storage.index_granularity))
+throw Exception(
+"Unexpected size of column " + column.name + ": " + toString(rows_count) + " rows",
+ErrorCodes::LOGICAL_ERROR);
+
+return;
+}
+
+throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
+}
+}
+
 void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
 {
 std::shared_lock<std::shared_mutex> part_lock(columns_lock);
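For parts written in the old format there is no count.txt, so loadRowsCount() above recovers the row count from the on-disk size of any fixed-size, non-nullable column and cross-checks it against the mark count. A worked example of that arithmetic, using made-up numbers (the default index_granularity of 8192 and a 2-byte Date column are assumptions for illustration, not values from a real part):

#include <cstddef>
#include <iostream>

int main()
{
    // Hypothetical old-format part: 3 marks, at most 8192 rows per mark.
    const std::size_t index_granularity = 8192;
    const std::size_t marks_count = 3;
    const std::size_t rows_approx = index_granularity * marks_count;  // 24576

    // A Date column stores 2 bytes per row; suppose its uncompressed size is 32770 bytes.
    const std::size_t sizeof_field = 2;
    const std::size_t uncompressed_size = 32770;
    const std::size_t rows_count = uncompressed_size / sizeof_field;  // 16385

    // The size must divide evenly and the row count must land in the last granule:
    // rows_approx - index_granularity < rows_count <= rows_approx.
    const bool consistent = uncompressed_size % sizeof_field == 0
        && rows_count <= rows_approx
        && rows_approx < rows_count + index_granularity;

    std::cout << "rows_count = " << rows_count
              << (consistent ? " (consistent with 3 marks)" : " (inconsistent)") << '\n';
}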
@@ -799,6 +811,9 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)

 if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
 {
+if (!checksums.files.count("count.txt"))
+throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
+
 if (storage.partition_expr && !checksums.files.count("partition.dat"))
 throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);

@@ -827,6 +842,8 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)

 if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
 {
+check_file_not_empty(path + "count.txt");
+
 if (storage.partition_expr)
 check_file_not_empty(path + "partition.dat");

@@ -108,9 +108,6 @@ struct MergeTreeDataPart
 /// If no checksums are present returns the name of the first physically existing column.
 String getColumnNameWithMinumumCompressedSize() const;

-/// If part has column with fixed size, will return exact size of part (in rows)
-size_t getExactSizeRows() const;
-
 /// Returns full path to part dir
 String getFullPath() const;

@@ -132,7 +129,8 @@ struct MergeTreeDataPart
 /// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
 mutable String relative_path;

-size_t size = 0; /// in number of marks.
+size_t rows_count = 0;
+size_t marks_count = 0;
 std::atomic<size_t> size_in_bytes {0}; /// size in bytes, 0 - if not counted;
 /// is used from several threads without locks (it is changed with ALTER).
 time_t modification_time = 0;
@@ -239,9 +237,13 @@ private:
 /// If checksums.txt exists, reads files' checksums (and sizes) from it
 void loadChecksums(bool require);

-/// Loads index file. Also calculates this->size if size=0
+/// Loads index file. Also calculates this->marks_count if marks_count = 0
 void loadIndex();

+/// Load rows count for this part from disk (for the newer storage format version).
+/// For the older format version calculates rows count from the size of a column with a fixed size.
+void loadRowsCount();
+
 void loadPartitionAndMinMaxIndex();

 void checkConsistency(bool require_part_metadata);
@@ -526,7 +526,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
 if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
 ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
 else
-ranges.ranges = MarkRanges{MarkRange{0, part->size}};
+ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}};

 if (!ranges.ranges.empty())
 {
@@ -125,6 +125,7 @@ void MergeTreePartition::store(const MergeTreeData & storage, const String & par
 HashingWriteBuffer out_hashing(out);
 for (size_t i = 0; i < value.size(); ++i)
 storage.partition_expr_column_types[i]->serializeBinary(value[i], out_hashing);
+out_hashing.next();
 checksums.files["partition.dat"].file_size = out_hashing.count();
 checksums.files["partition.dat"].file_hash = out_hashing.getHash();
 }
@@ -403,7 +403,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
 std::string filename = name + NULL_MAP_EXTENSION;

 streams.emplace(filename, std::make_unique<Stream>(
-path + escaped_column_name, NULL_MAP_EXTENSION, data_part->size,
+path + escaped_column_name, NULL_MAP_EXTENSION, data_part->marks_count,
 all_mark_ranges, mark_cache, save_marks_in_cache,
 uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));

@@ -425,7 +425,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con

 if (!streams.count(size_name))
 streams.emplace(size_name, std::make_unique<Stream>(
-path + escaped_size_name, DATA_FILE_EXTENSION, data_part->size,
+path + escaped_size_name, DATA_FILE_EXTENSION, data_part->marks_count,
 all_mark_ranges, mark_cache, save_marks_in_cache,
 uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));

@@ -436,7 +436,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
 }
 else
 streams.emplace(name, std::make_unique<Stream>(
-path + escaped_column_name, DATA_FILE_EXTENSION, data_part->size,
+path + escaped_column_name, DATA_FILE_EXTENSION, data_part->marks_count,
 all_mark_ranges, mark_cache, save_marks_in_cache,
 uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
 }
@@ -408,7 +408,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(

 column_streams.clear();

-if (marks_count == 0)
+if (rows_count == 0)
 {
 /// A part is empty - all records are deleted.
 Poco::File(part_path).remove(true);
@@ -419,6 +419,13 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
 {
 new_part->partition.store(storage, part_path, checksums);
 new_part->minmax_idx.store(storage, part_path, checksums);
+
+WriteBufferFromFile count_out(part_path + "count.txt", 4096);
+HashingWriteBuffer count_out_hashing(count_out);
+writeIntText(rows_count, count_out_hashing);
+count_out_hashing.next();
+checksums.files["count.txt"].file_size = count_out_hashing.count();
+checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
 }

 {
@@ -433,7 +440,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
 checksums.write(out);
 }

-new_part->size = marks_count;
+new_part->rows_count = rows_count;
+new_part->marks_count = marks_count;
 new_part->modification_time = time(nullptr);
 new_part->columns = *total_column_list;
 new_part->index.swap(index_columns);
@@ -441,11 +449,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
 new_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part->getFullPath());
 }

-size_t MergedBlockOutputStream::marksCount()
-{
-return marks_count;
-}
-
 void MergedBlockOutputStream::init()
 {
 Poco::File(part_path).createDirectories();
@@ -525,6 +528,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
 }
 }

+rows_count += rows;
+
 {
 /** While filling index (index_columns), disable memory tracker.
 * Because memory is allocated here (maybe in context of INSERT query),
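On the write side, MergedBlockOutputStream now accumulates the row count block by block (rows_count += rows in writeImpl above) and persists the total when the part is finalized in writeSuffixAndFinalizePart. A simplified stand-in for that flow, again assuming plain C++ streams rather than the ClickHouse buffer and checksum classes (the class name PartWriter is invented for this sketch):

#include <cstddef>
#include <fstream>
#include <string>
#include <utility>

class PartWriter
{
public:
    explicit PartWriter(std::string part_path_) : part_path(std::move(part_path_)) {}

    // Called once per written block, mirroring rows_count += rows in writeImpl().
    void writeBlock(std::size_t rows_in_block) { rows_count += rows_in_block; }

    // Called once at the end, mirroring the count.txt write in writeSuffixAndFinalizePart().
    // An empty part (rows_count == 0) would not be finalized at all; the real code removes its directory.
    void finalize() const
    {
        std::ofstream out(part_path + "/count.txt");
        out << rows_count;
    }

private:
    std::string part_path;
    std::size_t rows_count = 0;
};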
@@ -130,8 +130,8 @@ public:
 const NamesAndTypesList * total_columns_list = nullptr,
 MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);

-/// How many marks are already written.
-size_t marksCount();
+/// How many rows are already written.
+size_t getRowsCount() const { return rows_count; }

 private:
 void init();
@@ -145,6 +145,7 @@ private:
 NamesAndTypesList columns_list;
 String part_path;

+size_t rows_count = 0;
 size_t marks_count = 0;

 std::unique_ptr<WriteBufferFromFile> index_file_stream;
@@ -209,7 +209,7 @@ BlockInputStreams StorageSystemParts::read(
 }
 block.getByPosition(i++).column->insert(part->name);
 block.getByPosition(i++).column->insert(static_cast<UInt64>(active_parts.count(part)));
-block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size));
+block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count));

 size_t marks_size = 0;
 for (const NameAndTypePair & it : part->columns)
@@ -221,7 +221,7 @@ BlockInputStreams StorageSystemParts::read(
 }
 block.getByPosition(i++).column->insert(static_cast<UInt64>(marks_size));

-block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getExactSizeRows()));
+block.getByPosition(i++).column->insert(static_cast<UInt64>(part->rows_count));
 block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size_in_bytes));
 block.getByPosition(i++).column->insert(static_cast<UInt64>(part->modification_time));
 block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time));
@@ -48,3 +48,15 @@ Sum before DROP PARTITION:
 15
 Sum after DROP PARTITION:
 8
+*** Table without columns with fixed size ***
+Parts:
+1 1_1_1_0 2
+2 2_2_2_0 2
+Before DROP PARTITION:
+a
+aa
+b
+cc
+After DROP PARTITION:
+aa
+cc
@@ -89,3 +89,21 @@ SELECT 'Sum after DROP PARTITION:';
 SELECT sum(x) FROM test.partitioned_by_string;

 DROP TABLE test.partitioned_by_string;
+
+SELECT '*** Table without columns with fixed size ***';
+
+DROP TABLE IF EXISTS test.without_fixed_size_columns;
+CREATE TABLE test.without_fixed_size_columns(s String) ENGINE MergeTree PARTITION BY length(s) ORDER BY s;
+
+INSERT INTO test.without_fixed_size_columns VALUES ('a'), ('aa'), ('b'), ('cc');
+
+SELECT 'Parts:';
+SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns' AND active ORDER BY name;
+
+SELECT 'Before DROP PARTITION:';
+SELECT * FROM test.without_fixed_size_columns ORDER BY s;
+ALTER TABLE test.without_fixed_size_columns DROP PARTITION 1;
+SELECT 'After DROP PARTITION:';
+SELECT * FROM test.without_fixed_size_columns ORDER BY s;
+
+DROP TABLE test.without_fixed_size_columns;
@@ -48,3 +48,15 @@ Sum before DROP PARTITION:
 15
 Sum after DROP PARTITION:
 8
+*** Table without columns with fixed size ***
+Parts:
+1 1_0_0_1 2
+2 2_0_0_0 2
+Before DROP PARTITION:
+a
+aa
+b
+cc
+After DROP PARTITION:
+aa
+cc
@@ -103,3 +103,26 @@ SELECT sum(x) FROM test.partitioned_by_string_replica2;

 DROP TABLE test.partitioned_by_string_replica1;
 DROP TABLE test.partitioned_by_string_replica2;
+
+SELECT '*** Table without columns with fixed size ***';
+
+DROP TABLE IF EXISTS test.without_fixed_size_columns_replica1;
+DROP TABLE IF EXISTS test.without_fixed_size_columns_replica2;
+CREATE TABLE test.without_fixed_size_columns_replica1(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '1') PARTITION BY length(s) ORDER BY s;
+CREATE TABLE test.without_fixed_size_columns_replica2(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '2') PARTITION BY length(s) ORDER BY s;
+
+INSERT INTO test.without_fixed_size_columns_replica1 VALUES ('a'), ('aa'), ('b'), ('cc');
+
+OPTIMIZE TABLE test.without_fixed_size_columns_replica2 PARTITION 1 FINAL; -- Wait for replication.
+
+SELECT 'Parts:';
+SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns_replica2' AND active ORDER BY name;
+
+SELECT 'Before DROP PARTITION:';
+SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s;
+ALTER TABLE test.without_fixed_size_columns_replica1 DROP PARTITION 1;
+SELECT 'After DROP PARTITION:';
+SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s;
+
+DROP TABLE test.without_fixed_size_columns_replica1;
+DROP TABLE test.without_fixed_size_columns_replica2;