Merge branch 'variant-new-serialization' of github.com:Avogar/ClickHouse into variant-dynamic-null-subcolumn

avogar 2024-07-06 13:59:43 +00:00
commit 5fcce61dd3
10 changed files with 497 additions and 89 deletions

View File

@ -974,6 +974,13 @@ Default value: false
- [exclude_deleted_rows_for_part_size_in_merge](#exclude_deleted_rows_for_part_size_in_merge) setting
## use_compact_variant_discriminators_serialization {#use_compact_variant_discriminators_serialization}
Enables compact mode for binary serialization of discriminators in the Variant data type.
This mode can significantly reduce the memory used to store discriminators in parts when a column contains mostly one variant or many NULL values.
Default value: true
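For intuition, here is a rough cost model as an illustrative C++ sketch (not ClickHouse code; the varint width is an assumption) of the per-granule discriminator bytes in both modes when a granule holds rows of a single variant:

#include <cstdint>
#include <cstdio>

// Basic mode stores one discriminator byte per row; compact mode stores a varint
// row count, a one-byte granule format and a single discriminator (hypothetical model).
static size_t varintSize(uint64_t v) { size_t n = 1; while (v >= 128) { v >>= 7; ++n; } return n; }

int main()
{
    const uint64_t rows = 8192;                      // a typical index_granularity
    size_t basic_bytes = rows;                       // 1 byte per row
    size_t compact_bytes = varintSize(rows) + 1 + 1; // <rows><format><discriminator>
    std::printf("basic: %zu, compact: %zu bytes per granule\n", basic_bytes, compact_bytes);
    return 0;
}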
## merge_workload
Used to regulate how resources are utilized and shared between merges and other workloads. The specified value is used as the `workload` setting value for background merges of this table. If not specified (an empty string), the server setting `merge_workload` is used instead.

View File

@ -256,6 +256,8 @@ public:
bool position_independent_encoding = true;
bool use_compact_variant_discriminators_serialization = false;
enum class DynamicStatisticsMode
{
NONE, /// Don't write statistics.

View File

@ -30,12 +30,18 @@ namespace ErrorCodes
struct SerializeBinaryBulkStateVariant : public ISerialization::SerializeBinaryBulkState
{
std::vector<ISerialization::SerializeBinaryBulkStatePtr> states;
explicit SerializeBinaryBulkStateVariant(UInt64 mode) : discriminators_mode(mode)
{
}
SerializationVariant::DiscriminatorsSerializationMode discriminators_mode;
std::vector<ISerialization::SerializeBinaryBulkStatePtr> variant_states;
};
struct DeserializeBinaryBulkStateVariant : public ISerialization::DeserializeBinaryBulkState
{
std::vector<ISerialization::DeserializeBinaryBulkStatePtr> states;
ISerialization::DeserializeBinaryBulkStatePtr discriminators_state;
std::vector<ISerialization::DeserializeBinaryBulkStatePtr> variant_states;
};
void SerializationVariant::enumerateStreams(
@ -71,7 +77,7 @@ void SerializationVariant::enumerateStreams(
.withType(type_variant ? type_variant->getVariant(i) : nullptr)
.withColumn(column_variant ? column_variant->getVariantPtrByGlobalDiscriminator(i) : nullptr)
.withSerializationInfo(data.serialization_info)
.withDeserializeState(variant_deserialize_state ? variant_deserialize_state->states[i] : nullptr);
.withDeserializeState(variant_deserialize_state ? variant_deserialize_state->variant_states[i] : nullptr);
addVariantElementToPath(settings.path, i);
settings.path.back().data = variant_data;
@ -87,17 +93,26 @@ void SerializationVariant::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
const ColumnVariant & col = assert_cast<const ColumnVariant &>(column);
settings.path.push_back(Substream::VariantDiscriminators);
auto * discriminators_stream = settings.getter(settings.path);
settings.path.pop_back();
auto variant_state = std::make_shared<SerializeBinaryBulkStateVariant>();
variant_state->states.resize(variants.size());
if (!discriminators_stream)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty stream for VariantDiscriminators in SerializationVariant::serializeBinaryBulkStatePrefix");
UInt64 mode = settings.use_compact_variant_discriminators_serialization ? DiscriminatorsSerializationMode::COMPACT : DiscriminatorsSerializationMode::BASIC;
writeBinaryLittleEndian(mode, *discriminators_stream);
const ColumnVariant & col = assert_cast<const ColumnVariant &>(column);
auto variant_state = std::make_shared<SerializeBinaryBulkStateVariant>(mode);
variant_state->variant_states.resize(variants.size());
settings.path.push_back(Substream::VariantElements);
for (size_t i = 0; i < variants.size(); ++i)
{
addVariantElementToPath(settings.path, i);
variants[i]->serializeBinaryBulkStatePrefix(col.getVariantByGlobalDiscriminator(i), settings, variant_state->states[i]);
variants[i]->serializeBinaryBulkStatePrefix(col.getVariantByGlobalDiscriminator(i), settings, variant_state->variant_states[i]);
settings.path.pop_back();
}
@ -116,7 +131,7 @@ void SerializationVariant::serializeBinaryBulkStateSuffix(
for (size_t i = 0; i < variants.size(); ++i)
{
addVariantElementToPath(settings.path, i);
variants[i]->serializeBinaryBulkStateSuffix(settings, variant_state->states[i]);
variants[i]->serializeBinaryBulkStateSuffix(settings, variant_state->variant_states[i]);
settings.path.pop_back();
}
settings.path.pop_back();
@ -128,14 +143,19 @@ void SerializationVariant::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkStatePtr & state,
SubstreamsDeserializeStatesCache * cache) const
{
DeserializeBinaryBulkStatePtr discriminators_state = deserializeDiscriminatorsStatePrefix(settings, cache);
if (!discriminators_state)
return;
auto variant_state = std::make_shared<DeserializeBinaryBulkStateVariant>();
variant_state->states.resize(variants.size());
variant_state->discriminators_state = discriminators_state;
variant_state->variant_states.resize(variants.size());
settings.path.push_back(Substream::VariantElements);
for (size_t i = 0; i < variants.size(); ++i)
{
addVariantElementToPath(settings.path, i);
variants[i]->deserializeBinaryBulkStatePrefix(settings, variant_state->states[i], cache);
variants[i]->deserializeBinaryBulkStatePrefix(settings, variant_state->variant_states[i], cache);
settings.path.pop_back();
}
@ -143,6 +163,29 @@ void SerializationVariant::deserializeBinaryBulkStatePrefix(
state = std::move(variant_state);
}
ISerialization::DeserializeBinaryBulkStatePtr SerializationVariant::deserializeDiscriminatorsStatePrefix(
DeserializeBinaryBulkSettings & settings,
SubstreamsDeserializeStatesCache * cache)
{
settings.path.push_back(Substream::VariantDiscriminators);
DeserializeBinaryBulkStatePtr discriminators_state = nullptr;
if (auto cached_state = getFromSubstreamsDeserializeStatesCache(cache, settings.path))
{
discriminators_state = cached_state;
}
else if (auto * discriminators_stream = settings.getter(settings.path))
{
UInt64 mode;
readBinaryLittleEndian(mode, *discriminators_stream);
discriminators_state = std::make_shared<DeserializeBinaryBulkStateVariantDiscriminators>(mode);
addToSubstreamsDeserializeStatesCache(cache, settings.path, discriminators_state);
}
settings.path.pop_back();
return discriminators_state;
}
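A minimal sketch of the caching pattern above, assuming a toy map-based cache keyed by substream path (all types here are illustrative, not ClickHouse's): the first subcolumn read parses the mode prefix and caches the state, and later reads of other subcolumns of the same Variant reuse it instead of re-reading the prefix.

#include <cstdint>
#include <map>
#include <memory>
#include <string>

struct DiscriminatorsState { uint64_t mode; };
using StatePtr = std::shared_ptr<DiscriminatorsState>;
using StatesCache = std::map<std::string, StatePtr>;

StatePtr getOrCreateState(StatesCache & cache, const std::string & path, uint64_t mode_from_stream)
{
    if (auto it = cache.find(path); it != cache.end())
        return it->second;                       // cached by a previous subcolumn read
    auto state = std::make_shared<DiscriminatorsState>(DiscriminatorsState{mode_from_stream});
    cache.emplace(path, state);
    return state;
}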
void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVariantStatistics(
const IColumn & column,
@ -165,13 +208,71 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian
auto * variant_state = checkAndGetState<SerializeBinaryBulkStateVariant>(state);
/// If offset = 0 and limit == col.size() or we have only NULLs, we don't need to calculate
/// Don't write anything if column is empty.
if (limit == 0)
return;
/// Write number of rows in this granule in compact mode.
if (variant_state->discriminators_mode.value == DiscriminatorsSerializationMode::COMPACT)
writeVarUInt(UInt64(limit), *discriminators_stream);
/// If column has only one non-empty variant and no NULLs, we don't need to
/// calculate limits for variants and can use the provided offset/limit.
if (auto non_empty_local_discr = col.getLocalDiscriminatorOfOneNoneEmptyVariantNoNulls())
{
auto non_empty_global_discr = col.globalDiscriminatorByLocal(*non_empty_local_discr);
/// In compact mode write the granule format and the single non-empty discriminator.
if (variant_state->discriminators_mode.value == DiscriminatorsSerializationMode::COMPACT)
{
writeBinaryLittleEndian(UInt8(CompactDiscriminatorsGranuleFormat::COMPACT), *discriminators_stream);
writeBinaryLittleEndian(non_empty_global_discr, *discriminators_stream);
}
/// In basic mode just serialize this discriminator limit times.
else
{
for (size_t i = 0; i < limit; ++i)
writeBinaryLittleEndian(non_empty_global_discr, *discriminators_stream);
}
settings.path.push_back(Substream::VariantElements);
addVariantElementToPath(settings.path, non_empty_global_discr);
/// We can use the same offset/limit as for the whole Variant column
variants[non_empty_global_discr]->serializeBinaryBulkWithMultipleStreams(col.getVariantByGlobalDiscriminator(non_empty_global_discr), offset, limit, settings, variant_state->variant_states[non_empty_global_discr]);
variants_statistics[variant_names[non_empty_global_discr]] += limit;
settings.path.pop_back();
settings.path.pop_back();
return;
}
/// If column has only NULLs, just serialize NULL discriminators.
else if (col.hasOnlyNulls())
{
/// In compact mode write a single NULL_DISCRIMINATOR.
if (variant_state->discriminators_mode.value == DiscriminatorsSerializationMode::COMPACT)
{
writeBinaryLittleEndian(UInt8(CompactDiscriminatorsGranuleFormat::COMPACT), *discriminators_stream);
writeBinaryLittleEndian(ColumnVariant::NULL_DISCRIMINATOR, *discriminators_stream);
}
/// In basic mode write NULL_DISCRIMINATOR limit times.
else
{
for (size_t i = 0; i < limit; ++i)
writeBinaryLittleEndian(ColumnVariant::NULL_DISCRIMINATOR, *discriminators_stream);
}
return;
}
/// If offset = 0 and limit == col.size() we don't need to calculate
/// offsets and limits for variants and can just serialize whole columns.
if ((offset == 0 && limit == col.size()) || col.hasOnlyNulls())
if ((offset == 0 && limit == col.size()))
{
/// First, serialize discriminators.
/// If we have only NULLs or local and global discriminators are the same, just serialize the column as is.
if (col.hasOnlyNulls() || col.hasGlobalVariantsOrder())
/// Here we are sure that the column contains different discriminators, so use the plain granule format in compact mode.
if (variant_state->discriminators_mode.value == DiscriminatorsSerializationMode::COMPACT)
writeBinaryLittleEndian(UInt8(CompactDiscriminatorsGranuleFormat::PLAIN), *discriminators_stream);
/// If local and global discriminators are the same, just serialize the column as is.
if (col.hasGlobalVariantsOrder())
{
SerializationNumber<ColumnVariant::Discriminator>().serializeBinaryBulk(col.getLocalDiscriminatorsColumn(), *discriminators_stream, offset, limit);
}
@ -188,7 +289,7 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian
for (size_t i = 0; i != variants.size(); ++i)
{
addVariantElementToPath(settings.path, i);
variants[i]->serializeBinaryBulkWithMultipleStreams(col.getVariantByGlobalDiscriminator(i), 0, 0, settings, variant_state->states[i]);
variants[i]->serializeBinaryBulkWithMultipleStreams(col.getVariantByGlobalDiscriminator(i), 0, 0, settings, variant_state->variant_states[i]);
variants_statistics[variant_names[i]] += col.getVariantByGlobalDiscriminator(i).size();
settings.path.pop_back();
}
@ -196,36 +297,16 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian
return;
}
/// If we have only one non empty variant and no NULLs, we can use the same limit offset for this variant.
if (auto non_empty_local_discr = col.getLocalDiscriminatorOfOneNoneEmptyVariantNoNulls())
{
/// First, serialize discriminators.
/// We know that all discriminators are the same, so we just need to serialize this discriminator limit times.
auto non_empty_global_discr = col.globalDiscriminatorByLocal(*non_empty_local_discr);
for (size_t i = 0; i != limit; ++i)
writeBinaryLittleEndian(non_empty_global_discr, *discriminators_stream);
/// Second, serialize non-empty variant (other variants are empty and we can skip their serialization).
settings.path.push_back(Substream::VariantElements);
addVariantElementToPath(settings.path, non_empty_global_discr);
/// We can use the same offset/limit as for whole Variant column
variants[non_empty_global_discr]->serializeBinaryBulkWithMultipleStreams(col.getVariantByGlobalDiscriminator(non_empty_global_discr), offset, limit, settings, variant_state->states[non_empty_global_discr]);
variants_statistics[variant_names[non_empty_global_discr]] += limit;
settings.path.pop_back();
settings.path.pop_back();
return;
}
/// In the general case we should iterate through local discriminators in the range [offset, offset + limit) to compute global discriminators and an offset/limit pair for each variant.
const auto & local_discriminators = col.getLocalDiscriminators();
const auto & offsets = col.getOffsets();
std::vector<std::pair<size_t, size_t>> variant_offsets_and_limits(variants.size(), {0, 0});
size_t end = offset + limit;
size_t num_non_empty_variants_in_range = 0;
ColumnVariant::Discriminator last_non_empty_variant_discr = 0;
for (size_t i = offset; i < end; ++i)
{
auto global_discr = col.globalDiscriminatorByLocal(local_discriminators[i]);
writeBinaryLittleEndian(global_discr, *discriminators_stream);
if (global_discr != ColumnVariant::NULL_DISCRIMINATOR)
{
/// If we see this discriminator for the first time, update offset
@ -233,9 +314,38 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian
variant_offsets_and_limits[global_discr].first = offsets[i];
/// Update limit for this discriminator.
++variant_offsets_and_limits[global_discr].second;
++num_non_empty_variants_in_range;
last_non_empty_variant_discr = global_discr;
}
}
/// In basic mode just serialize discriminators as is row by row.
if (variant_state->discriminators_mode.value == DiscriminatorsSerializationMode::BASIC)
{
for (size_t i = offset; i < end; ++i)
writeBinaryLittleEndian(col.globalDiscriminatorByLocal(local_discriminators[i]), *discriminators_stream);
}
/// In compact mode check if we have the same discriminator for all rows in this granule.
/// First, check if all values in granule are NULLs.
else if (num_non_empty_variants_in_range == 0)
{
writeBinaryLittleEndian(UInt8(CompactDiscriminatorsGranuleFormat::COMPACT), *discriminators_stream);
writeBinaryLittleEndian(ColumnVariant::NULL_DISCRIMINATOR, *discriminators_stream);
}
/// Then, check if there is only 1 variant and no NULLs in this granule.
else if (num_non_empty_variants_in_range == 1 && variant_offsets_and_limits[last_non_empty_variant_discr].second == limit)
{
writeBinaryLittleEndian(UInt8(CompactDiscriminatorsGranuleFormat::COMPACT), *discriminators_stream);
writeBinaryLittleEndian(last_non_empty_variant_discr, *discriminators_stream);
}
/// Otherwise there are different discriminators in this granule.
else
{
writeBinaryLittleEndian(UInt8(CompactDiscriminatorsGranuleFormat::PLAIN), *discriminators_stream);
for (size_t i = offset; i < end; ++i)
writeBinaryLittleEndian(col.globalDiscriminatorByLocal(local_discriminators[i]), *discriminators_stream);
}
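The granule-format decision above can be summarized by a small helper (a sketch mirroring the branches, with illustrative names):

#include <cstddef>
#include <cstdint>

enum class GranuleFormat : uint8_t { PLAIN = 0, COMPACT = 1 };

// A granule is COMPACT when all of its rows carry one and the same discriminator
// (including the all-NULL case), and PLAIN otherwise.
GranuleFormat chooseGranuleFormat(size_t num_non_empty_variants_in_range,
                                  size_t rows_of_single_variant,
                                  size_t limit)
{
    if (num_non_empty_variants_in_range == 0)
        return GranuleFormat::COMPACT;  // only NULLs
    if (num_non_empty_variants_in_range == 1 && rows_of_single_variant == limit)
        return GranuleFormat::COMPACT;  // single variant, no NULLs
    return GranuleFormat::PLAIN;        // mixed discriminators
}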
/// Serialize variants in global order.
settings.path.push_back(Substream::VariantElements);
for (size_t i = 0; i != variants.size(); ++i)
@ -249,7 +359,7 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian
variant_offsets_and_limits[i].first,
variant_offsets_and_limits[i].second,
settings,
variant_state->states[i]);
variant_state->variant_states[i]);
variants_statistics[variant_names[i]] += variant_offsets_and_limits[i].second;
settings.path.pop_back();
}
@ -284,39 +394,68 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams(
/// First, deserialize discriminators.
settings.path.push_back(Substream::VariantDiscriminators);
DeserializeBinaryBulkStateVariant * variant_state = nullptr;
std::vector<size_t> variant_limits;
if (auto cached_discriminators = getFromSubstreamsCache(cache, settings.path))
{
variant_state = checkAndGetState<DeserializeBinaryBulkStateVariant>(state);
col.getLocalDiscriminatorsPtr() = cached_discriminators;
}
else
else if (auto * discriminators_stream = settings.getter(settings.path))
{
auto * discriminators_stream = settings.getter(settings.path);
if (!discriminators_stream)
return;
variant_state = checkAndGetState<DeserializeBinaryBulkStateVariant>(state);
auto * discriminators_state = checkAndGetState<DeserializeBinaryBulkStateVariantDiscriminators>(variant_state->discriminators_state);
/// Deserialize discriminators according to serialization mode.
if (discriminators_state->mode.value == DiscriminatorsSerializationMode::BASIC)
SerializationNumber<ColumnVariant::Discriminator>().deserializeBinaryBulk(*col.getLocalDiscriminatorsPtr()->assumeMutable(), *discriminators_stream, limit, 0);
else
variant_limits = deserializeCompactDiscriminators(col.getLocalDiscriminatorsPtr(), limit, discriminators_stream, settings.continuous_reading, *discriminators_state);
SerializationNumber<ColumnVariant::Discriminator>().deserializeBinaryBulk(*col.getLocalDiscriminatorsPtr()->assumeMutable(), *discriminators_stream, limit, 0);
addToSubstreamsCache(cache, settings.path, col.getLocalDiscriminatorsPtr());
}
/// It may happen that there is no such stream; in this case just do nothing.
else
{
settings.path.pop_back();
return;
}
settings.path.pop_back();
/// Second, calculate limits for each variant by iterating through new discriminators.
std::vector<size_t> variant_limits(variants.size(), 0);
auto & discriminators_data = col.getLocalDiscriminators();
size_t discriminators_offset = discriminators_data.size() - limit;
for (size_t i = discriminators_offset; i != discriminators_data.size(); ++i)
/// Second, calculate limits for each variant by iterating through new discriminators
/// if we didn't do it during discriminators deserialization.
if (variant_limits.empty())
{
ColumnVariant::Discriminator discr = discriminators_data[i];
if (discr != ColumnVariant::NULL_DISCRIMINATOR)
++variant_limits[discr];
variant_limits.resize(variants.size(), 0);
auto & discriminators_data = col.getLocalDiscriminators();
/// We may actually read fewer than `limit` discriminators, and we cannot determine the actual number of read rows
/// from the discriminators column because it could have been taken from the substreams cache. And we need the actual
/// number of read rows to fill offsets correctly later if they are not in the cache. We can determine whether the
/// offsets column is in the cache by comparing its size with the discriminators column size (the sizes should be
/// equal when offsets are in the cache). If offsets are not in the cache, we can use their size to determine the
/// actual number of read rows.
size_t num_new_discriminators = limit;
size_t offsets_size = col.getOffsetsPtr()->size();
if (discriminators_data.size() > offsets_size)
num_new_discriminators = discriminators_data.size() - offsets_size;
size_t discriminators_offset = discriminators_data.size() - num_new_discriminators;
for (size_t i = discriminators_offset; i != discriminators_data.size(); ++i)
{
ColumnVariant::Discriminator discr = discriminators_data[i];
if (discr != ColumnVariant::NULL_DISCRIMINATOR)
++variant_limits[discr];
}
}
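A condensed sketch of the row-count recovery described in the comment above (illustrative signature): when the discriminators column came from the substreams cache it may already contain more rows than this read appended, so the number of new rows is inferred from the offsets column size.

#include <cstddef>

size_t numNewDiscriminators(size_t discriminators_size, size_t offsets_size, size_t limit)
{
    // Offsets not yet updated for this read -> the gap is exactly the new rows.
    if (discriminators_size > offsets_size)
        return discriminators_size - offsets_size;
    // Offsets already in the cache (same size) -> fall back to the requested limit.
    return limit;
}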
/// Now we can deserialize variants according to their limits.
auto * variant_state = checkAndGetState<DeserializeBinaryBulkStateVariant>(state);
settings.path.push_back(Substream::VariantElements);
for (size_t i = 0; i != variants.size(); ++i)
{
addVariantElementToPath(settings.path, i);
variants[i]->deserializeBinaryBulkWithMultipleStreams(col.getVariantPtrByLocalDiscriminator(i), variant_limits[i], settings, variant_state->states[i], cache);
variants[i]->deserializeBinaryBulkWithMultipleStreams(col.getVariantPtrByLocalDiscriminator(i), variant_limits[i], settings, variant_state->variant_states[i], cache);
settings.path.pop_back();
}
settings.path.pop_back();
@ -336,20 +475,49 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams(
}
else
{
auto & offsets = col.getOffsets();
offsets.reserve(offsets.size() + limit);
std::vector<size_t> variant_offsets;
variant_offsets.reserve(variants.size());
size_t num_non_empty_variants = 0;
ColumnVariant::Discriminator last_non_empty_discr = 0;
for (size_t i = 0; i != variants.size(); ++i)
variant_offsets.push_back(col.getVariantByLocalDiscriminator(i).size() - variant_limits[i]);
for (size_t i = discriminators_offset; i != discriminators_data.size(); ++i)
{
ColumnVariant::Discriminator discr = discriminators_data[i];
if (discr == ColumnVariant::NULL_DISCRIMINATOR)
offsets.emplace_back();
else
offsets.push_back(variant_offsets[discr]++);
if (variant_limits[i])
{
++num_non_empty_variants;
last_non_empty_discr = i;
}
variant_offsets.push_back(col.getVariantByLocalDiscriminator(i).size() - variant_limits[i]);
}
auto & discriminators_data = col.getLocalDiscriminators();
auto & offsets = col.getOffsets();
size_t num_new_offsets = discriminators_data.size() - offsets.size();
offsets.reserve(offsets.size() + num_new_offsets);
/// If only NULLs were read, fill offsets with 0.
if (num_non_empty_variants == 0)
{
offsets.resize_fill(discriminators_data.size(), 0);
}
/// If only one variant and no NULLs were read, fill offsets with sequential offsets of this variant.
else if (num_non_empty_variants == 1 && variant_limits[last_non_empty_discr] == num_new_offsets)
{
size_t first_offset = col.getVariantByLocalDiscriminator(last_non_empty_discr).size() - num_new_offsets;
for (size_t i = 0; i != num_new_offsets; ++i)
offsets.push_back(first_offset + i);
}
/// Otherwise iterate through discriminators and fill offsets accordingly.
else
{
size_t start = offsets.size();
for (size_t i = start; i != discriminators_data.size(); ++i)
{
ColumnVariant::Discriminator discr = discriminators_data[i];
if (discr == ColumnVariant::NULL_DISCRIMINATOR)
offsets.emplace_back();
else
offsets.push_back(variant_offsets[discr]++);
}
}
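The three offset-filling branches above reduce to the following sketch (illustrative types; the single-variant path assumes the range also contains no NULLs):

#include <cstddef>
#include <cstdint>
#include <vector>

// An all-NULL range gets zero offsets, a single-variant range gets sequential
// offsets, and a mixed range is filled row by row from per-variant running offsets.
void fillOffsets(std::vector<size_t> & offsets, const std::vector<uint8_t> & discrs,
                 std::vector<size_t> & variant_offsets, size_t num_non_empty_variants,
                 size_t single_variant_size, size_t num_new)
{
    const uint8_t NULL_DISCR = 0xFF;  // assumption: the max UInt8 value, as in ColumnVariant
    if (num_non_empty_variants == 0)
        offsets.insert(offsets.end(), num_new, 0);                 // only NULLs were read
    else if (num_non_empty_variants == 1)                          // single variant, no NULLs
        for (size_t i = 0; i != num_new; ++i)
            offsets.push_back(single_variant_size - num_new + i);
    else
        for (size_t i = discrs.size() - num_new; i != discrs.size(); ++i)
            offsets.push_back(discrs[i] == NULL_DISCR ? 0 : variant_offsets[discrs[i]]++);
}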
addToSubstreamsCache(cache, settings.path, col.getOffsetsPtr());
@ -357,6 +525,72 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams(
settings.path.pop_back();
}
std::vector<size_t> SerializationVariant::deserializeCompactDiscriminators(
DB::ColumnPtr & discriminators_column,
size_t limit,
ReadBuffer * stream,
bool continuous_reading,
DeserializeBinaryBulkStateVariantDiscriminators & state) const
{
auto & discriminators = assert_cast<ColumnVariant::ColumnDiscriminators &>(*discriminators_column->assumeMutable());
auto & discriminators_data = discriminators.getData();
/// Reset state if we are reading from the start of the granule and not from the previous position in the file.
if (!continuous_reading)
state.remaining_rows_in_granule = 0;
/// Calculate limits for variants during discriminators deserialization.
std::vector<size_t> variant_limits(variants.size(), 0);
while (limit)
{
/// If we have read all rows from the current granule, start reading the next one.
if (state.remaining_rows_in_granule == 0)
{
if (stream->eof())
return variant_limits;
readDiscriminatorsGranuleStart(state, stream);
}
size_t limit_in_granule = std::min(limit, state.remaining_rows_in_granule);
if (state.granule_format == CompactDiscriminatorsGranuleFormat::COMPACT)
{
auto & data = discriminators.getData();
data.resize_fill(data.size() + limit_in_granule, state.compact_discr);
if (state.compact_discr != ColumnVariant::NULL_DISCRIMINATOR)
variant_limits[state.compact_discr] += limit_in_granule;
}
else
{
SerializationNumber<ColumnVariant::Discriminator>().deserializeBinaryBulk(discriminators, *stream, limit_in_granule, 0);
size_t start = discriminators_data.size() - limit_in_granule;
for (size_t i = start; i != discriminators_data.size(); ++i)
{
ColumnVariant::Discriminator discr = discriminators_data[i];
if (discr != ColumnVariant::NULL_DISCRIMINATOR)
++variant_limits[discr];
}
}
state.remaining_rows_in_granule -= limit_in_granule;
limit -= limit_in_granule;
}
return variant_limits;
}
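For reference, here is a standalone sketch of the same granule-by-granule read loop over a plain byte buffer, with toy varint and framing helpers (the real code reads via ReadBuffer and SerializationNumber):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Reader { const uint8_t * pos; const uint8_t * end; };

uint64_t readVarUInt(Reader & in)
{
    uint64_t x = 0;
    int shift = 0;
    for (;;)
    {
        uint8_t b = *in.pos++;
        x |= uint64_t(b & 0x7f) << shift;
        if (!(b & 0x80))
            return x;
        shift += 7;
    }
}

struct State { size_t remaining = 0; uint8_t format = 0; uint8_t compact_discr = 0; };

// Consume `limit` rows that may span several granules, remembering mid-granule
// progress between calls (the role of remaining_rows_in_granule above).
void readDiscriminators(Reader & in, State & st, size_t limit, std::vector<uint8_t> & out)
{
    while (limit)
    {
        if (st.remaining == 0)  // granule boundary: read <rows><format>[discriminator]
        {
            if (in.pos == in.end)
                return;
            st.remaining = readVarUInt(in);
            st.format = *in.pos++;
            if (st.format == 1 /* COMPACT */)
                st.compact_discr = *in.pos++;
        }
        size_t n = std::min<size_t>(limit, st.remaining);
        if (st.format == 1)
            out.insert(out.end(), n, st.compact_discr);  // expand the single discriminator
        else
        {
            out.insert(out.end(), in.pos, in.pos + n);   // PLAIN: raw bytes, row by row
            in.pos += n;
        }
        st.remaining -= n;
        limit -= n;
    }
}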
void SerializationVariant::readDiscriminatorsGranuleStart(DeserializeBinaryBulkStateVariantDiscriminators & state, DB::ReadBuffer * stream)
{
UInt64 granule_size;
readVarUInt(granule_size, *stream);
state.remaining_rows_in_granule = granule_size;
UInt8 granule_format;
readBinaryLittleEndian(granule_format, *stream);
state.granule_format = static_cast<CompactDiscriminatorsGranuleFormat>(granule_format);
if (granule_format == CompactDiscriminatorsGranuleFormat::COMPACT)
readBinaryLittleEndian(state.compact_discr, *stream);
}
void SerializationVariant::addVariantElementToPath(DB::ISerialization::SubstreamPath & path, size_t i) const
{
path.push_back(Substream::VariantElement);

View File

@ -6,6 +6,13 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
/// Class for serializing/deserializing column with Variant type.
/// It supports both text and binary bulk serializations/deserializations.
///
@ -18,6 +25,17 @@ namespace DB
///
/// During binary bulk serialization it transforms local discriminators
/// to global and serializes them into a separate stream VariantDiscriminators.
/// There are 2 modes of serializing discriminators:
/// Basic mode, when all discriminators are serialized as is, row by row.
/// Compact mode, when we avoid writing the same discriminators repeatedly for granules
/// that contain only one variant (or only NULLs).
/// In compact mode we serialize granules in the following format:
/// <number of rows in granule><granule format><granule data>
/// There are 2 granule formats: plain and compact.
/// The plain format is used when a granule contains different discriminators;
/// in it all discriminators are serialized as is, row by row.
/// The compact format is used when all discriminators in a granule are the same;
/// in this case only this single discriminator is serialized.
/// Each variant is serialized into a separate stream with path VariantElements/VariantElement
/// (VariantElements stream is needed for correct sub-columns creation). We store and serialize
/// variants in a sparse form (the size of a variant column equals to the number of its discriminator
@ -32,6 +50,25 @@ namespace DB
class SerializationVariant : public ISerialization
{
public:
struct DiscriminatorsSerializationMode
{
enum Value
{
BASIC = 0, /// Store the whole discriminators column.
COMPACT = 1, /// Don't write discriminators in granule if all of them are the same.
};
static void checkMode(UInt64 mode)
{
if (mode > Value::COMPACT)
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid version for SerializationVariant discriminators column.");
}
explicit DiscriminatorsSerializationMode(UInt64 mode) : value(static_cast<Value>(mode)) { checkMode(mode); }
Value value;
};
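To make the granule layout described above concrete, a minimal encoder sketch (the constants and varint scheme are assumptions mirroring the comment, not the exact ClickHouse framing):

#include <cstdint>
#include <vector>

enum : uint8_t { PLAIN_FMT = 0, COMPACT_FMT = 1 };

void putVarUInt(uint64_t x, std::vector<uint8_t> & out)
{
    while (x >= 128) { out.push_back(uint8_t(x) | 0x80); x >>= 7; }
    out.push_back(uint8_t(x));
}

// Emit <number of rows><granule format><granule data> for one granule.
void encodeGranule(const std::vector<uint8_t> & discrs, std::vector<uint8_t> & out)
{
    putVarUInt(discrs.size(), out);
    bool all_same = true;
    for (uint8_t d : discrs)
        all_same &= (d == discrs.front());
    if (!discrs.empty() && all_same)
    {
        out.push_back(COMPACT_FMT);
        out.push_back(discrs.front());  // single discriminator for the whole granule
    }
    else
    {
        out.push_back(PLAIN_FMT);
        out.insert(out.end(), discrs.begin(), discrs.end());  // row by row
    }
}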
using VariantSerializations = std::vector<SerializationPtr>;
explicit SerializationVariant(
@ -123,8 +160,43 @@ public:
static std::vector<size_t> getVariantsDeserializeTextOrder(const DataTypes & variant_types);
private:
friend SerializationVariantElement;
void addVariantElementToPath(SubstreamPath & path, size_t i) const;
enum CompactDiscriminatorsGranuleFormat
{
PLAIN = 0, /// Granule has different discriminators and they are serialized as is row by row.
COMPACT = 1, /// Granule has single discriminator for all rows and it is serialized as single value.
};
struct DeserializeBinaryBulkStateVariantDiscriminators : public ISerialization::DeserializeBinaryBulkState
{
explicit DeserializeBinaryBulkStateVariantDiscriminators(UInt64 mode_) : mode(mode_)
{
}
DiscriminatorsSerializationMode mode;
/// Deserialization state of the granule currently being read in compact mode.
CompactDiscriminatorsGranuleFormat granule_format = CompactDiscriminatorsGranuleFormat::PLAIN;
size_t remaining_rows_in_granule = 0;
ColumnVariant::Discriminator compact_discr = 0;
};
static DeserializeBinaryBulkStatePtr deserializeDiscriminatorsStatePrefix(
DeserializeBinaryBulkSettings & settings,
SubstreamsDeserializeStatesCache * cache);
std::vector<size_t> deserializeCompactDiscriminators(
ColumnPtr & discriminators_column,
size_t limit,
ReadBuffer * stream,
bool continuous_reading,
DeserializeBinaryBulkStateVariantDiscriminators & state) const;
static void readDiscriminatorsGranuleStart(DeserializeBinaryBulkStateVariantDiscriminators & state, ReadBuffer * stream);
bool tryDeserializeTextEscapedImpl(IColumn & column, const String & field, const FormatSettings & settings) const;
bool tryDeserializeTextQuotedImpl(IColumn & column, const String & field, const FormatSettings & settings) const;
bool tryDeserializeWholeTextImpl(IColumn & column, const String & field, const FormatSettings & settings) const;

View File

@ -1,5 +1,6 @@
#include <DataTypes/Serializations/SerializationVariantElement.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <DataTypes/Serializations/SerializationVariant.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNullable.h>
#include <IO/ReadHelpers.h>
@ -12,7 +13,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
struct DeserializeBinaryBulkStateVariantElement : public ISerialization::DeserializeBinaryBulkState
struct SerializationVariantElement::DeserializeBinaryBulkStateVariantElement : public ISerialization::DeserializeBinaryBulkState
{
/// During deserialization discriminators and variant streams can be shared.
/// For example we can read several variant elements together: "select v.UInt32, v.String from table",
@ -24,7 +25,7 @@ struct DeserializeBinaryBulkStateVariantElement : public ISerialization::Deseria
/// substream cache correctly.
ColumnPtr discriminators;
ColumnPtr variant;
ISerialization::DeserializeBinaryBulkStatePtr discriminators_state;
ISerialization::DeserializeBinaryBulkStatePtr variant_element_state;
};
@ -65,7 +66,12 @@ void SerializationVariantElement::serializeBinaryBulkStateSuffix(SerializeBinary
void SerializationVariantElement::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings, DeserializeBinaryBulkStatePtr & state, SubstreamsDeserializeStatesCache * cache) const
{
DeserializeBinaryBulkStatePtr discriminators_state = SerializationVariant::deserializeDiscriminatorsStatePrefix(settings, cache);
if (!discriminators_state)
return;
auto variant_element_state = std::make_shared<DeserializeBinaryBulkStateVariantElement>();
variant_element_state->discriminators_state = discriminators_state;
addVariantToPath(settings.path);
nested_serialization->deserializeBinaryBulkStatePrefix(settings, variant_element_state->variant_element_state, cache);
@ -86,35 +92,54 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const
{
auto * variant_element_state = checkAndGetState<DeserializeBinaryBulkStateVariantElement>(state);
/// First, deserialize discriminators from Variant column.
settings.path.push_back(Substream::VariantDiscriminators);
DeserializeBinaryBulkStateVariantElement * variant_element_state = nullptr;
std::optional<size_t> variant_limit;
if (auto cached_discriminators = getFromSubstreamsCache(cache, settings.path))
{
variant_element_state = checkAndGetState<DeserializeBinaryBulkStateVariantElement>(state);
variant_element_state->discriminators = cached_discriminators;
}
else
else if (auto * discriminators_stream = settings.getter(settings.path))
{
auto * discriminators_stream = settings.getter(settings.path);
if (!discriminators_stream)
return;
variant_element_state = checkAndGetState<DeserializeBinaryBulkStateVariantElement>(state);
auto * discriminators_state = checkAndGetState<SerializationVariant::DeserializeBinaryBulkStateVariantDiscriminators>(variant_element_state->discriminators_state);
/// If we started to read a new column, reinitialize discriminators column in deserialization state.
if (!variant_element_state->discriminators || result_column->empty())
variant_element_state->discriminators = ColumnVariant::ColumnDiscriminators::create();
SerializationNumber<ColumnVariant::Discriminator>().deserializeBinaryBulk(*variant_element_state->discriminators->assumeMutable(), *discriminators_stream, limit, 0);
/// Deserialize discriminators according to serialization mode.
if (discriminators_state->mode.value == SerializationVariant::DiscriminatorsSerializationMode::BASIC)
SerializationNumber<ColumnVariant::Discriminator>().deserializeBinaryBulk(*variant_element_state->discriminators->assumeMutable(), *discriminators_stream, limit, 0);
else
variant_limit = deserializeCompactDiscriminators(variant_element_state->discriminators, limit, discriminators_stream, settings.continuous_reading, *variant_element_state);
addToSubstreamsCache(cache, settings.path, variant_element_state->discriminators);
}
else
{
settings.path.pop_back();
return;
}
settings.path.pop_back();
/// Iterate through new discriminators to calculate the limit for our variant.
/// We could read fewer than `limit` discriminators, but we will need the actual number of read rows later.
size_t num_new_discriminators = variant_element_state->discriminators->size() - result_column->size();
/// Iterate through new discriminators to calculate the limit for our variant
/// if we didn't do it during discriminators deserialization.
const auto & discriminators_data = assert_cast<const ColumnVariant::ColumnDiscriminators &>(*variant_element_state->discriminators).getData();
size_t discriminators_offset = variant_element_state->discriminators->size() - limit;
size_t variant_limit = 0;
for (size_t i = discriminators_offset; i != discriminators_data.size(); ++i)
variant_limit += (discriminators_data[i] == variant_discriminator);
size_t discriminators_offset = variant_element_state->discriminators->size() - num_new_discriminators;
if (!variant_limit)
{
variant_limit = 0;
for (size_t i = discriminators_offset; i != discriminators_data.size(); ++i)
*variant_limit += (discriminators_data[i] == variant_discriminator);
}
/// Now we know the limit for our variant and can deserialize it.
@ -125,19 +150,19 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
auto & nullable_column = assert_cast<ColumnNullable &>(*mutable_column);
NullMap & null_map = nullable_column.getNullMapData();
/// If we have only our discriminator in the range, fill the null map with 0.
if (variant_limit == limit)
if (variant_limit == num_new_discriminators)
{
null_map.resize_fill(null_map.size() + limit, 0);
null_map.resize_fill(null_map.size() + num_new_discriminators, 0);
}
/// If our discriminator is not present in the current range, fill the null map with 1.
else if (variant_limit == 0)
{
null_map.resize_fill(null_map.size() + limit, 1);
null_map.resize_fill(null_map.size() + num_new_discriminators, 1);
}
/// Otherwise we should iterate through discriminators to fill null map.
else
{
null_map.reserve(null_map.size() + limit);
null_map.reserve(null_map.size() + num_new_discriminators);
for (size_t i = discriminators_offset; i != discriminators_data.size(); ++i)
null_map.push_back(discriminators_data[i] != variant_discriminator);
}
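The null-map fill strategy above, as a compact sketch with illustrative types: fast paths when the whole range is our variant or contains none of it, and a per-row scan otherwise.

#include <cstddef>
#include <cstdint>
#include <vector>

void fillNullMap(std::vector<uint8_t> & null_map,
                 const std::vector<uint8_t> & discrs, size_t offset,
                 uint8_t variant_discr, size_t variant_limit)
{
    size_t n = discrs.size() - offset;
    if (variant_limit == n)
        null_map.insert(null_map.end(), n, 0);  // every row is our variant
    else if (variant_limit == 0)
        null_map.insert(null_map.end(), n, 1);  // no row is our variant
    else
        for (size_t i = offset; i != discrs.size(); ++i)
            null_map.push_back(discrs[i] != variant_discr);
}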
@ -159,12 +184,12 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
/// If nothing to deserialize, just insert defaults.
if (variant_limit == 0)
{
mutable_column->insertManyDefaults(limit);
mutable_column->insertManyDefaults(num_new_discriminators);
return;
}
addVariantToPath(settings.path);
nested_serialization->deserializeBinaryBulkWithMultipleStreams(variant_element_state->variant, variant_limit, settings, variant_element_state->variant_element_state, cache);
nested_serialization->deserializeBinaryBulkWithMultipleStreams(variant_element_state->variant, *variant_limit, settings, variant_element_state->variant_element_state, cache);
removeVariantFromPath(settings.path);
/// If nothing was deserialized when variant_limit > 0
@ -173,16 +198,16 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
/// In this case we should just insert default values.
if (variant_element_state->variant->empty())
{
mutable_column->insertManyDefaults(limit);
mutable_column->insertManyDefaults(num_new_discriminators);
return;
}
size_t variant_offset = variant_element_state->variant->size() - variant_limit;
size_t variant_offset = variant_element_state->variant->size() - *variant_limit;
/// If we have only our discriminator in the range, insert the whole range into the result column.
if (variant_limit == limit)
if (variant_limit == num_new_discriminators)
{
mutable_column->insertRangeFrom(*variant_element_state->variant, variant_offset, variant_limit);
mutable_column->insertRangeFrom(*variant_element_state->variant, variant_offset, *variant_limit);
}
/// Otherwise iterate through discriminators and insert either a value from the variant or a default value, depending on the discriminator.
else
@ -197,6 +222,57 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
}
}
size_t SerializationVariantElement::deserializeCompactDiscriminators(
DB::ColumnPtr & discriminators_column,
size_t limit,
DB::ReadBuffer * stream,
bool continuous_reading,
DeserializeBinaryBulkStateVariantElement & variant_element_state) const
{
auto * discriminators_state = checkAndGetState<SerializationVariant::DeserializeBinaryBulkStateVariantDiscriminators>(variant_element_state.discriminators_state);
auto & discriminators = assert_cast<ColumnVariant::ColumnDiscriminators &>(*discriminators_column->assumeMutable());
auto & discriminators_data = discriminators.getData();
/// Reset state if we are reading from the start of the granule and not from the previous position in the file.
if (!continuous_reading)
discriminators_state->remaining_rows_in_granule = 0;
/// Calculate our variant limit during discriminators deserialization.
size_t variant_limit = 0;
while (limit)
{
/// If we have read all rows from the current granule, start reading the next one.
if (discriminators_state->remaining_rows_in_granule == 0)
{
if (stream->eof())
return variant_limit;
SerializationVariant::readDiscriminatorsGranuleStart(*discriminators_state, stream);
}
size_t limit_in_granule = std::min(limit, discriminators_state->remaining_rows_in_granule);
if (discriminators_state->granule_format == SerializationVariant::CompactDiscriminatorsGranuleFormat::COMPACT)
{
auto & data = discriminators.getData();
data.resize_fill(data.size() + limit_in_granule, discriminators_state->compact_discr);
if (discriminators_state->compact_discr == variant_discriminator)
variant_limit += limit_in_granule;
}
else
{
SerializationNumber<ColumnVariant::Discriminator>().deserializeBinaryBulk(discriminators, *stream, limit_in_granule, 0);
size_t start = discriminators_data.size() - limit_in_granule;
for (size_t i = start; i != discriminators_data.size(); ++i)
variant_limit += (discriminators_data[i] == variant_discriminator);
}
discriminators_state->remaining_rows_in_granule -= limit_in_granule;
limit -= limit_in_granule;
}
return variant_limit;
}
void SerializationVariantElement::addVariantToPath(DB::ISerialization::SubstreamPath & path) const
{
path.push_back(Substream::VariantElements);

View File

@ -80,6 +80,15 @@ public:
private:
friend SerializationVariant;
struct DeserializeBinaryBulkStateVariantElement;
size_t deserializeCompactDiscriminators(
ColumnPtr & discriminators_column,
size_t limit,
ReadBuffer * stream,
bool continuous_reading,
DeserializeBinaryBulkStateVariantElement & variant_element_state) const;
void addVariantToPath(SubstreamPath & path) const;
void removeVariantFromPath(SubstreamPath & path) const;
};

View File

@ -154,7 +154,8 @@ void writeColumnSingleGranule(
const SerializationPtr & serialization,
ISerialization::OutputStreamGetter stream_getter,
size_t from_row,
size_t number_of_rows)
size_t number_of_rows,
const MergeTreeWriterSettings & settings)
{
ISerialization::SerializeBinaryBulkStatePtr state;
ISerialization::SerializeBinaryBulkSettings serialize_settings;
@ -162,6 +163,7 @@ void writeColumnSingleGranule(
serialize_settings.getter = stream_getter;
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization;
serialize_settings.dynamic_write_statistics = ISerialization::SerializeBinaryBulkSettings::DynamicStatisticsMode::PREFIX;
serialization->serializeBinaryBulkStatePrefix(*column.column, serialize_settings, state);
@ -259,7 +261,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
writeColumnSingleGranule(
block.getByName(name_and_type->name), getSerialization(name_and_type->name),
stream_getter, granule.start_row, granule.rows_to_write);
stream_getter, granule.start_row, granule.rows_to_write, settings);
/// Each type always has at least one substream
prev_stream->hashing_buf.next();

View File

@ -433,6 +433,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
if (inserted)
{
ISerialization::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization;
serialize_settings.getter = createStreamGetter(name_and_type, offset_columns);
serialization->serializeBinaryBulkStatePrefix(column, serialize_settings, it->second);
}
@ -441,6 +442,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
serialize_settings.getter = createStreamGetter(name_and_type, offset_columns);
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part;
serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization;
for (const auto & granule : granules)
{
@ -630,6 +632,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(MergeTreeDataPartChecksums &
ISerialization::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part;
serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization;
WrittenOffsetColumns offset_columns;
if (rows_written_in_last_mark > 0)
{

View File

@ -76,6 +76,7 @@ struct MergeTreeWriterSettings
, max_threads_for_annoy_index_creation(global_settings.max_threads_for_annoy_index_creation)
, low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size)
, low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0)
, use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization)
{
}
@ -98,6 +99,7 @@ struct MergeTreeWriterSettings
size_t low_cardinality_max_dictionary_size;
bool low_cardinality_use_single_dictionary_for_part;
bool use_compact_variant_discriminators_serialization;
};
}

View File

@ -43,6 +43,7 @@ struct Settings;
M(UInt64, compact_parts_max_granules_to_buffer, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_merge_max_bytes_to_prefetch_part, 16 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(Bool, load_existing_rows_count_for_old_parts, false, "Whether to load existing_rows_count for existing parts. If false, existing_rows_count will be equal to rows_count for existing parts.", 0) \
M(Bool, use_compact_variant_discriminators_serialization, true, "Use compact version of Variant discriminators serialization.", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \