Mixed parts by setting

This commit is contained in:
alesapin 2019-06-19 17:46:06 +03:00
parent dd2e4fd805
commit fefce49c8f
13 changed files with 48 additions and 32 deletions

View File

@ -27,13 +27,13 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
, min_compress_block_size(min_compress_block_size_) , min_compress_block_size(min_compress_block_size_)
, max_compress_block_size(max_compress_block_size_) , max_compress_block_size(max_compress_block_size_)
, aio_threshold(aio_threshold_) , aio_threshold(aio_threshold_)
, marks_file_extension(storage.settings.index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension()) , marks_file_extension(storage.canUseAdaptiveGranularity() ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension())
, mark_size_in_bytes(storage.settings.index_granularity_bytes ? getAdaptiveMrkSize() : getNonAdaptiveMrkSize()) , mark_size_in_bytes(storage.canUseAdaptiveGranularity() ? getAdaptiveMrkSize() : getNonAdaptiveMrkSize())
, blocks_are_granules_size(blocks_are_granules_size_) , blocks_are_granules_size(blocks_are_granules_size_)
, index_granularity(index_granularity_) , index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty()) , compute_granularity(index_granularity.empty())
, codec(std::move(codec_)) , codec(std::move(codec_))
, with_final_mark(storage.settings.write_final_mark && storage.settings.index_granularity_bytes) , with_final_mark(storage.settings.write_final_mark && storage.canUseAdaptiveGranularity())
{ {
if (blocks_are_granules_size && !index_granularity.empty()) if (blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR); throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);

View File

@ -743,6 +743,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto lock = lockParts(); auto lock = lockParts();
data_parts_indexes.clear(); data_parts_indexes.clear();
bool has_adaptive_parts = false, has_non_adaptive_parts = false;
for (const String & file_name : part_file_names) for (const String & file_name : part_file_names)
{ {
MergeTreePartInfo part_info; MergeTreePartInfo part_info;
@ -827,6 +828,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
continue; continue;
} }
if (!part->index_granularity_info.is_adaptive)
has_non_adaptive_parts = true;
else
has_adaptive_parts = true;
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime(); part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
@ -836,6 +841,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
} }
if (has_non_adaptive_parts && has_adaptive_parts && !settings.enable_mixed_granularity_parts)
throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
if (suspicious_broken_parts > settings.max_suspicious_broken_parts && !skip_sanity_checks) if (suspicious_broken_parts > settings.max_suspicious_broken_parts && !skip_sanity_checks)
throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.", throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS); ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);

View File

@ -574,6 +574,12 @@ public:
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0; virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
bool canUseAdaptiveGranularity() const
{
return settings.index_granularity_bytes != 0 &&
(settings.enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
}
MergeTreeDataFormatVersion format_version; MergeTreeDataFormatVersion format_version;
Context global_context; Context global_context;
@ -631,6 +637,8 @@ public:
/// For generating names of temporary parts during insertion. /// For generating names of temporary parts during insertion.
SimpleIncrement insert_increment; SimpleIncrement insert_increment;
bool has_non_adaptive_index_granularity_parts = false;
protected: protected:
friend struct MergeTreeDataPart; friend struct MergeTreeDataPart;
friend class MergeTreeDataMergerMutator; friend class MergeTreeDataMergerMutator;

View File

@ -138,7 +138,7 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
: storage(storage_) : storage(storage_)
, name(name_) , name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version)) , info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, index_granularity_info(storage.settings) , index_granularity_info(storage)
{ {
} }
@ -146,7 +146,7 @@ MergeTreeDataPart::MergeTreeDataPart(const MergeTreeData & storage_, const Strin
: storage(storage_) : storage(storage_)
, name(name_) , name(name_)
, info(info_) , info(info_)
, index_granularity_info(storage.settings) , index_granularity_info(storage)
{ {
} }

View File

@ -627,14 +627,10 @@ size_t roundRowsOrBytesToMarks(
size_t bytes_granularity) size_t bytes_granularity)
{ {
if (bytes_granularity == 0) if (bytes_granularity == 0)
{
return (rows_setting + rows_granularity - 1) / rows_granularity; return (rows_setting + rows_granularity - 1) / rows_granularity;
}
else else
{
return (bytes_setting + bytes_granularity - 1) / bytes_granularity; return (bytes_setting + bytes_granularity - 1) / bytes_granularity;
} }
}
} }

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h> #include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Poco/Path.h> #include <Poco/Path.h>
#include <Poco/File.h> #include <Poco/File.h>
#include <Poco/DirectoryIterator.h> #include <Poco/DirectoryIterator.h>
@ -22,14 +23,14 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(
} }
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo( MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
const MergeTreeSettings & storage_settings) const MergeTreeData & storage)
{ {
fixed_index_granularity = storage_settings.index_granularity; fixed_index_granularity = storage.settings.index_granularity;
/// Granularity is fixed /// Granularity is fixed
if (storage_settings.index_granularity_bytes == 0) if (!storage.canUseAdaptiveGranularity())
setNonAdaptive(); setNonAdaptive();
else else
setAdaptive(storage_settings.index_granularity_bytes); setAdaptive(storage.settings.index_granularity_bytes);
} }

View File

@ -1,12 +1,12 @@
#pragma once #pragma once
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <optional> #include <optional>
#include <Core/Types.h> #include <Core/Types.h>
namespace DB namespace DB
{ {
class MergeTreeData;
/// Meta information about index granularity /// Meta information about index granularity
struct MergeTreeIndexGranularityInfo struct MergeTreeIndexGranularityInfo
{ {
@ -27,7 +27,7 @@ public:
size_t index_granularity_bytes; size_t index_granularity_bytes;
MergeTreeIndexGranularityInfo( MergeTreeIndexGranularityInfo(
const MergeTreeSettings & storage_settings); const MergeTreeData & storage);
void changeGranularityIfRequired(const std::string & path_to_part); void changeGranularityIfRequired(const std::string & path_to_part);

View File

@ -79,7 +79,8 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \ M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
M(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \ M(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \ M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity")
DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS) DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS)

View File

@ -29,7 +29,7 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
/// round min_marks_to_read up to nearest multiple of block_size expressed in marks /// round min_marks_to_read up to nearest multiple of block_size expressed in marks
/// If granularity is adaptive it doesn't make sense /// If granularity is adaptive it doesn't make sense
/// Maybe it will make sence to add settings `max_block_size_bytes` /// Maybe it will make sence to add settings `max_block_size_bytes`
if (max_block_size_rows && storage.settings.index_granularity_bytes == 0) if (max_block_size_rows && !storage.canUseAdaptiveGranularity())
{ {
size_t fixed_index_granularity = storage.settings.index_granularity; size_t fixed_index_granularity = storage.settings.index_granularity;
min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1) min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)

View File

@ -127,12 +127,12 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
} }
if (with_final_mark) if (with_final_mark && rows_count != 0)
writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path); writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
} }
} }
if (with_final_mark) if (with_final_mark && rows_count != 0)
index_granularity.appendMark(0); /// last mark index_granularity.appendMark(0); /// last mark
/// Finish skip index serialization /// Finish skip index serialization
@ -155,7 +155,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (index_stream) if (index_stream)
{ {
if (with_final_mark) if (with_final_mark && rows_count != 0)
{ {
for (size_t j = 0; j < index_columns.size(); ++j) for (size_t j = 0; j < index_columns.size(); ++j)
{ {
@ -415,7 +415,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
writeIntBinary(stream.compressed.offset(), stream.marks); writeIntBinary(stream.compressed.offset(), stream.marks);
/// Actually this numbers is redundant, but we have to store them /// Actually this numbers is redundant, but we have to store them
/// to be compatible with normal .mrk2 file format /// to be compatible with normal .mrk2 file format
if (storage.settings.index_granularity_bytes != 0) if (storage.canUseAdaptiveGranularity())
writeIntBinary(1UL, stream.marks); writeIntBinary(1UL, stream.marks);
} }
} }

View File

@ -6,7 +6,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh . $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS preferred_block_size_bytes" $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS preferred_block_size_bytes"
$CLICKHOUSE_CLIENT -q "CREATE TABLE preferred_block_size_bytes (p Date, s String) ENGINE = MergeTree(p, p, 1)" $CLICKHOUSE_CLIENT -q "CREATE TABLE preferred_block_size_bytes (p Date, s String) ENGINE = MergeTree PARTITION BY p ORDER BY p SETTINGS index_granularity=1, index_granularity_bytes=0"
$CLICKHOUSE_CLIENT -q "INSERT INTO preferred_block_size_bytes (s) SELECT '16_bytes_-_-_-_' AS s FROM system.numbers LIMIT 10, 90" $CLICKHOUSE_CLIENT -q "INSERT INTO preferred_block_size_bytes (s) SELECT '16_bytes_-_-_-_' AS s FROM system.numbers LIMIT 10, 90"
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE preferred_block_size_bytes" $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE preferred_block_size_bytes"
$CLICKHOUSE_CLIENT --preferred_block_size_bytes=26 -q "SELECT DISTINCT blockSize(), ignore(p, s) FROM preferred_block_size_bytes" $CLICKHOUSE_CLIENT --preferred_block_size_bytes=26 -q "SELECT DISTINCT blockSize(), ignore(p, s) FROM preferred_block_size_bytes"
@ -17,7 +17,7 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS preferred_block_size_bytes"
# PREWHERE using empty column # PREWHERE using empty column
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS pbs" $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS pbs"
$CLICKHOUSE_CLIENT -q "CREATE TABLE pbs (p Date, i UInt64, sa Array(String)) ENGINE = MergeTree(p, p, 100)" $CLICKHOUSE_CLIENT -q "CREATE TABLE pbs (p Date, i UInt64, sa Array(String)) ENGINE = MergeTree PARTITION BY p ORDER BY p SETTINGS index_granularity=100, index_granularity_bytes=0"
$CLICKHOUSE_CLIENT -q "INSERT INTO pbs (p, i, sa) SELECT toDate(i % 30) AS p, number AS i, ['a'] AS sa FROM system.numbers LIMIT 1000" $CLICKHOUSE_CLIENT -q "INSERT INTO pbs (p, i, sa) SELECT toDate(i % 30) AS p, number AS i, ['a'] AS sa FROM system.numbers LIMIT 1000"
$CLICKHOUSE_CLIENT -q "ALTER TABLE pbs ADD COLUMN s UInt8 DEFAULT 0" $CLICKHOUSE_CLIENT -q "ALTER TABLE pbs ADD COLUMN s UInt8 DEFAULT 0"
$CLICKHOUSE_CLIENT --preferred_block_size_bytes=100000 -q "SELECT count() FROM pbs PREWHERE s = 0" $CLICKHOUSE_CLIENT --preferred_block_size_bytes=100000 -q "SELECT count() FROM pbs PREWHERE s = 0"
@ -28,7 +28,7 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE pbs"
# Nullable PREWHERE # Nullable PREWHERE
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS nullable_prewhere" $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS nullable_prewhere"
$CLICKHOUSE_CLIENT -q "CREATE TABLE nullable_prewhere (p Date, f Nullable(UInt64), d UInt64) ENGINE = MergeTree(p, p, 8)" $CLICKHOUSE_CLIENT -q "CREATE TABLE nullable_prewhere (p Date, f Nullable(UInt64), d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY p SETTINGS index_granularity=8, index_granularity_bytes=0"
$CLICKHOUSE_CLIENT -q "INSERT INTO nullable_prewhere SELECT toDate(0) AS p, if(number % 2 = 0, CAST(number AS Nullable(UInt64)), CAST(NULL AS Nullable(UInt64))) AS f, number as d FROM system.numbers LIMIT 1001" $CLICKHOUSE_CLIENT -q "INSERT INTO nullable_prewhere SELECT toDate(0) AS p, if(number % 2 = 0, CAST(number AS Nullable(UInt64)), CAST(NULL AS Nullable(UInt64))) AS f, number as d FROM system.numbers LIMIT 1001"
$CLICKHOUSE_CLIENT -q "SELECT sum(d), sum(f), max(d) FROM nullable_prewhere PREWHERE NOT isNull(f)" $CLICKHOUSE_CLIENT -q "SELECT sum(d), sum(f), max(d) FROM nullable_prewhere PREWHERE NOT isNull(f)"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS nullable_prewhere" $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS nullable_prewhere"

View File

@ -22,12 +22,12 @@ ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS unsigned_integer_test_table;"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS enum_test_table;" ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS enum_test_table;"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS date_test_table;" ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS date_test_table;"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE string_test_table (val String) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1;" ${CLICKHOUSE_CLIENT} --query="CREATE TABLE string_test_table (val String) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1, index_granularity_bytes = 0;"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE fixed_string_test_table (val FixedString(1)) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1;" ${CLICKHOUSE_CLIENT} --query="CREATE TABLE fixed_string_test_table (val FixedString(1)) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1, index_granularity_bytes = 0;"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE signed_integer_test_table (val Int32) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1;" ${CLICKHOUSE_CLIENT} --query="CREATE TABLE signed_integer_test_table (val Int32) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1, index_granularity_bytes = 0;"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE unsigned_integer_test_table (val UInt32) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1;" ${CLICKHOUSE_CLIENT} --query="CREATE TABLE unsigned_integer_test_table (val UInt32) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1, index_granularity_bytes = 0;"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE enum_test_table (val Enum16('hello' = 1, 'world' = 2, 'yandex' = 256, 'clickhouse' = 257)) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1;" ${CLICKHOUSE_CLIENT} --query="CREATE TABLE enum_test_table (val Enum16('hello' = 1, 'world' = 2, 'yandex' = 256, 'clickhouse' = 257)) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1, index_granularity_bytes = 0;"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE date_test_table (val Date) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1;" ${CLICKHOUSE_CLIENT} --query="CREATE TABLE date_test_table (val Date) ENGINE = MergeTree ORDER BY val SETTINGS index_granularity = 1, index_granularity_bytes = 0;"
${CLICKHOUSE_CLIENT} --query="INSERT INTO string_test_table VALUES ('0'), ('2'), ('2');" ${CLICKHOUSE_CLIENT} --query="INSERT INTO string_test_table VALUES ('0'), ('2'), ('2');"