Add ability to write final mark for *MergeTree engines family (#5624)

Add the ability to write final mark to *MergeTree engines family.
This commit is contained in:
alesapin 2019-06-18 15:54:27 +03:00 committed by GitHub
parent 69f1c48489
commit 4f98f875c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 979 additions and 600 deletions

View File

@ -721,6 +721,7 @@ inline void readText(Array &, ReadBuffer &) { throw Exception("Cannot read Array
inline void readQuoted(Array &, ReadBuffer &) { throw Exception("Cannot read Array.", ErrorCodes::NOT_IMPLEMENTED); }
/// It is assumed that all elements of the array have the same type.
/// Also write size and type into buf. UInt64 and Int64 is written in variadic size form
void writeBinary(const Array & x, WriteBuffer & buf);
void writeText(const Array & x, WriteBuffer & buf);

View File

@ -0,0 +1,353 @@
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
#include <IO/createWriteBufferFromFileBase.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
constexpr auto DATA_FILE_EXTENSION = ".bin";
}
IMergedBlockOutputStream::IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionCodecPtr codec_,
size_t aio_threshold_,
bool blocks_are_granules_size_,
const MergeTreeIndexGranularity & index_granularity_)
: storage(storage_)
, min_compress_block_size(min_compress_block_size_)
, max_compress_block_size(max_compress_block_size_)
, aio_threshold(aio_threshold_)
, marks_file_extension(storage.index_granularity_info.marks_file_extension)
, mark_size_in_bytes(storage.index_granularity_info.mark_size_in_bytes)
, blocks_are_granules_size(blocks_are_granules_size_)
, index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty())
, codec(std::move(codec_))
, with_final_mark(storage.settings.write_final_mark && storage.index_granularity_info.is_adaptive)
{
if (blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
}
void IMergedBlockOutputStream::addStreams(
const String & path,
const String & name,
const IDataType & type,
const CompressionCodecPtr & effective_codec,
size_t estimated_size,
bool skip_offsets)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
{
if (skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type.
if (column_streams.count(stream_name))
return;
column_streams[stream_name] = std::make_unique<ColumnStream>(
stream_name,
path + stream_name, DATA_FILE_EXTENSION,
path + stream_name, marks_file_extension,
effective_codec,
max_compress_block_size,
estimated_size,
aio_threshold);
};
IDataType::SubstreamPath stream_path;
type.enumerateStreams(callback, stream_path);
}
IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets)
{
return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return nullptr;
return &column_streams[stream_name]->compressed;
};
}
void fillIndexGranularityImpl(
const Block & block,
size_t index_granularity_bytes,
size_t fixed_index_granularity_rows,
bool blocks_are_granules,
size_t index_offset,
MergeTreeIndexGranularity & index_granularity)
{
size_t rows_in_block = block.rows();
size_t index_granularity_for_block;
if (index_granularity_bytes == 0)
index_granularity_for_block = fixed_index_granularity_rows;
else
{
size_t block_size_in_memory = block.bytes();
if (blocks_are_granules)
index_granularity_for_block = rows_in_block;
else if (block_size_in_memory >= index_granularity_bytes)
{
size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
index_granularity_for_block = rows_in_block / granules_in_block;
}
else
{
size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
}
}
if (index_granularity_for_block == 0) /// very rare case when index granularity bytes less then single row
index_granularity_for_block = 1;
/// We should be less or equal than fixed index granularity
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.appendMark(index_granularity_for_block);
}
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
fillIndexGranularityImpl(
block,
storage.index_granularity_info.index_granularity_bytes,
storage.index_granularity_info.fixed_index_granularity,
blocks_are_granules_size,
index_offset,
index_granularity);
}
void IMergedBlockOutputStream::writeSingleMark(
const String & name,
const IDataType & type,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
size_t number_of_rows,
DB::IDataType::SubstreamPath & path)
{
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return;
ColumnStream & stream = *column_streams[stream_name];
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
if (storage.index_granularity_info.is_adaptive)
writeIntBinary(number_of_rows, stream.marks);
}, path);
}
size_t IMergedBlockOutputStream::writeSingleGranule(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
IDataType::SerializeBinaryBulkSettings & serialize_settings,
size_t from_row,
size_t number_of_rows,
bool write_marks)
{
if (write_marks)
writeSingleMark(name, type, offset_columns, skip_offsets, number_of_rows, serialize_settings.path);
type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
}, serialize_settings.path);
return from_row + number_of_rows;
}
std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t from_mark)
{
auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
size_t total_rows = column.size();
size_t current_row = 0;
size_t current_column_mark = from_mark;
while (current_row < total_rows)
{
size_t rows_to_write;
bool write_marks = true;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (current_row == 0 && index_offset != 0)
{
write_marks = false;
rows_to_write = index_offset;
}
else
{
if (index_granularity.getMarksCount() <= current_column_mark)
throw Exception(
"Incorrect size of index granularity expect mark " + toString(current_column_mark) + " totally have marks " + toString(index_granularity.getMarksCount()),
ErrorCodes::LOGICAL_ERROR);
rows_to_write = index_granularity.getMarkRows(current_column_mark);
}
current_row = writeSingleGranule(
name,
type,
column,
offset_columns,
skip_offsets,
serialization_state,
serialize_settings,
current_row,
rows_to_write,
write_marks
);
if (write_marks)
current_column_mark++;
}
/// Memoize offsets for Nested types, that are already written. They will not be written again for next columns of Nested structure.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);
offset_columns.insert(stream_name);
}
}, serialize_settings.path);
return std::make_pair(current_column_mark, current_row - total_rows);
}
void IMergedBlockOutputStream::writeFinalMark(
const std::string & column_name,
const DataTypePtr column_type,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
DB::IDataType::SubstreamPath & path)
{
writeSingleMark(column_name, *column_type, offset_columns, skip_offsets, 0, path);
/// Memoize information about offsets
column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
offset_columns.insert(stream_name);
}
}, path);
}
/// Implementation of IMergedBlockOutputStream::ColumnStream.
IMergedBlockOutputStream::ColumnStream::ColumnStream(
const String & escaped_column_name_,
const String & data_path,
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec,
size_t max_compress_block_size,
size_t estimated_size,
size_t aio_threshold) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec), compressed(compressed_buf),
marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}
void IMergedBlockOutputStream::ColumnStream::finalize()
{
compressed.next();
plain_file->next();
marks.next();
}
void IMergedBlockOutputStream::ColumnStream::sync()
{
plain_file->sync();
marks_file.sync();
}
void IMergedBlockOutputStream::ColumnStream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
String name = escaped_column_name;
checksums.files[name + data_file_extension].is_compressed = true;
checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
checksums.files[name + data_file_extension].file_size = plain_hashing.count();
checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
checksums.files[name + marks_file_extension].file_size = marks.count();
checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}
}

View File

@ -0,0 +1,148 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
class IMergedBlockOutputStream : public IBlockOutputStream
{
public:
IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionCodecPtr default_codec_,
size_t aio_threshold_,
bool blocks_are_granules_size_,
const MergeTreeIndexGranularity & index_granularity_);
using WrittenOffsetColumns = std::set<std::string>;
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
struct ColumnStream
{
ColumnStream(
const String & escaped_column_name_,
const String & data_path,
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec,
size_t max_compress_block_size,
size_t estimated_size,
size_t aio_threshold);
String escaped_column_name;
std::string data_file_extension;
std::string marks_file_extension;
/// compressed -> compressed_buf -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer compressed;
/// marks -> marks_file
WriteBufferFromFile marks_file;
HashingWriteBuffer marks;
void finalize();
void sync();
void addToChecksums(MergeTreeData::DataPart::Checksums & checksums);
};
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
void addStreams(const String & path, const String & name, const IDataType & type,
const CompressionCodecPtr & codec, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
/// Write data of one column.
/// Return how many marks were written and
/// how many rows were written for last mark
std::pair<size_t, size_t> writeColumn(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t from_mark
);
/// Write single granule of one column (rows between 2 marks)
size_t writeSingleGranule(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
IDataType::SerializeBinaryBulkSettings & serialize_settings,
size_t from_row,
size_t number_of_rows,
bool write_marks);
/// Write mark for column
void writeSingleMark(
const String & name,
const IDataType & type,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
size_t number_of_rows,
DB::IDataType::SubstreamPath & path);
/// Count index_granularity for block and store in `index_granularity`
void fillIndexGranularity(const Block & block);
/// Write final mark to the end of column
void writeFinalMark(
const std::string & column_name,
const DataTypePtr column_type,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
DB::IDataType::SubstreamPath & path);
protected:
MergeTreeData & storage;
ColumnStreams column_streams;
/// The offset to the first row of the block for which you want to write the index.
size_t index_offset = 0;
size_t min_compress_block_size;
size_t max_compress_block_size;
size_t aio_threshold;
size_t current_mark = 0;
const std::string marks_file_extension;
const size_t mark_size_in_bytes;
const bool blocks_are_granules_size;
MergeTreeIndexGranularity index_granularity;
const bool compute_granularity;
CompressionCodecPtr codec;
const bool with_final_mark;
};
}

View File

@ -3,6 +3,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>

View File

@ -910,10 +910,15 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
if (marks_count == 0)
return res;
bool has_final_mark = part->index_granularity.hasFinalMark();
/// If index is not used.
if (key_condition.alwaysUnknownOrTrue())
{
res.push_back(MarkRange(0, marks_count));
if (has_final_mark)
res.push_back(MarkRange(0, marks_count - 1));
else
res.push_back(MarkRange(0, marks_count));
}
else
{
@ -940,18 +945,19 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
ranges_stack.pop_back();
bool may_be_true;
if (range.end == marks_count)
if (range.end == marks_count && !has_final_mark)
{
for (size_t i = 0; i < used_key_size; ++i)
{
index[i]->get(range.begin, index_left[i]);
}
may_be_true = key_condition.mayBeTrueAfter(
used_key_size, index_left.data(), data.primary_key_data_types);
}
else
{
if (has_final_mark && range.end == marks_count)
range.end -= 1; /// Remove final empty mark. It's useful only for primary key condition.
for (size_t i = 0; i < used_key_size; ++i)
{
index[i]->get(range.begin, index_left[i]);
@ -1010,9 +1016,13 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
size_t granules_dropped = 0;
size_t marks_count = part->getMarksCount();
size_t final_mark = part->index_granularity.hasFinalMark();
size_t index_marks_count = (marks_count - final_mark + index->granularity - 1) / index->granularity;
MergeTreeIndexReader reader(
index, part,
((part->getMarksCount() + index->granularity - 1) / index->granularity),
index_marks_count,
ranges);
MarkRanges res;

View File

@ -62,6 +62,11 @@ public:
return getMarkRows(last);
}
bool hasFinalMark() const
{
return getLastMarkRows() == 0;
}
bool empty() const
{
return marks_rows_partial_sums.empty();

View File

@ -78,7 +78,8 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, finished_mutations_to_keep, 100, "How many records about mutations that are done to keep. If zero, then keep all of them.") \
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
M(SettingUInt64, index_granularity_bytes, 0, "Approximate amount of bytes in single granule (0 - disabled).") \
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.")
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)")
DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS)

View File

@ -19,325 +19,10 @@ namespace ErrorCodes
namespace
{
constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto INDEX_FILE_EXTENSION = ".idx";
constexpr auto INDEX_FILE_EXTENSION = ".idx";
}
/// Implementation of IMergedBlockOutputStream.
IMergedBlockOutputStream::IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionCodecPtr codec_,
size_t aio_threshold_,
bool blocks_are_granules_size_,
const MergeTreeIndexGranularity & index_granularity_)
: storage(storage_)
, min_compress_block_size(min_compress_block_size_)
, max_compress_block_size(max_compress_block_size_)
, aio_threshold(aio_threshold_)
, marks_file_extension(storage.index_granularity_info.marks_file_extension)
, mark_size_in_bytes(storage.index_granularity_info.mark_size_in_bytes)
, blocks_are_granules_size(blocks_are_granules_size_)
, index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty())
, codec(std::move(codec_))
{
if (blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
}
void IMergedBlockOutputStream::addStreams(
const String & path,
const String & name,
const IDataType & type,
const CompressionCodecPtr & effective_codec,
size_t estimated_size,
bool skip_offsets)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
{
if (skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type.
if (column_streams.count(stream_name))
return;
column_streams[stream_name] = std::make_unique<ColumnStream>(
stream_name,
path + stream_name, DATA_FILE_EXTENSION,
path + stream_name, marks_file_extension,
effective_codec,
max_compress_block_size,
estimated_size,
aio_threshold);
};
IDataType::SubstreamPath stream_path;
type.enumerateStreams(callback, stream_path);
}
IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets)
{
return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return nullptr;
return &column_streams[stream_name]->compressed;
};
}
void fillIndexGranularityImpl(
const Block & block,
size_t index_granularity_bytes,
size_t fixed_index_granularity_rows,
bool blocks_are_granules,
size_t index_offset,
MergeTreeIndexGranularity & index_granularity)
{
size_t rows_in_block = block.rows();
size_t index_granularity_for_block;
if (index_granularity_bytes == 0)
index_granularity_for_block = fixed_index_granularity_rows;
else
{
size_t block_size_in_memory = block.bytes();
if (blocks_are_granules)
index_granularity_for_block = rows_in_block;
else if (block_size_in_memory >= index_granularity_bytes)
{
size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
index_granularity_for_block = rows_in_block / granules_in_block;
}
else
{
size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
}
}
if (index_granularity_for_block == 0) /// very rare case when index granularity bytes less then single row
index_granularity_for_block = 1;
/// We should be less or equal than fixed index granularity
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.appendMark(index_granularity_for_block);
}
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
fillIndexGranularityImpl(
block,
storage.index_granularity_info.index_granularity_bytes,
storage.index_granularity_info.fixed_index_granularity,
blocks_are_granules_size,
index_offset,
index_granularity);
}
size_t IMergedBlockOutputStream::writeSingleGranule(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
IDataType::SerializeBinaryBulkSettings & serialize_settings,
size_t from_row,
size_t number_of_rows,
bool write_marks)
{
if (write_marks)
{
/// Write marks.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return;
ColumnStream & stream = *column_streams[stream_name];
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
if (storage.index_granularity_info.is_adaptive)
writeIntBinary(number_of_rows, stream.marks);
}, serialize_settings.path);
}
type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
}, serialize_settings.path);
return from_row + number_of_rows;
}
std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t from_mark)
{
auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
size_t total_rows = column.size();
size_t current_row = 0;
size_t current_column_mark = from_mark;
while (current_row < total_rows)
{
size_t rows_to_write;
bool write_marks = true;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (current_row == 0 && index_offset != 0)
{
write_marks = false;
rows_to_write = index_offset;
}
else
{
if (index_granularity.getMarksCount() <= current_column_mark)
throw Exception(
"Incorrect size of index granularity expect mark " + toString(current_column_mark) + " totally have marks " + toString(index_granularity.getMarksCount()),
ErrorCodes::LOGICAL_ERROR);
rows_to_write = index_granularity.getMarkRows(current_column_mark);
}
current_row = writeSingleGranule(
name,
type,
column,
offset_columns,
skip_offsets,
serialization_state,
serialize_settings,
current_row,
rows_to_write,
write_marks
);
if (write_marks)
current_column_mark++;
}
/// Memoize offsets for Nested types, that are already written. They will not be written again for next columns of Nested structure.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);
offset_columns.insert(stream_name);
}
}, serialize_settings.path);
return std::make_pair(current_column_mark, current_row - total_rows);
}
/// Implementation of IMergedBlockOutputStream::ColumnStream.
IMergedBlockOutputStream::ColumnStream::ColumnStream(
const String & escaped_column_name_,
const String & data_path,
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec,
size_t max_compress_block_size,
size_t estimated_size,
size_t aio_threshold) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec), compressed(compressed_buf),
marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}
void IMergedBlockOutputStream::ColumnStream::finalize()
{
compressed.next();
plain_file->next();
marks.next();
}
void IMergedBlockOutputStream::ColumnStream::sync()
{
plain_file->sync();
marks_file.sync();
}
void IMergedBlockOutputStream::ColumnStream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
String name = escaped_column_name;
checksums.files[name + data_file_extension].is_compressed = true;
checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
checksums.files[name + data_file_extension].file_size = plain_hashing.count();
checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
checksums.files[name + marks_file_extension].file_size = marks.count();
checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}
/// Implementation of MergedBlockOutputStream.
MergedBlockOutputStream::MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
@ -424,8 +109,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
const NamesAndTypesList * total_column_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish columns serialization.
if (!serialization_states.empty())
{
auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
@ -435,11 +120,20 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
if (!serialization_states.empty())
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
if (with_final_mark)
writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
}
}
if (with_final_mark)
index_granularity.appendMark(0); /// last mark
/// Finish skip index serialization
for (size_t i = 0; i < storage.skip_indices.size(); ++i)
{
@ -460,6 +154,17 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (index_stream)
{
if (with_final_mark)
{
for (size_t j = 0; j < index_columns.size(); ++j)
{
auto & column = *last_index_row[j].column;
index_columns[j]->insertFrom(column, 0); /// it has only one element
last_index_row[j].type->serializeBinary(column, 0, *index_stream);
}
last_index_row.clear();
}
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
@ -618,8 +323,12 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
if (index_columns.empty())
{
index_columns.resize(primary_key_column_names.size());
last_index_row.resize(primary_key_column_names.size());
for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
{
index_columns[i] = primary_key_columns[i].column->cloneEmpty();
last_index_row[i] = primary_key_columns[i].cloneEmpty();
}
}
if (serialization_states.empty())
@ -760,101 +469,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
index_offset = new_index_offset;
}
/// Implementation of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns,
const MergeTreeIndexGranularity & index_granularity_)
: IMergedBlockOutputStream(
storage_, storage_.global_context.getSettings().min_compress_block_size,
storage_.global_context.getSettings().max_compress_block_size, default_codec_,
storage_.global_context.getSettings().min_bytes_to_use_direct_io,
false,
index_granularity_),
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_),
already_written_offset_columns(already_written_offset_columns)
{
}
void MergedColumnOnlyOutputStream::write(const Block & block)
{
if (!initialized)
/// store last index row to write final mark at the end of column
for (size_t j = 0, size = primary_key_columns.size(); j < size; ++j)
{
column_streams.clear();
serialization_states.clear();
serialization_states.reserve(block.columns());
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (size_t i = 0; i < block.columns(); ++i)
{
const auto & col = block.safeGetByPosition(i);
const auto columns = storage.getColumns();
addStreams(part_path, col.name, *col.type, columns.getCodecOrDefault(col.name, codec), 0, skip_offsets);
serialization_states.emplace_back(nullptr);
settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
}
initialized = true;
}
size_t new_index_offset = 0;
size_t new_current_mark = 0;
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
const IColumn & primary_column = *primary_key_columns[j].column.get();
auto mutable_column = std::move(*last_index_row[j].column).mutate();
if (!mutable_column->empty())
mutable_column->popBack(1);
mutable_column->insertFrom(primary_column, rows - 1);
last_index_row[j].column = std::move(mutable_column);
}
index_offset = new_index_offset;
current_mark = new_current_mark;
}
void MergedColumnOnlyOutputStream::writeSuffix()
{
throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums()
{
/// Finish columns serialization.
auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
for (size_t i = 0, size = header.columns(); i < size; ++i)
{
auto & column = header.getByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
MergeTreeData::DataPart::Checksums checksums;
for (auto & column_stream : column_streams)
{
column_stream.second->finalize();
if (sync)
column_stream.second->sync();
column_stream.second->addToChecksums(checksums);
}
column_streams.clear();
serialization_states.clear();
initialized = false;
return checksums;
}
}

View File

@ -1,133 +1,12 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
#include <Columns/ColumnArray.h>
namespace DB
{
class IMergedBlockOutputStream : public IBlockOutputStream
{
public:
IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionCodecPtr default_codec_,
size_t aio_threshold_,
bool blocks_are_granules_size_,
const MergeTreeIndexGranularity & index_granularity_);
using WrittenOffsetColumns = std::set<std::string>;
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
struct ColumnStream
{
ColumnStream(
const String & escaped_column_name_,
const String & data_path,
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec,
size_t max_compress_block_size,
size_t estimated_size,
size_t aio_threshold);
String escaped_column_name;
std::string data_file_extension;
std::string marks_file_extension;
/// compressed -> compressed_buf -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer compressed;
/// marks -> marks_file
WriteBufferFromFile marks_file;
HashingWriteBuffer marks;
void finalize();
void sync();
void addToChecksums(MergeTreeData::DataPart::Checksums & checksums);
};
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
void addStreams(const String & path, const String & name, const IDataType & type,
const CompressionCodecPtr & codec, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
/// Write data of one column.
/// Return how many marks were written and
/// how many rows were written for last mark
std::pair<size_t, size_t> writeColumn(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t from_mark
);
/// Write single granule of one column (rows between 2 marks)
size_t writeSingleGranule(
const String & name,
const IDataType & type,
const IColumn & column,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
IDataType::SerializeBinaryBulkSettings & serialize_settings,
size_t from_row,
size_t number_of_rows,
bool write_marks);
/// Count index_granularity for block and store in `index_granularity`
void fillIndexGranularity(const Block & block);
MergeTreeData & storage;
ColumnStreams column_streams;
/// The offset to the first row of the block for which you want to write the index.
size_t index_offset = 0;
size_t min_compress_block_size;
size_t max_compress_block_size;
size_t aio_threshold;
size_t current_mark = 0;
const std::string marks_file_extension;
const size_t mark_size_in_bytes;
const bool blocks_are_granules_size;
MergeTreeIndexGranularity index_granularity;
const bool compute_granularity;
CompressionCodecPtr codec;
};
/** To write one part.
* The data refers to one partition, and is written in one part.
*/
@ -193,42 +72,13 @@ private:
std::unique_ptr<WriteBufferFromFile> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
MutableColumns index_columns;
/// Index columns values from the last row from the last block
/// It's written to index file in the `writeSuffixAndFinalizePart` method
ColumnsWithTypeAndName last_index_row;
std::vector<std::unique_ptr<ColumnStream>> skip_indices_streams;
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling;
};
/// Writes only those columns that are in `header`
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{
public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
/// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns,
const MergeTreeIndexGranularity & index_granularity_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writeSuffix() override;
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums();
private:
Block header;
SerializationStates serialization_states;
String part_path;
bool initialized = false;
bool sync;
bool skip_offsets;
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns & already_written_offset_columns;
};
}

View File

@ -0,0 +1,102 @@
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
namespace DB
{
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns,
const MergeTreeIndexGranularity & index_granularity_)
: IMergedBlockOutputStream(
storage_, storage_.global_context.getSettings().min_compress_block_size,
storage_.global_context.getSettings().max_compress_block_size, default_codec_,
storage_.global_context.getSettings().min_bytes_to_use_direct_io,
false,
index_granularity_),
header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_),
already_written_offset_columns(already_written_offset_columns)
{
}
void MergedColumnOnlyOutputStream::write(const Block & block)
{
if (!initialized)
{
column_streams.clear();
serialization_states.clear();
serialization_states.reserve(block.columns());
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (size_t i = 0; i < block.columns(); ++i)
{
const auto & col = block.safeGetByPosition(i);
const auto columns = storage.getColumns();
addStreams(part_path, col.name, *col.type, columns.getCodecOrDefault(col.name, codec), 0, skip_offsets);
serialization_states.emplace_back(nullptr);
settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
}
initialized = true;
}
size_t new_index_offset = 0;
size_t new_current_mark = 0;
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
}
index_offset = new_index_offset;
current_mark = new_current_mark;
}
void MergedColumnOnlyOutputStream::writeSuffix()
{
throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums()
{
/// Finish columns serialization.
auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
WrittenOffsetColumns offset_columns;
for (size_t i = 0, size = header.columns(); i < size; ++i)
{
auto & column = header.getByPosition(i);
serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
if (with_final_mark)
writeFinalMark(column.name, column.type, offset_columns, skip_offsets, serialize_settings.path);
}
MergeTreeData::DataPart::Checksums checksums;
for (auto & column_stream : column_streams)
{
column_stream.second->finalize();
if (sync)
column_stream.second->sync();
column_stream.second->addToChecksums(checksums);
}
column_streams.clear();
serialization_states.clear();
initialized = false;
return checksums;
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
namespace DB
{
/// Writes only those columns that are in `header`
class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{
public:
/// skip_offsets: used when ALTERing columns if we know that array offsets are not altered.
/// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns,
const MergeTreeIndexGranularity & index_granularity_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writeSuffix() override;
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums();
private:
Block header;
SerializationStates serialization_states;
String part_path;
bool initialized = false;
bool sync;
bool skip_offsets;
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns & already_written_offset_columns;
};
}

View File

@ -8,7 +8,7 @@
#include <Columns/ColumnVector.h>
// I know that inclusion of .cpp is not good at all
#include <Storages/MergeTree/MergedBlockOutputStream.cpp>
#include <Storages/MergeTree/IMergedBlockOutputStream.cpp>
using namespace DB;
Block getBlockWithSize(size_t required_size_in_bytes, size_t size_of_row_in_bytes)

View File

@ -8,7 +8,7 @@ CREATE TABLE zero_rows_per_granule (
v2 Int64,
Sign Int8
) ENGINE CollapsingMergeTree(Sign) PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=20,
SETTINGS index_granularity_bytes=20, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -39,7 +39,7 @@ CREATE TABLE four_rows_per_granule (
v2 Int64,
Sign Int8
) ENGINE CollapsingMergeTree(Sign) PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=110,
SETTINGS index_granularity_bytes=110, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;

View File

@ -7,7 +7,7 @@ CREATE TABLE zero_rows_per_granule (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 20;
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 20, write_final_mark = 0;
INSERT INTO zero_rows_per_granule (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
@ -34,7 +34,7 @@ CREATE TABLE two_rows_per_granule (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 40;
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 40, write_final_mark = 0;
INSERT INTO two_rows_per_granule (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
@ -61,7 +61,7 @@ CREATE TABLE four_rows_per_granule (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110;
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110, write_final_mark = 0;
INSERT INTO four_rows_per_granule (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
@ -95,7 +95,7 @@ CREATE TABLE huge_granularity_small_blocks (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 1000000;
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 1000000, write_final_mark = 0;
INSERT INTO huge_granularity_small_blocks (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
@ -126,7 +126,7 @@ CREATE TABLE adaptive_granularity_alter (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110;
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110, write_final_mark = 0;
INSERT INTO adaptive_granularity_alter (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
@ -180,7 +180,7 @@ CREATE TABLE zero_rows_per_granule (
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=20,
SETTINGS index_granularity_bytes=20, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -212,7 +212,7 @@ CREATE TABLE two_rows_per_granule (
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=40,
SETTINGS index_granularity_bytes=40, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -243,7 +243,7 @@ CREATE TABLE four_rows_per_granule (
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes = 110,
SETTINGS index_granularity_bytes = 110, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -281,7 +281,7 @@ CREATE TABLE huge_granularity_small_blocks (
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=1000000,
SETTINGS index_granularity_bytes=1000000, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -316,7 +316,7 @@ CREATE TABLE adaptive_granularity_alter (
v1 UInt64,
v2 Int64
) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=110,
SETTINGS index_granularity_bytes=110, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;

View File

@ -2,7 +2,7 @@ SET send_logs_level = 'none';
SELECT '----00489----';
DROP TABLE IF EXISTS pk;
CREATE TABLE pk (d Date DEFAULT '2000-01-01', x DateTime, y UInt64, z UInt64) ENGINE = MergeTree() PARTITION BY d ORDER BY (toStartOfMinute(x), y, z) SETTINGS index_granularity_bytes=19; -- one row granule
CREATE TABLE pk (d Date DEFAULT '2000-01-01', x DateTime, y UInt64, z UInt64) ENGINE = MergeTree() PARTITION BY d ORDER BY (toStartOfMinute(x), y, z) SETTINGS index_granularity_bytes=19, write_final_mark = 0; -- one row granule
INSERT INTO pk (x, y, z) VALUES (1, 11, 1235), (2, 11, 4395), (3, 22, 3545), (4, 22, 6984), (5, 33, 4596), (61, 11, 4563), (62, 11, 4578), (63, 11, 3572), (64, 22, 5786), (65, 22, 5786), (66, 22, 2791), (67, 22, 2791), (121, 33, 2791), (122, 33, 2791), (123, 33, 1235), (124, 44, 4935), (125, 44, 4578), (126, 55, 5786), (127, 55, 2791), (128, 55, 1235);
@ -34,7 +34,7 @@ SELECT '----00607----';
SET max_rows_to_read = 0;
DROP TABLE IF EXISTS merge_tree;
CREATE TABLE merge_tree (x UInt32) ENGINE = MergeTree ORDER BY x SETTINGS index_granularity_bytes = 4;
CREATE TABLE merge_tree (x UInt32) ENGINE = MergeTree ORDER BY x SETTINGS index_granularity_bytes = 4, write_final_mark = 0;
INSERT INTO merge_tree VALUES (0), (1);
SET force_primary_key = 1;
@ -61,7 +61,7 @@ CREATE TABLE large_alter_table_00926 (
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12), LZ4HC(12)),
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC),
data String CODEC(ZSTD(2), LZ4HC, NONE, LZ4, LZ4)
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity_bytes=40;
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity_bytes=40, write_final_mark = 0;
INSERT INTO large_alter_table_00926 SELECT toDate('2019-01-01'), number, toString(number + rand()) FROM system.numbers LIMIT 300000;

View File

@ -7,7 +7,7 @@ CREATE TABLE zero_rows_per_granule (
v1 UInt64,
v2 Int64
) ENGINE ReplacingMergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=20,
SETTINGS index_granularity_bytes=20, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -38,7 +38,7 @@ CREATE TABLE two_rows_per_granule (
v1 UInt64,
v2 Int64
) ENGINE ReplacingMergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=40,
SETTINGS index_granularity_bytes=40, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -69,7 +69,7 @@ CREATE TABLE four_rows_per_granule (
v1 UInt64,
v2 Int64
) ENGINE ReplacingMergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes = 110,
SETTINGS index_granularity_bytes = 110, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -115,7 +115,7 @@ CREATE TABLE huge_granularity_small_blocks (
v1 UInt64,
v2 Int64
) ENGINE ReplacingMergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=1000000,
SETTINGS index_granularity_bytes=1000000, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -151,7 +151,7 @@ CREATE TABLE adaptive_granularity_alter (
v1 UInt64,
v2 Int64
) ENGINE ReplacingMergeTree() PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=110,
SETTINGS index_granularity_bytes=110, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;

View File

@ -9,7 +9,7 @@ CREATE TABLE zero_rows_per_granule (
Sign Int8,
Version UInt8
) ENGINE VersionedCollapsingMergeTree(Sign, Version) PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=20,
SETTINGS index_granularity_bytes=20, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -41,7 +41,7 @@ CREATE TABLE four_rows_per_granule (
Sign Int8,
Version UInt8
) ENGINE VersionedCollapsingMergeTree(Sign, Version) PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=120,
SETTINGS index_granularity_bytes=120, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
@ -86,7 +86,7 @@ CREATE TABLE six_rows_per_granule (
Sign Int8,
Version UInt8
) ENGINE VersionedCollapsingMergeTree(Sign, Version) PARTITION BY toYYYYMM(p) ORDER BY k
SETTINGS index_granularity_bytes=170,
SETTINGS index_granularity_bytes=170, write_final_mark = 0,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;

View File

@ -8,14 +8,14 @@ CREATE TABLE test.zero_rows_per_granule1 (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/zero_rows_in_granule', '1') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 20;
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/zero_rows_in_granule', '1') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 20, write_final_mark = 0;
CREATE TABLE test.zero_rows_per_granule2 (
p Date,
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/zero_rows_in_granule', '2') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 20;
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/zero_rows_in_granule', '2') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 20, write_final_mark = 0;
INSERT INTO test.zero_rows_per_granule1 (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
@ -68,14 +68,14 @@ CREATE TABLE test.four_rows_per_granule1 (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/four_rows_in_granule', '1') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110;
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/four_rows_in_granule', '1') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110, write_final_mark = 0;
CREATE TABLE test.four_rows_per_granule2 (
p Date,
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/four_rows_in_granule', '2') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110;
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/four_rows_in_granule', '2') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110, write_final_mark = 0;
INSERT INTO test.four_rows_per_granule1 (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
@ -135,14 +135,14 @@ CREATE TABLE test.adaptive_granularity_alter1 (
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/adaptive_granularity_alter', '1') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110;
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/adaptive_granularity_alter', '1') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110, write_final_mark = 0;
CREATE TABLE test.adaptive_granularity_alter2 (
p Date,
k UInt64,
v1 UInt64,
v2 Int64
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/adaptive_granularity_alter', '2') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110;
) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/adaptive_granularity_alter', '2') PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110, write_final_mark = 0;
INSERT INTO test.adaptive_granularity_alter1 (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);

View File

@ -0,0 +1,41 @@
===test insert===
0
2
===test merge===
0
2
===test alter===
0
2
===test mutation===
0
['q','q','q']
===test skip_idx===
0
2
===test alter attach===
1 1
2 1
3 1
4 2 Hello
5 2 World
1 1
2 1
3 1
4 2
5 2
6 3
7 3
===test alter update===
foo
foo
===test no pk===
0
2
0
2
===test a lot of marks===
0
4
0
7

View File

@ -0,0 +1,167 @@
SET send_logs_level = 'none';
SET allow_experimental_data_skipping_indices = 1;
DROP TABLE IF EXISTS mt_with_pk;
CREATE TABLE mt_with_pk (
d Date DEFAULT '2000-01-01',
x DateTime,
y Array(UInt64),
z UInt64,
n Nested (Age UInt8, Name String),
w Int16 DEFAULT 10
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(d) ORDER BY (x, z) SETTINGS index_granularity_bytes=10000; -- write_final_mark=1 by default
SELECT '===test insert===';
INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 12:57:57'), [1, 1, 1], 11, [77], ['Joe']), (toDate('2018-10-01'), toDateTime('2018-10-01 16:57:57'), [2, 2, 2], 12, [88], ['Mark']), (toDate('2018-10-01'), toDateTime('2018-10-01 19:57:57'), [3, 3, 3], 13, [99], ['Robert']);
SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT '===test merge===';
INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
OPTIMIZE TABLE mt_with_pk FINAL;
SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT '===test alter===';
ALTER TABLE mt_with_pk MODIFY COLUMN y Array(String);
INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 05:57:57'), ['a', 'a', 'a'], 14, [888, 999], ['Jack', 'Elvis']);
OPTIMIZE TABLE mt_with_pk FINAL;
SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT '===test mutation===';
ALTER TABLE mt_with_pk UPDATE w = 0 WHERE 1;
ALTER TABLE mt_with_pk UPDATE y = ['q', 'q', 'q'] WHERE 1;
SELECT sleep(1) FORMAT Null;
SELECT sum(w) FROM mt_with_pk;
SELECT distinct(y) FROM mt_with_pk;
OPTIMIZE TABLE mt_with_pk FINAL;
SELECT '===test skip_idx===';
ALTER TABLE mt_with_pk ADD INDEX idx1 z + w TYPE minmax GRANULARITY 1;
INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`, w) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 03:57:57'), ['z', 'z', 'z'], 15, [1111, 2222], ['Garry', 'Ron'], 1);
OPTIMIZE TABLE mt_with_pk FINAL;
SELECT COUNT(*) FROM mt_with_pk WHERE z + w > 5000;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
DROP TABLE IF EXISTS mt_with_pk;
SELECT '===test alter attach===';
DROP TABLE IF EXISTS alter_attach;
CREATE TABLE alter_attach (x UInt64, p UInt8) ENGINE = MergeTree ORDER BY tuple() PARTITION BY p SETTINGS index_granularity_bytes=10000, write_final_mark=1;
INSERT INTO alter_attach VALUES (1, 1), (2, 1), (3, 1);
ALTER TABLE alter_attach DETACH PARTITION 1;
ALTER TABLE alter_attach ADD COLUMN s String;
INSERT INTO alter_attach VALUES (4, 2, 'Hello'), (5, 2, 'World');
ALTER TABLE alter_attach ATTACH PARTITION 1;
SELECT * FROM alter_attach ORDER BY x;
ALTER TABLE alter_attach DETACH PARTITION 2;
ALTER TABLE alter_attach DROP COLUMN s;
INSERT INTO alter_attach VALUES (6, 3), (7, 3);
ALTER TABLE alter_attach ATTACH PARTITION 2;
SELECT * FROM alter_attach ORDER BY x;
DROP TABLE IF EXISTS alter_attach;
DROP TABLE IF EXISTS mt_with_pk;
SELECT '===test alter update===';
DROP TABLE IF EXISTS alter_update_00806;
CREATE TABLE alter_update_00806 (d Date, e Enum8('foo'=1, 'bar'=2)) Engine = MergeTree PARTITION BY d ORDER BY (d) SETTINGS index_granularity_bytes=10000, write_final_mark=1;
INSERT INTO alter_update_00806 (d, e) VALUES ('2018-01-01', 'foo');
INSERT INTO alter_update_00806 (d, e) VALUES ('2018-01-02', 'bar');
ALTER TABLE alter_update_00806 UPDATE e = CAST('foo', 'Enum8(\'foo\' = 1, \'bar\' = 2)') WHERE d='2018-01-02';
SELECT sleep(1) FORMAT Null;
SELECT e FROM alter_update_00806 ORDER BY d;
DROP TABLE IF EXISTS alter_update_00806;
SELECT '===test no pk===';
DROP TABLE IF EXISTS mt_without_pk;
CREATE TABLE mt_without_pk (
d Date DEFAULT '2000-01-01',
x DateTime,
y Array(UInt64),
z UInt64,
n Nested (Age UInt8, Name String),
w Int16 DEFAULT 10
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(d) ORDER BY tuple() SETTINGS index_granularity_bytes=10000, write_final_mark=1;
INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 12:57:57'), [1, 1, 1], 11, [77], ['Joe']), (toDate('2018-10-01'), toDateTime('2018-10-01 16:57:57'), [2, 2, 2], 12, [88], ['Mark']), (toDate('2018-10-01'), toDateTime('2018-10-01 19:57:57'), [3, 3, 3], 13, [99], ['Robert']);
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
OPTIMIZE TABLE mt_without_pk FINAL;
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
DROP TABLE IF EXISTS mt_without_pk;
SELECT '===test a lot of marks===';
DROP TABLE IF EXISTS mt_with_small_granularity;
CREATE TABLE mt_with_small_granularity (
d Date DEFAULT '2000-01-01',
x DateTime,
y Array(UInt64),
z UInt64,
n Nested (Age UInt8, Name String),
w Int16 DEFAULT 10
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(d) ORDER BY (x, z) SETTINGS index_granularity_bytes=30, write_final_mark=1;
INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 12:57:57'), [1, 1, 1], 11, [77], ['Joe']), (toDate('2018-10-01'), toDateTime('2018-10-01 16:57:57'), [2, 2, 2], 12, [88], ['Mark']), (toDate('2018-10-01'), toDateTime('2018-10-01 19:57:57'), [3, 3, 3], 13, [99], ['Robert']);
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
OPTIMIZE TABLE mt_with_small_granularity FINAL;
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
DROP TABLE IF EXISTS mt_with_small_granularity;

View File

@ -0,0 +1,4 @@
2
"rows_read": 0,
2
"rows_read": 0,

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="
CREATE TABLE mt_with_pk (
d Date DEFAULT '2000-01-01',
x DateTime,
y Array(UInt64),
z UInt64,
n Nested (Age UInt8, Name String),
w Int16 DEFAULT 10
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(d) ORDER BY (x, z) SETTINGS index_granularity_bytes=10000, write_final_mark=1;"
$CLICKHOUSE_CLIENT --query="INSERT INTO mt_with_pk (d, x, y, z, n.Age, n.Name) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 12:57:57'), [1, 1, 1], 11, [77], ['Joe']), (toDate('2018-10-01'), toDateTime('2018-10-01 16:57:57'), [2, 2, 2], 12, [88], ['Mark']), (toDate('2018-10-01'), toDateTime('2018-10-01 19:57:57'), [3, 3, 3], 13, [99], ['Robert']);"
$CLICKHOUSE_CLIENT --query="SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;"
$CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57') FORMAT JSON;" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="INSERT INTO mt_with_pk (d, x, y, z, n.Age, n.Name) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);"
$CLICKHOUSE_CLIENT --query="OPTIMIZE TABLE mt_with_pk FINAL"
$CLICKHOUSE_CLIENT --query="SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;"
$CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57') FORMAT JSON;" | grep "rows_read"